feat: add npx CLI with monorepo, CI/CD, and ephemeral worker architecture (#256)

* feat: integrate npx CLI, CI/CD, and ephemeral worker architecture

Bring in changes from shannon-npx: npx-distributable CLI package (cli/),
semantic-release CI/CD workflows, ephemeral per-scan worker containers,
TOML config support, setup wizard, and workspace management.

Preserves all shannon-only changes: security hardening (localhost-bound
ports, MCP env allowlist, path traversal guard), updated benchmarks
(XBEN 19/31/35/44), README assets, and prompt injection disclaimer.

Applies security hardening to cli/infra/compose.yml as well.

* refactor: migrate to Turborepo + pnpm + Biome monorepo

Restructure into apps/worker, apps/cli, packages/mcp-server with
Turborepo task orchestration, pnpm workspaces, Biome linting/formatting,
and tsdown CLI bundling.

Key changes:
- src/ -> apps/worker/src/, cli/ -> apps/cli/, mcp-server/ -> packages/mcp-server/
- prompts/ and configs/ moved into apps/worker/
- npm replaced with pnpm, package-lock.json replaced with pnpm-lock.yaml
- Dockerfile updated for pnpm-based builds
- CLI logs command rewritten with chokidar for cross-platform reliability
- Router health checking added for auto-detected router mode
- Centralized path resolution via apps/worker/src/paths.ts

* fix: resolve all biome warnings and formatting issues

- Remove unnecessary non-null assertions where values are guaranteed
- Replace array index access with .at() for safer element retrieval
- Use local variables to avoid repeated process.env lookups
- Replace any types with unknown in functional utilities
- Use nullish coalescing for TOTP hash byte access
- Auto-format security patches to match biome config
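
To illustrate the nullish-coalescing item above, here is a minimal sketch of HOTP/TOTP dynamic truncation (RFC 4226) in which every indexed byte read falls back to 0 instead of relying on a non-null assertion. The name `dynamicTruncate` and its signature are hypothetical and are not taken from the Shannon source.

```typescript
// Hypothetical helper: RFC 4226 dynamic truncation over an HMAC digest.
// Each indexed read uses `?? 0`, so the code type-checks under
// noUncheckedIndexedAccess without any `!` assertions.
function dynamicTruncate(hash: readonly number[], digits = 6): string {
  // The low nibble of the last byte selects the 4-byte window.
  const offset = (hash[hash.length - 1] ?? 0) & 0x0f;
  const binary =
    (((hash[offset] ?? 0) & 0x7f) << 24) | // clear the sign bit
    (((hash[offset + 1] ?? 0) & 0xff) << 16) |
    (((hash[offset + 2] ?? 0) & 0xff) << 8) |
    ((hash[offset + 3] ?? 0) & 0xff);
  return (binary % 10 ** digits).toString().padStart(digits, '0');
}
```

With the RFC 4226 Appendix D digest for counter 0 (`cc93cf18...e4b0`), this sketch yields `755224`.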

* fix: pin pnpm to 10.12.1 in Dockerfile for catalog support

* fix: handle Esc cancellation in Bedrock setup flow

Replace p.group() with individual prompts and per-field cancel checks,
matching the pattern used by all other provider setup flows.

* feat: add optional model customization to Anthropic setup

* fix: resolve Docker bind mount permission errors on Linux

Use entrypoint-based UID remapping instead of --user flag so the
container's pentest user matches the host UID/GID, keeping bind-mounted
volumes writable. Git config moved to --system level to survive remapping.

* fix: show resumed workflow ID in splash screen URL

When resuming a workflow, the Temporal Web UI link pointed to the old
(terminated) workflow ID. Now extracts "New Workflow ID" from the resume
header in workflow.log, falling back to the original ID for fresh scans.

* style: fix biome formatting in docker.ts

* fix: align TypeScript config types with JSON Schema

- SuccessCondition.type: use schema values (url_contains,
  element_present, url_equals_exactly, text_contains) instead of
  stale values (url, cookie, element, redirect)
- Authentication.login_flow: mark optional to match schema which
  does not require it
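
A minimal sketch of what the aligned types might look like. Only the four `type` values and the optionality of `login_flow` come from the notes above; the `value` field, the `string[]` element type, and the `isSuccessConditionType` guard are assumptions added for illustration.

```typescript
// The four schema values named in the commit message.
const SUCCESS_CONDITION_TYPES = [
  'url_contains',
  'element_present',
  'url_equals_exactly',
  'text_contains',
] as const;

type SuccessConditionType = (typeof SUCCESS_CONDITION_TYPES)[number];

interface SuccessCondition {
  type: SuccessConditionType;
  value: string; // hypothetical field, not confirmed by the source
}

interface Authentication {
  login_flow?: string[]; // optional, per the schema; element type is assumed
  success_condition: SuccessCondition;
}

// Hypothetical runtime guard mirroring the compile-time union.
function isSuccessConditionType(value: string): value is SuccessConditionType {
  return (SUCCESS_CONDITION_TYPES as readonly string[]).includes(value);
}
```

Deriving the union from a single `as const` array keeps the runtime check and the static type from drifting apart, which is the kind of mismatch this fix addresses.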

* feat: mark GitHub release as latest during rollback

* fix: use native ARM64 runners for Docker multi-platform builds

Replace QEMU emulation with parallel native builds using a matrix
strategy (ubuntu-latest for amd64, ubuntu-24.04-arm for arm64).
Each platform pushes by digest, then a merge job creates the
multi-arch manifest list before signing with cosign.

* fix: resolve SessionMutex race condition with 3+ concurrent waiters

* fix: skip POSIX permission check on Windows

writeFileSync mode option is ignored on Windows, so config.toml
gets 0o666 and the guard rejects it.
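
The guard could be sketched roughly as follows. This assumes the check rejects any group or world permission bits; the function name `hasRestrictedPermissions` and the exact mask are hypothetical, and the caller is assumed to pass `process.platform` and the `mode` from `fs.statSync`.

```typescript
// Hypothetical guard: accept a config file only if no group/other bits are set.
// On Windows the POSIX mode is not meaningful, so the check is skipped.
function hasRestrictedPermissions(mode: number, platform: string): boolean {
  if (platform === 'win32') {
    return true; // writeFileSync's `mode` option is ignored there
  }
  return (mode & 0o077) === 0;
}
```

Under these assumptions a `0o600` file passes everywhere, while `0o666` passes only when the platform is `win32`.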

* fix: resolve unsubstituted placeholders in report prompt

Remove unused {{GITHUB_URL}} placeholder and wire up {{AUTH_CONTEXT}}
with structured auth context (login type, username, URL, MFA status).

* fix: remove duplicate environment gate from merge-docker job

Move DOCKERHUB_USERNAME from vars to secrets so merge-docker can access
credentials without its own environment scope. This eliminates the
redundant double approval since build-docker already gates on
release-publish.

* fix: replace POSIX sleep binary with cross-platform async sleep

execFileSync('sleep') is unavailable on Windows. Use node:timers/promises
setTimeout instead, making ensureInfra async.
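
A minimal sketch of the replacement pattern, assuming a polling loop similar to what `ensureInfra` might contain. The helper name `waitFor` and its parameters are hypothetical; only the use of `setTimeout` from `node:timers/promises` comes from the note above.

```typescript
import { setTimeout as sleep } from 'node:timers/promises';

// Hypothetical polling helper: retries `isReady` up to `attempts` times,
// pausing `delayMs` between tries without spawning an external process.
async function waitFor(
  isReady: () => boolean | Promise<boolean>,
  attempts: number,
  delayMs: number,
): Promise<boolean> {
  for (let i = 0; i < attempts; i++) {
    if (await isReady()) return true;
    if (i < attempts - 1) await sleep(delayMs);
  }
  return false;
}
```

Because the sleep is promise-based, every caller in the chain has to become `async`, which is why `ensureInfra` changes signature in this fix.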

* fix: use session.json for workflow ID on resume instead of parsing workflow.log

On resume, workflow.log already exists with stale headers from the
previous run. The CLI poll found '====' immediately and extracted the
old workflow ID, producing a wrong Temporal Web UI URL.

Read the workflow ID from session.json instead — the worker writes
resume attempts there atomically. For fresh runs, poll until
originalWorkflowId appears. For resumes, poll until a new
resumeAttempts entry is appended.
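
The polling decision described above can be sketched as a pure function. The `workflowId` field on a resume attempt and the names `SessionSketch` and `resolveWorkflowId` are assumptions for illustration; the real `session.json` shape may differ.

```typescript
interface ResumeAttemptSketch {
  workflowId: string; // assumed field name
}

interface SessionSketch {
  originalWorkflowId?: string;
  resumeAttempts?: ResumeAttemptSketch[];
}

// Returns the workflow ID to display, or null if the caller should keep polling.
// `priorResumeCount` is null for a fresh run, otherwise the number of
// resumeAttempts entries seen before this resume was started.
function resolveWorkflowId(session: SessionSketch, priorResumeCount: number | null): string | null {
  if (priorResumeCount === null) {
    return session.originalWorkflowId ?? null;
  }
  const attempts = session.resumeAttempts ?? [];
  if (attempts.length <= priorResumeCount) {
    return null; // the worker has not appended the new attempt yet
  }
  return attempts[attempts.length - 1]?.workflowId ?? null;
}
```

Comparing against the prior entry count is what keeps stale data from a previous run from being picked up, which was the failure mode of parsing workflow.log.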

* feat: add custom base URL support for Anthropic-compatible proxies

Support ANTHROPIC_BASE_URL + ANTHROPIC_AUTH_TOKEN to route SDK requests
through LiteLLM or any Anthropic-compatible proxy. Adds TUI wizard
option, TOML config mapping, credential validation, and preflight
endpoint reachability check via SDK query.
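
A rough sketch of the credential-validation step, assuming both variables must be set and the base URL must be an http(s) URL. The function name `validateProxyEnv` and the returned shape are hypothetical, and the reachability check via the SDK is not shown.

```typescript
interface ProxyCheck {
  ok: boolean;
  reason?: string;
}

// Hypothetical validation of the two proxy variables named in the commit.
function validateProxyEnv(env: Record<string, string | undefined>): ProxyCheck {
  const baseUrl = env['ANTHROPIC_BASE_URL'];
  const token = env['ANTHROPIC_AUTH_TOKEN'];
  if (!baseUrl) return { ok: false, reason: 'ANTHROPIC_BASE_URL is not set' };
  if (!token) return { ok: false, reason: 'ANTHROPIC_AUTH_TOKEN is not set' };

  let parsed: URL;
  try {
    parsed = new URL(baseUrl);
  } catch {
    return { ok: false, reason: 'ANTHROPIC_BASE_URL is not a valid URL' };
  }
  if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') {
    return { ok: false, reason: 'ANTHROPIC_BASE_URL must use http or https' };
  }
  return { ok: true };
}
```

In practice this would be called with `process.env` before any network request is attempted, so misconfiguration surfaces as a clear message rather than a failed scan.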

* fix: remove environment gates and add NPM_TOKEN to publish step

* feat: add beta release and rollback workflows with cosign signing

* fix: remove redundant checkout and pnpm steps from beta release workflow

* docs: normalize README commands to mode-neutral shorthand

Add a substitution note after Quick Start sections so all subsequent
examples use bare `shannon` instead of mixing `./shannon` and
`npx @keygraph/shannon`. Mode-specific commands (build, update,
uninstall) get inline annotations. Also fixes a broken command in the
Custom Base URL section.

* fix: remove redundant `update` command

Image is already auto-pulled by `ensureImage()` during `start` when the
pinned version tag is missing locally. Manual `update` was unnecessary.

* docs: add CLI package README stub

* docs: update README setup instructions for dual CLI modes

* docs: update announcement banner to npx availability

* feat: migrate from MCP tools to CLI based tools (#252)

* feat: migrate from MCP tools to CLI tools

* fix: restore browser action emoji formatters for CLI output

Adapt formatBrowserAction for playwright-cli commands, replacing the old
mcp__playwright__browser_* tool name matching removed during migration.

* fix: mount credential file to fixed container path for Vertex AI

GOOGLE_APPLICATION_CREDENTIALS was forwarded as-is to the container,
causing the relative host path to resolve against the repo mount
instead of the credentials mount. Now both local and npx modes mount
the resolved file to /app/credentials/google-sa-key.json and rewrite
the env var to match.
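
The mount-and-rewrite step might look like the sketch below. The container path comes from the note above; the function name `buildCredentialMount`, the read-only `:ro` suffix, and the use of `path.posix` (chosen here only to keep the example deterministic) are assumptions.

```typescript
import * as path from 'node:path';

const CONTAINER_CREDENTIALS_PATH = '/app/credentials/google-sa-key.json';

// Hypothetical helper: resolve the host path against the caller's working
// directory, mount that single file at a fixed container path, and point
// GOOGLE_APPLICATION_CREDENTIALS at the container-side location.
function buildCredentialMount(hostPath: string, cwd: string): { volume: string; env: string } {
  const resolved = path.posix.resolve(cwd, hostPath);
  return {
    volume: `${resolved}:${CONTAINER_CREDENTIALS_PATH}:ro`, // ':ro' is an assumption
    env: `GOOGLE_APPLICATION_CREDENTIALS=${CONTAINER_CREDENTIALS_PATH}`,
  };
}
```

Resolving on the host before mounting means the container never sees the relative path, so it cannot be misinterpreted against the repo mount.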

* feat: add git awareness and optional description field to config

* fix: drop redundant --ipc host flag from worker container

* fix: align announcement banner URL with main branch

* feat: add target URL reachability preflight check (#254)

* Moving asset benchmark graph image to this folder

* Move benchmark results to benchmark repo

Windows Defender flags exploit code in the pentest reports as false positives, forcing every Windows user to add a Defender exclusion just to clone Shannon.

* Updated README

* fix: case-insensitive grep for semantic-release version probe

* fix: harden supply chain security (#255)

* fix: patch smol-toml and tsdown vulnerabilities

Update smol-toml 1.6.0→1.6.1 (DoS via recursive comment parsing) and
tsdown 0.21.2→0.21.5 (picomatch ReDoS + method injection).

* fix: pin all unpinned dependency versions in Dockerfile

Pins subfinder v2.13.0, WhatWeb v0.6.3 (switched from git clone to
release tarball), schemathesis 4.13.0, addressable 2.8.9,
claude-code 2.1.84, and playwright-cli 0.1.1 for reproducible builds.

* fix: pin GitHub Actions to commit SHAs for supply chain security

* fix: pin GitHub Actions to commit SHAs in beta and rollback workflows

Author: ezl-keygraph
Date: 2026-03-27 02:34:29 +05:30
Committer: GitHub
Parent: 0d172f5e32
Commit: bc8fd203ed
4058 changed files with 7774 additions and 1189080 deletions
@@ -0,0 +1,646 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Temporal activities for Shannon agent execution.
*
* Each activity wraps service calls with Temporal-specific concerns:
* - Heartbeat loop (2s interval) to signal worker liveness
* - Error classification into ApplicationFailure
* - Container lifecycle management
*
* Business logic is delegated to services in src/services/.
*/
import fs from 'node:fs/promises';
import path from 'node:path';
import { ApplicationFailure, Context, heartbeat } from '@temporalio/activity';
import { AuditSession } from '../audit/index.js';
import type { ResumeAttempt } from '../audit/metrics-tracker.js';
import { copyDeliverablesToAudit, type SessionMetadata } from '../audit/utils.js';
import type { WorkflowSummary } from '../audit/workflow-logger.js';
import { getContainer, getOrCreateContainer, removeContainer } from '../services/container.js';
import { classifyErrorForTemporal, PentestError } from '../services/error-handling.js';
import { ExploitationCheckerService } from '../services/exploitation-checker.js';
import { executeGitCommandWithRetry } from '../services/git-manager.js';
import { runPreflightChecks } from '../services/preflight.js';
import type { ExploitationDecision, VulnType } from '../services/queue-validation.js';
import { assembleFinalReport, injectModelIntoReport } from '../services/reporting.js';
import { AGENTS } from '../session-manager.js';
import type { AgentName } from '../types/agents.js';
import { ALL_AGENTS } from '../types/agents.js';
import { ErrorCode } from '../types/errors.js';
import { isErr } from '../types/result.js';
import { fileExists, readJson } from '../utils/file-io.js';
import { createActivityLogger } from './activity-logger.js';
import type { AgentMetrics, ResumeState } from './shared.js';
// Max lengths to prevent Temporal protobuf buffer overflow
const MAX_ERROR_MESSAGE_LENGTH = 2000;
const MAX_STACK_TRACE_LENGTH = 1000;
// Max retries for output validation errors (agent didn't save deliverables)
const MAX_OUTPUT_VALIDATION_RETRIES = 3;
const HEARTBEAT_INTERVAL_MS = 2000;
/**
* Input for all agent activities.
*/
export interface ActivityInput {
webUrl: string;
repoPath: string;
configPath?: string;
outputPath?: string;
pipelineTestingMode?: boolean;
workflowId: string;
sessionId: string;
}
/**
* Truncate error message to prevent buffer overflow in Temporal serialization.
*/
function truncateErrorMessage(message: string): string {
if (message.length <= MAX_ERROR_MESSAGE_LENGTH) {
return message;
}
return `${message.slice(0, MAX_ERROR_MESSAGE_LENGTH - 20)}\n[truncated]`;
}
/**
* Truncate stack trace on an ApplicationFailure to prevent buffer overflow.
*/
function truncateStackTrace(failure: ApplicationFailure): void {
if (failure.stack && failure.stack.length > MAX_STACK_TRACE_LENGTH) {
failure.stack = `${failure.stack.slice(0, MAX_STACK_TRACE_LENGTH)}\n[stack truncated]`;
}
}
/**
* Build SessionMetadata from ActivityInput.
*/
function buildSessionMetadata(input: ActivityInput): SessionMetadata {
const { webUrl, repoPath, outputPath, sessionId } = input;
return {
id: sessionId,
webUrl,
repoPath,
...(outputPath && { outputPath }),
};
}
/**
* Core activity implementation using services.
*
* Executes a single agent with:
* 1. Heartbeat loop for worker liveness
* 2. Container creation/reuse
* 3. Service-based agent execution
* 4. Error classification for Temporal retry
*/
async function runAgentActivity(agentName: AgentName, input: ActivityInput): Promise<AgentMetrics> {
const { repoPath, configPath, pipelineTestingMode = false, workflowId, webUrl } = input;
const startTime = Date.now();
const attemptNumber = Context.current().info.attempt;
// Heartbeat loop - signals worker is alive to Temporal server
const heartbeatInterval = setInterval(() => {
const elapsed = Math.floor((Date.now() - startTime) / 1000);
heartbeat({ agent: agentName, elapsedSeconds: elapsed, attempt: attemptNumber });
}, HEARTBEAT_INTERVAL_MS);
try {
const logger = createActivityLogger();
// 1. Build session metadata and get/create container
const sessionMetadata = buildSessionMetadata(input);
const container = getOrCreateContainer(workflowId, sessionMetadata);
// 2. Create audit session for THIS agent execution
// NOTE: Each agent needs its own AuditSession because AuditSession uses
// instance state (currentAgentName) that cannot be shared across parallel agents
const auditSession = new AuditSession(sessionMetadata);
await auditSession.initialize(workflowId);
// 3. Execute agent via service (throws PentestError on failure)
const endResult = await container.agentExecution.executeOrThrow(
agentName,
{
webUrl,
repoPath,
configPath,
pipelineTestingMode,
attemptNumber,
},
auditSession,
logger,
);
// 4. Return metrics
return {
durationMs: Date.now() - startTime,
inputTokens: null,
outputTokens: null,
costUsd: endResult.cost_usd,
numTurns: null,
model: endResult.model,
};
} catch (error) {
// If error is already an ApplicationFailure, re-throw directly
if (error instanceof ApplicationFailure) {
throw error;
}
// Check if output validation retry limit reached (PentestError with code)
if (
error instanceof PentestError &&
error.code === ErrorCode.OUTPUT_VALIDATION_FAILED &&
attemptNumber >= MAX_OUTPUT_VALIDATION_RETRIES
) {
throw ApplicationFailure.nonRetryable(
`Agent ${agentName} failed output validation after ${attemptNumber} attempts`,
'OutputValidationError',
[{ agentName, attemptNumber, elapsed: Date.now() - startTime }],
);
}
// Classify error for Temporal retry behavior
const classified = classifyErrorForTemporal(error);
const rawMessage = error instanceof Error ? error.message : String(error);
const message = truncateErrorMessage(rawMessage);
if (classified.retryable) {
const failure = ApplicationFailure.create({
message,
type: classified.type,
details: [{ agentName, attemptNumber, elapsed: Date.now() - startTime }],
});
truncateStackTrace(failure);
throw failure;
} else {
const failure = ApplicationFailure.nonRetryable(message, classified.type, [
{ agentName, attemptNumber, elapsed: Date.now() - startTime },
]);
truncateStackTrace(failure);
throw failure;
}
} finally {
clearInterval(heartbeatInterval);
}
}
export async function runPreReconAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('pre-recon', input);
}
export async function runReconAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('recon', input);
}
export async function runInjectionVulnAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('injection-vuln', input);
}
export async function runXssVulnAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('xss-vuln', input);
}
export async function runAuthVulnAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('auth-vuln', input);
}
export async function runSsrfVulnAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('ssrf-vuln', input);
}
export async function runAuthzVulnAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('authz-vuln', input);
}
export async function runInjectionExploitAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('injection-exploit', input);
}
export async function runXssExploitAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('xss-exploit', input);
}
export async function runAuthExploitAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('auth-exploit', input);
}
export async function runSsrfExploitAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('ssrf-exploit', input);
}
export async function runAuthzExploitAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('authz-exploit', input);
}
export async function runReportAgent(input: ActivityInput): Promise<AgentMetrics> {
return runAgentActivity('report', input);
}
/**
* Preflight validation activity.
*
* Runs cheap checks before any agent execution:
* 1. Repository path exists with .git
* 2. Config file validates (if provided)
* 3. Credential validation (API key, OAuth, or router mode)
* 4. Target URL reachable from the container
*
* NOT using runAgentActivity — preflight doesn't run an agent via the SDK.
*/
export async function runPreflightValidation(input: ActivityInput): Promise<void> {
const startTime = Date.now();
const attemptNumber = Context.current().info.attempt;
const heartbeatInterval = setInterval(() => {
const elapsed = Math.floor((Date.now() - startTime) / 1000);
heartbeat({ phase: 'preflight', elapsedSeconds: elapsed, attempt: attemptNumber });
}, HEARTBEAT_INTERVAL_MS);
try {
const logger = createActivityLogger();
logger.info('Running preflight validation...', { attempt: attemptNumber });
const result = await runPreflightChecks(input.webUrl, input.repoPath, input.configPath, logger);
if (isErr(result)) {
const classified = classifyErrorForTemporal(result.error);
const message = truncateErrorMessage(result.error.message);
if (classified.retryable) {
const failure = ApplicationFailure.create({
message,
type: classified.type,
details: [{ phase: 'preflight', attemptNumber, elapsed: Date.now() - startTime }],
});
truncateStackTrace(failure);
throw failure;
} else {
const failure = ApplicationFailure.nonRetryable(message, classified.type, [
{ phase: 'preflight', attemptNumber, elapsed: Date.now() - startTime },
]);
truncateStackTrace(failure);
throw failure;
}
}
logger.info('Preflight validation passed');
} catch (error) {
if (error instanceof ApplicationFailure) {
throw error;
}
const classified = classifyErrorForTemporal(error);
const rawMessage = error instanceof Error ? error.message : String(error);
const message = truncateErrorMessage(rawMessage);
const failure = ApplicationFailure.nonRetryable(message, classified.type, [
{ phase: 'preflight', attemptNumber, elapsed: Date.now() - startTime },
]);
truncateStackTrace(failure);
throw failure;
} finally {
clearInterval(heartbeatInterval);
}
}
/**
* Assemble the final report by concatenating exploitation evidence files.
*/
export async function assembleReportActivity(input: ActivityInput): Promise<void> {
const { repoPath } = input;
const logger = createActivityLogger();
logger.info('Assembling deliverables from specialist agents...');
try {
await assembleFinalReport(repoPath, logger);
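} catch (error) {
// Narrow the unknown error instead of casting; non-Error throws are stringified
const message = error instanceof Error ? error.message : String(error);
logger.warn(`Error assembling final report: ${message}`);
}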
}
/**
* Inject model metadata into the final report.
*/
export async function injectReportMetadataActivity(input: ActivityInput): Promise<void> {
const { repoPath, sessionId, outputPath } = input;
const logger = createActivityLogger();
const effectiveOutputPath = outputPath ? path.join(outputPath, sessionId) : path.join('./workspaces', sessionId);
try {
await injectModelIntoReport(repoPath, effectiveOutputPath, logger);
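} catch (error) {
// Narrow the unknown error instead of casting; non-Error throws are stringified
const message = error instanceof Error ? error.message : String(error);
logger.warn(`Error injecting model into report: ${message}`);
}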
}
/**
* Check if exploitation should run for a given vulnerability type.
*
* Uses existing container if available (from prior agent runs),
* otherwise creates service directly (stateless, no dependencies).
*/
export async function checkExploitationQueue(input: ActivityInput, vulnType: VulnType): Promise<ExploitationDecision> {
const { repoPath, workflowId } = input;
const logger = createActivityLogger();
// Reuse container's service if available (from prior vuln agent runs)
const existingContainer = getContainer(workflowId);
const checker = existingContainer?.exploitationChecker ?? new ExploitationCheckerService();
return checker.checkQueue(vulnType, repoPath, logger);
}
interface SessionJson {
session: {
id: string;
webUrl: string;
repoPath?: string;
originalWorkflowId?: string;
resumeAttempts?: ResumeAttempt[];
};
metrics: {
agents: Record<
string,
{
status: 'in-progress' | 'success' | 'failed';
checkpoint?: string;
}
>;
};
}
/**
* Load resume state from an existing workspace.
*/
export async function loadResumeState(
workspaceName: string,
expectedUrl: string,
expectedRepoPath: string,
): Promise<ResumeState> {
// 1. Validate workspace exists
const sessionPath = path.join('./workspaces', workspaceName, 'session.json');
const exists = await fileExists(sessionPath);
if (!exists) {
throw ApplicationFailure.nonRetryable(
`Workspace not found: ${workspaceName}\nExpected path: ${sessionPath}`,
'WorkspaceNotFoundError',
);
}
// 2. Parse session.json and validate URL match
let session: SessionJson;
try {
session = await readJson<SessionJson>(sessionPath);
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
throw ApplicationFailure.nonRetryable(
`Corrupted session.json in workspace ${workspaceName}: ${errorMsg}`,
'CorruptedSessionError',
);
}
if (session.session.webUrl !== expectedUrl) {
throw ApplicationFailure.nonRetryable(
`URL mismatch with workspace\n Workspace URL: ${session.session.webUrl}\n Provided URL: ${expectedUrl}`,
'URLMismatchError',
);
}
// 3. Cross-check agent status with deliverables on disk
const completedAgents: string[] = [];
const agents = session.metrics.agents;
for (const agentName of ALL_AGENTS) {
const agentData = agents[agentName];
if (!agentData || agentData.status !== 'success') {
continue;
}
const deliverableFilename = AGENTS[agentName].deliverableFilename;
const deliverablePath = path.join(expectedRepoPath, 'deliverables', deliverableFilename);
const deliverableExists = await fileExists(deliverablePath);
if (!deliverableExists) {
const logger = createActivityLogger();
logger.warn(`Agent ${agentName} shows success but deliverable missing, will re-run`);
continue;
}
completedAgents.push(agentName);
}
// 4. Collect git checkpoints and validate at least one exists
const checkpoints = completedAgents
.map((name) => agents[name]?.checkpoint)
.filter((hash): hash is string => hash != null);
if (checkpoints.length === 0) {
const successAgents = Object.entries(agents)
.filter(([, data]) => data.status === 'success')
.map(([name]) => name);
throw ApplicationFailure.nonRetryable(
`Cannot resume workspace ${workspaceName}: ` +
(successAgents.length > 0
? `${successAgents.length} agent(s) show success in session.json (${successAgents.join(', ')}) ` +
`but their deliverable files are missing from disk. ` +
`Start a fresh run instead.`
: `No agents completed successfully. Start a fresh run instead.`),
'NoCheckpointsError',
);
}
// 5. Find the most recent checkpoint commit
const checkpointHash = await findLatestCommit(expectedRepoPath, checkpoints);
const originalWorkflowId = session.session.originalWorkflowId || session.session.id;
// 6. Log summary and return resume state
const logger = createActivityLogger();
logger.info('Resume state loaded', {
workspace: workspaceName,
completedAgents: completedAgents.length,
checkpoint: checkpointHash,
});
return {
workspaceName,
originalUrl: session.session.webUrl,
completedAgents,
checkpointHash,
originalWorkflowId,
};
}
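/**
* Return the most recent commit among the given checkpoint hashes.
*
* `git rev-list --max-count=1 <hashes...>` prints the newest commit
* reachable from any of the supplied hashes.
*/
async function findLatestCommit(repoPath: string, commitHashes: string[]): Promise<string> {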
if (commitHashes.length === 1) {
const hash = commitHashes[0];
if (!hash) {
throw new PentestError(
'Empty commit hash in array',
'filesystem',
false, // Non-retryable - corrupt workspace state
{ phase: 'resume' },
ErrorCode.GIT_CHECKPOINT_FAILED,
);
}
return hash;
}
const result = await executeGitCommandWithRetry(
['git', 'rev-list', '--max-count=1', ...commitHashes],
repoPath,
'find latest commit',
);
return result.stdout.trim();
}
/**
* Restore git workspace to a checkpoint and clean up partial deliverables.
*/
export async function restoreGitCheckpoint(
repoPath: string,
checkpointHash: string,
incompleteAgents: AgentName[],
): Promise<void> {
const logger = createActivityLogger();
logger.info(`Restoring git workspace to ${checkpointHash}...`);
await executeGitCommandWithRetry(
['git', 'reset', '--hard', checkpointHash],
repoPath,
'reset to checkpoint for resume',
);
await executeGitCommandWithRetry(['git', 'clean', '-fd'], repoPath, 'clean untracked files for resume');
for (const agentName of incompleteAgents) {
const deliverableFilename = AGENTS[agentName].deliverableFilename;
const deliverablePath = path.join(repoPath, 'deliverables', deliverableFilename);
try {
const exists = await fileExists(deliverablePath);
if (exists) {
logger.warn(`Cleaning partial deliverable: ${agentName}`);
await fs.unlink(deliverablePath);
}
} catch (error) {
logger.info(`Note: Failed to delete ${deliverablePath}: ${error}`);
}
}
logger.info('Workspace restored to clean state');
}
/**
* Record a resume attempt in session.json and write resume header to workflow.log.
*/
export async function recordResumeAttempt(
input: ActivityInput,
terminatedWorkflows: string[],
checkpointHash: string,
previousWorkflowId: string,
completedAgents: string[],
): Promise<void> {
const sessionMetadata = buildSessionMetadata(input);
const auditSession = new AuditSession(sessionMetadata);
await auditSession.initialize();
// Update session.json with resume attempt
await auditSession.addResumeAttempt(input.workflowId, terminatedWorkflows, checkpointHash);
// Write resume header to workflow.log
await auditSession.logResumeHeader({
previousWorkflowId,
newWorkflowId: input.workflowId,
checkpointHash,
completedAgents,
});
}
/**
* Log phase transition to the unified workflow log.
*/
export async function logPhaseTransition(
input: ActivityInput,
phase: string,
event: 'start' | 'complete',
): Promise<void> {
const sessionMetadata = buildSessionMetadata(input);
const auditSession = new AuditSession(sessionMetadata);
await auditSession.initialize(input.workflowId);
if (event === 'start') {
await auditSession.logPhaseStart(phase);
} else {
await auditSession.logPhaseComplete(phase);
}
}
/**
* Log workflow completion with full summary.
* Cleans up container when done.
*/
export async function logWorkflowComplete(input: ActivityInput, summary: WorkflowSummary): Promise<void> {
const { repoPath, workflowId } = input;
const sessionMetadata = buildSessionMetadata(input);
// 1. Initialize audit session and mark final status
const auditSession = new AuditSession(sessionMetadata);
await auditSession.initialize(workflowId);
await auditSession.updateSessionStatus(summary.status);
// 2. Load cumulative metrics from session.json
const sessionData = (await auditSession.getMetrics()) as {
metrics: {
total_duration_ms: number;
total_cost_usd: number;
agents: Record<string, { final_duration_ms: number; total_cost_usd: number }>;
};
};
// 3. Fill in metrics for skipped agents (resumed from previous run)
const agentMetrics = { ...summary.agentMetrics };
for (const agentName of summary.completedAgents) {
if (!agentMetrics[agentName]) {
const agentData = sessionData.metrics.agents[agentName];
if (agentData) {
agentMetrics[agentName] = {
durationMs: agentData.final_duration_ms,
costUsd: agentData.total_cost_usd,
};
}
}
}
// 4. Build cumulative summary with cross-run totals
const cumulativeSummary: WorkflowSummary = {
...summary,
totalDurationMs: sessionData.metrics.total_duration_ms,
totalCostUsd: sessionData.metrics.total_cost_usd,
agentMetrics,
};
// 5. Write completion entry to workflow.log
await auditSession.logWorkflowComplete(cumulativeSummary);
// 6. Copy deliverables to workspaces
try {
await copyDeliverablesToAudit(sessionMetadata, repoPath);
} catch (copyErr) {
const logger = createActivityLogger();
logger.error('Failed to copy deliverables to workspaces', {
error: copyErr instanceof Error ? copyErr.message : String(copyErr),
});
}
// 7. Clean up container
removeContainer(workflowId);
}
@@ -0,0 +1,34 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
import { Context } from '@temporalio/activity';
import type { ActivityLogger } from '../types/activity-logger.js';
/**
 * ActivityLogger backed by Temporal's Context.current().log.
 * Its methods must be called inside a running Temporal activity; they throw otherwise.
 */
export class TemporalActivityLogger implements ActivityLogger {
  info(message: string, attrs?: Record<string, unknown>): void {
    Context.current().log.info(message, attrs ?? {});
  }

  warn(message: string, attrs?: Record<string, unknown>): void {
    Context.current().log.warn(message, attrs ?? {});
  }

  error(message: string, attrs?: Record<string, unknown>): void {
    Context.current().log.error(message, attrs ?? {});
  }
}

/**
 * Create an ActivityLogger. The returned logger must be used inside a
 * Temporal activity; its methods throw outside an activity context.
 */
export function createActivityLogger(): ActivityLogger {
  return new TemporalActivityLogger();
}
@@ -0,0 +1,66 @@
import { defineQuery } from '@temporalio/workflow';
export type { AgentMetrics } from '../types/metrics.js';
import type { PipelineConfig } from '../types/config.js';
import type { AgentMetrics } from '../types/metrics.js';
export interface PipelineInput {
  webUrl: string;
  repoPath: string;
  configPath?: string;
  outputPath?: string;
  pipelineTestingMode?: boolean;
  pipelineConfig?: PipelineConfig;
  workflowId?: string; // Used for audit correlation
  sessionId?: string; // Workspace directory name (distinct from workflowId for named workspaces)
  resumeFromWorkspace?: string; // Workspace name to resume from
  terminatedWorkflows?: string[]; // Workflows terminated during resume
}

export interface ResumeState {
  workspaceName: string;
  originalUrl: string;
  completedAgents: string[];
  checkpointHash: string;
  originalWorkflowId: string;
}

export interface PipelineSummary {
  totalCostUsd: number;
  totalDurationMs: number; // Wall-clock time (end - start)
  totalTurns: number;
  agentCount: number;
}

export interface PipelineState {
  status: 'running' | 'completed' | 'failed';
  currentPhase: string | null;
  currentAgent: string | null;
  completedAgents: string[];
  failedAgent: string | null;
  error: string | null;
  startTime: number;
  agentMetrics: Record<string, AgentMetrics>;
  summary: PipelineSummary | null;
}

// Extended state returned by getProgress query (includes computed fields)
export interface PipelineProgress extends PipelineState {
  workflowId: string;
  elapsedMs: number;
}

// Result from a single vuln→exploit pipeline
export interface VulnExploitPipelineResult {
  vulnType: string;
  vulnMetrics: AgentMetrics | null;
  exploitMetrics: AgentMetrics | null;
  exploitDecision: {
    shouldExploit: boolean;
    vulnerabilityCount: number;
  } | null;
  error: string | null;
}

export const getProgress = defineQuery<PipelineProgress>('getProgress');
@@ -0,0 +1,39 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Maps PipelineState to WorkflowSummary for audit logging.
* Pure function with no side effects.
*/
import type { WorkflowSummary } from '../audit/workflow-logger.js';
import type { PipelineState } from './shared.js';
/**
* Maps PipelineState to WorkflowSummary.
*
* This function is deterministic (no Date.now() or I/O) so it can be
* safely imported into Temporal workflows. The caller must ensure
* state.summary is set before calling (via computeSummary).
*/
export function toWorkflowSummary(state: PipelineState, status: 'completed' | 'failed'): WorkflowSummary {
// state.summary must be computed before calling this mapper
const summary = state.summary;
if (!summary) {
throw new Error('toWorkflowSummary: state.summary must be set before calling');
}
return {
status,
totalDurationMs: summary.totalDurationMs,
totalCostUsd: summary.totalCostUsd,
completedAgents: state.completedAgents,
agentMetrics: Object.fromEntries(
Object.entries(state.agentMetrics).map(([name, m]) => [name, { durationMs: m.durationMs, costUsd: m.costUsd }]),
),
...(state.error && { error: state.error }),
};
}
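To illustrate the mapper above, here is a minimal, self-contained sketch of the same shape. `MetricsSketch`, `StateSketch`, `SummarySketch`, and `toSummarySketch` are hypothetical stand-ins, not the real `PipelineState` and `WorkflowSummary` types, and the field nullability is an assumption. The ternary spread is used in place of the `&&` spread only to keep the sketch easy to type-check.

```typescript
// Hypothetical, trimmed-down stand-ins for the real types (assumptions).
interface MetricsSketch {
  durationMs: number;
  costUsd: number | null;
  numTurns: number | null;
}

interface StateSketch {
  completedAgents: string[];
  error: string | null;
  agentMetrics: Record<string, MetricsSketch>;
  summary: { totalDurationMs: number; totalCostUsd: number } | null;
}

interface SummarySketch {
  status: 'completed' | 'failed';
  totalDurationMs: number;
  totalCostUsd: number;
  completedAgents: string[];
  agentMetrics: Record<string, { durationMs: number; costUsd: number | null }>;
  error?: string;
}

function toSummarySketch(state: StateSketch, status: 'completed' | 'failed'): SummarySketch {
  // Same precondition as the mapper: the summary must already be computed.
  if (!state.summary) {
    throw new Error('summary must be computed before mapping');
  }
  // Keep only the per-agent fields the audit log needs.
  const agentMetrics: Record<string, { durationMs: number; costUsd: number | null }> = {};
  for (const [name, m] of Object.entries(state.agentMetrics)) {
    agentMetrics[name] = { durationMs: m.durationMs, costUsd: m.costUsd };
  }
  return {
    status,
    totalDurationMs: state.summary.totalDurationMs,
    totalCostUsd: state.summary.totalCostUsd,
    completedAgents: state.completedAgents,
    agentMetrics,
    // The error key is omitted entirely when there is no error.
    ...(state.error ? { error: state.error } : {}),
  };
}

const example = toSummarySketch(
  {
    completedAgents: ['recon'],
    error: null,
    agentMetrics: { recon: { durationMs: 1200, costUsd: 0.5, numTurns: 7 } },
    summary: { totalDurationMs: 1200, totalCostUsd: 0.5 },
  },
  'completed',
);
console.log(JSON.stringify(example.agentMetrics)); // {"recon":{"durationMs":1200,"costUsd":0.5}}
```

Because nothing in the sketch reads the clock or touches I/O, the same input always yields the same output, which is the property the doc comment relies on.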
@@ -0,0 +1,454 @@
#!/usr/bin/env node
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Combined Temporal worker + client for Shannon pentest pipeline.
*
* Starts a worker on a per-invocation task queue, submits a workflow,
* waits for the result, and exits. Designed to run as a single ephemeral
* container per scan.
*
* Usage:
* node dist/temporal/worker.js <webUrl> <repoPath> [options]
*
* Options:
* --task-queue <name> Task queue name (required, unique per scan)
* --config <path> Configuration file path
* --output <path> Output directory for workspaces
* --workspace <name> Resume from existing workspace
* --pipeline-testing Use minimal prompts for fast testing
*
* Environment:
* TEMPORAL_ADDRESS - Temporal server address (default: localhost:7233)
*/
import fs from 'node:fs';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { Client, Connection, type WorkflowHandle, WorkflowNotFoundError } from '@temporalio/client';
import { bundleWorkflowCode, NativeConnection, Worker } from '@temporalio/worker';
import dotenv from 'dotenv';
import { sanitizeHostname } from '../audit/utils.js';
import { parseConfig } from '../config-parser.js';
import type { PipelineConfig } from '../types/config.js';
import { fileExists, readJson } from '../utils/file-io.js';
import * as activities from './activities.js';
import type { PipelineInput, PipelineProgress, PipelineState } from './shared.js';
dotenv.config();
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const PROGRESS_QUERY = 'getProgress';
// === CLI Argument Parsing ===
interface CliArgs {
webUrl: string;
repoPath: string;
taskQueue: string;
configPath?: string;
outputPath?: string;
pipelineTestingMode: boolean;
resumeFromWorkspace?: string;
}
function showUsage(): void {
console.log('\nShannon Worker');
console.log('Combined worker + client for pentest pipeline\n');
console.log('Usage:');
console.log(' node dist/temporal/worker.js <webUrl> <repoPath> --task-queue <name> [options]\n');
console.log('Options:');
console.log(' --task-queue <name> Task queue name (required)');
console.log(' --config <path> Configuration file path');
console.log(' --output <path> Output directory for workspaces');
console.log(' --workspace <name> Resume from existing workspace');
console.log(' --pipeline-testing Use minimal prompts for fast testing\n');
}
function parseCliArgs(argv: string[]): CliArgs {
if (argv.includes('--help') || argv.includes('-h') || argv.length === 0) {
showUsage();
process.exit(0);
}
let webUrl: string | undefined;
let repoPath: string | undefined;
let taskQueue: string | undefined;
let configPath: string | undefined;
let outputPath: string | undefined;
let pipelineTestingMode = false;
let resumeFromWorkspace: string | undefined;
for (let i = 0; i < argv.length; i++) {
const arg = argv[i];
if (arg === '--task-queue') {
const nextArg = argv[i + 1];
if (nextArg && !nextArg.startsWith('-')) {
taskQueue = nextArg;
i++;
}
} else if (arg === '--config') {
const nextArg = argv[i + 1];
if (nextArg && !nextArg.startsWith('-')) {
configPath = nextArg;
i++;
}
} else if (arg === '--output') {
const nextArg = argv[i + 1];
if (nextArg && !nextArg.startsWith('-')) {
outputPath = nextArg;
i++;
}
} else if (arg === '--workspace') {
const nextArg = argv[i + 1];
if (nextArg && !nextArg.startsWith('-')) {
resumeFromWorkspace = nextArg;
i++;
}
} else if (arg === '--pipeline-testing') {
pipelineTestingMode = true;
} else if (arg && !arg.startsWith('-')) {
if (!webUrl) {
webUrl = arg;
} else if (!repoPath) {
repoPath = arg;
}
}
}
if (!webUrl || !repoPath) {
console.error('Error: webUrl and repoPath are required');
showUsage();
process.exit(1);
}
if (!taskQueue) {
console.error('Error: --task-queue is required');
showUsage();
process.exit(1);
}
return {
webUrl,
repoPath,
taskQueue,
pipelineTestingMode,
...(configPath && { configPath }),
...(outputPath && { outputPath }),
...(resumeFromWorkspace && { resumeFromWorkspace }),
};
}
// === Workspace Resolution ===
interface SessionJson {
session: {
id: string;
webUrl: string;
originalWorkflowId?: string;
resumeAttempts?: Array<{ workflowId: string }>;
};
metrics: {
total_cost_usd: number;
};
}
function isValidWorkspaceName(name: string): boolean {
return /^[a-zA-Z0-9][a-zA-Z0-9_-]{0,127}$/.test(name);
}
interface WorkspaceResolution {
workflowId: string;
sessionId: string;
isResume: boolean;
terminatedWorkflows: string[];
}
async function terminateExistingWorkflows(client: Client, workspaceName: string): Promise<string[]> {
const sessionPath = path.join('./workspaces', workspaceName, 'session.json');
if (!(await fileExists(sessionPath))) {
throw new Error(`Workspace not found: ${workspaceName}\nExpected path: ${sessionPath}`);
}
const session = await readJson<SessionJson>(sessionPath);
const workflowIds = [
session.session.originalWorkflowId || session.session.id,
...(session.session.resumeAttempts?.map((r) => r.workflowId) || []),
].filter((id): id is string => id != null);
const terminated: string[] = [];
for (const wfId of workflowIds) {
try {
const handle = client.workflow.getHandle(wfId);
const description = await handle.describe();
if (description.status.name === 'RUNNING') {
console.log(`Terminating running workflow: ${wfId}`);
await handle.terminate('Superseded by resume workflow');
terminated.push(wfId);
console.log(`Terminated: ${wfId}`);
} else {
console.log(`Workflow already ${description.status.name}: ${wfId}`);
}
} catch (error) {
if (error instanceof WorkflowNotFoundError) {
console.log(`Workflow not found (already cleaned up): ${wfId}`);
} else {
console.log(`Failed to terminate ${wfId}: ${error}`);
}
}
}
return terminated;
}
async function resolveWorkspace(client: Client, args: CliArgs): Promise<WorkspaceResolution> {
if (!args.resumeFromWorkspace) {
const hostname = sanitizeHostname(args.webUrl);
const workflowId = `${hostname}_shannon-${Date.now()}`;
return {
workflowId,
sessionId: workflowId,
isResume: false,
terminatedWorkflows: [],
};
}
const workspace = args.resumeFromWorkspace;
const sessionPath = path.join('./workspaces', workspace, 'session.json');
const workspaceExists = await fileExists(sessionPath);
if (workspaceExists) {
console.log('=== RESUME MODE ===');
console.log(`Workspace: ${workspace}\n`);
const terminatedWorkflows = await terminateExistingWorkflows(client, workspace);
if (terminatedWorkflows.length > 0) {
console.log(`Terminated ${terminatedWorkflows.length} previous workflow(s)\n`);
}
const session = await readJson<SessionJson>(sessionPath);
if (session.session.webUrl !== args.webUrl) {
console.error('ERROR: URL mismatch with workspace');
console.error(` Workspace URL: ${session.session.webUrl}`);
console.error(` Provided URL: ${args.webUrl}`);
process.exit(1);
}
return {
workflowId: `${workspace}_resume_${Date.now()}`,
sessionId: workspace,
isResume: true,
terminatedWorkflows,
};
}
if (!isValidWorkspaceName(workspace)) {
console.error(`ERROR: Invalid workspace name: "${workspace}"`);
console.error(' Must be 1-128 characters, alphanumeric/hyphens/underscores, starting with alphanumeric');
process.exit(1);
}
console.log('=== NEW NAMED WORKSPACE ===');
console.log(`Workspace: ${workspace}\n`);
// If the workspace name already looks like a CLI-generated ID
// (ends with _shannon-<digits>), use it directly to avoid double _shannon- suffixes
const workflowId = /_shannon-\d+$/.test(workspace) ? workspace : `${workspace}_shannon-${Date.now()}`;
return {
workflowId,
sessionId: workspace,
isResume: false,
terminatedWorkflows: [],
};
}
// === Pipeline Input Construction ===
async function loadPipelineConfig(configPath: string | undefined): Promise<PipelineConfig> {
if (!configPath) return {};
try {
const config = await parseConfig(configPath);
const raw = config.pipeline;
if (!raw) return {};
const result: PipelineConfig = {};
if (raw.retry_preset !== undefined) {
result.retry_preset = raw.retry_preset;
}
if (raw.max_concurrent_pipelines !== undefined) {
result.max_concurrent_pipelines = Number(raw.max_concurrent_pipelines);
}
return result;
} catch {
// Config load/parse failure is non-fatal here: fall back to default pipeline settings
return {};
}
}
function buildPipelineInput(
args: CliArgs,
workspace: WorkspaceResolution,
pipelineConfig: PipelineConfig,
): PipelineInput {
return {
webUrl: args.webUrl,
repoPath: args.repoPath,
workflowId: workspace.workflowId,
sessionId: workspace.sessionId,
...(args.configPath && { configPath: args.configPath }),
...(args.pipelineTestingMode && { pipelineTestingMode: args.pipelineTestingMode }),
...(workspace.isResume && args.resumeFromWorkspace && { resumeFromWorkspace: args.resumeFromWorkspace }),
...(workspace.terminatedWorkflows.length > 0 && { terminatedWorkflows: workspace.terminatedWorkflows }),
...(Object.keys(pipelineConfig).length > 0 && { pipelineConfig }),
};
}
// === Workflow Result Handling ===
async function waitForWorkflowResult(
handle: WorkflowHandle<(input: PipelineInput) => Promise<PipelineState>>,
workspace: WorkspaceResolution,
): Promise<void> {
const progressInterval = setInterval(async () => {
try {
const progress = await handle.query<PipelineProgress>(PROGRESS_QUERY);
const elapsed = Math.floor(progress.elapsedMs / 1000);
console.log(
`[${elapsed}s] Phase: ${progress.currentPhase || 'unknown'} | Agent: ${progress.currentAgent || 'none'} | Completed: ${progress.completedAgents.length}/13`,
);
} catch {
// Workflow may have completed
}
}, 30000);
try {
const result = await handle.result();
clearInterval(progressInterval);
console.log('\nPipeline completed successfully!');
if (result.summary) {
console.log(`Duration: ${Math.floor(result.summary.totalDurationMs / 1000)}s`);
console.log(`Agents completed: ${result.summary.agentCount}`);
console.log(`Total turns: ${result.summary.totalTurns}`);
console.log(`Run cost: $${result.summary.totalCostUsd.toFixed(4)}`);
if (workspace.isResume) {
try {
const session = await readJson<SessionJson>(path.join('./workspaces', workspace.sessionId, 'session.json'));
console.log(`Cumulative cost: $${session.metrics.total_cost_usd.toFixed(4)}`);
} catch {
// Non-fatal
}
}
}
} catch (error) {
clearInterval(progressInterval);
console.error('\nPipeline failed:', error);
process.exit(1);
}
}
// === Deliverables Copy ===
function copyDeliverables(repoPath: string, outputPath: string): void {
const deliverablesDir = path.join(repoPath, 'deliverables');
if (!fs.existsSync(deliverablesDir)) {
console.log('No deliverables directory found, skipping copy');
return;
}
const files = fs.readdirSync(deliverablesDir);
if (files.length === 0) {
console.log('No deliverables to copy');
return;
}
fs.mkdirSync(outputPath, { recursive: true });
for (const file of files) {
const src = path.join(deliverablesDir, file);
const dest = path.join(outputPath, file);
fs.cpSync(src, dest, { recursive: true });
}
console.log(`Copied ${files.length} deliverable(s) to ${outputPath}`);
}
// === Main Entry Point ===
async function run(): Promise<void> {
// 1. Parse CLI args
const args = parseCliArgs(process.argv.slice(2));
// 2. Connect to Temporal server
const address = process.env.TEMPORAL_ADDRESS || 'localhost:7233';
console.log(`Connecting to Temporal at ${address}...`);
const connection = await NativeConnection.connect({ address });
const clientConnection = await Connection.connect({ address });
const client = new Client({ connection: clientConnection });
try {
// 3. Bundle workflows and create worker on per-invocation task queue
console.log('Bundling workflows...');
const workflowBundle = await bundleWorkflowCode({
workflowsPath: path.join(__dirname, 'workflows.js'),
});
const worker = await Worker.create({
connection,
namespace: 'default',
workflowBundle,
activities,
taskQueue: args.taskQueue,
maxConcurrentActivityTaskExecutions: 25,
});
// 4. Resolve workspace and build pipeline input
const workspace = await resolveWorkspace(client, args);
const pipelineConfig = await loadPipelineConfig(args.configPath);
const input = buildPipelineInput(args, workspace, pipelineConfig);
// 5. Start worker polling in the background
const workerDone = worker.run();
// 6. Submit workflow to the same task queue
const handle = await client.workflow.start<(input: PipelineInput) => Promise<PipelineState>>(
'pentestPipelineWorkflow',
{
taskQueue: args.taskQueue,
workflowId: workspace.workflowId,
args: [input],
},
);
// 7. Wait for workflow result
await waitForWorkflowResult(handle, workspace);
// 8. Copy deliverables to output directory
if (args.outputPath) {
copyDeliverables(args.repoPath, args.outputPath);
}
// 9. Shut down worker gracefully
worker.shutdown();
await workerDone;
} finally {
await connection.close();
await clientConnection.close();
}
}
run().catch((err) => {
console.error('Worker failed:', err);
process.exit(1);
});
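The naming rules in `resolveWorkspace` can be sketched in isolation. The two regular expressions below are copied from the file above. `isValidName` and `deriveWorkflowId` are hypothetical helper names, and `now` is passed in as a parameter (an assumption of this sketch, where the real code calls `Date.now()`) so the result is reproducible.

```typescript
// Same pattern as isValidWorkspaceName: 1-128 chars, alphanumeric start.
const WORKSPACE_NAME = /^[a-zA-Z0-9][a-zA-Z0-9_-]{0,127}$/;
// Same suffix check used when creating a new named workspace.
const GENERATED_SUFFIX = /_shannon-\d+$/;

function isValidName(name: string): boolean {
  return WORKSPACE_NAME.test(name);
}

// Hypothetical helper: reuse the name if it already ends with
// _shannon-<digits>, otherwise append a timestamped suffix.
function deriveWorkflowId(workspace: string, now: number): string {
  return GENERATED_SUFFIX.test(workspace) ? workspace : `${workspace}_shannon-${now}`;
}

console.log(isValidName('acme-staging')); // true
console.log(isValidName('../etc')); // false
console.log(isValidName('example.com')); // false (dots are not allowed)
console.log(deriveWorkflowId('acme', 1700000000000)); // acme_shannon-1700000000000
console.log(deriveWorkflowId('acme_shannon-1700000000000', 42)); // acme_shannon-1700000000000
```

Note that the validation only applies to new named workspaces. In the code above, an existing workspace is looked up by path before the name check runs.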
@@ -0,0 +1,88 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Workflow error formatting utilities.
* Pure functions with no side effects — safe for Temporal workflow sandbox.
*/
/** Maps Temporal error type strings to actionable remediation hints. */
const REMEDIATION_HINTS: Record<string, string> = {
AuthenticationError: 'Verify ANTHROPIC_API_KEY or CLAUDE_CODE_OAUTH_TOKEN in .env is valid and not expired.',
ConfigurationError: 'Check your CONFIG file path and contents.',
BillingError: 'Check your Anthropic billing dashboard. Add credits or wait for spending cap reset.',
GitError: 'Check repository path and git state.',
InvalidTargetError: 'Verify the target URL is correct and accessible.',
PermissionError: 'Check file and network permissions.',
ExecutionLimitError: 'Agent exceeded maximum turns or budget. Review prompt complexity.',
};
/**
* Walk the .cause chain to find the innermost error with a .type property.
* Temporal wraps ApplicationFailure in ActivityFailure — the useful info is inside.
*
* Uses duck-typing because workflow code cannot import @temporalio/activity types.
*/
function unwrapActivityError(error: unknown): {
message: string;
type: string | null;
} {
let current: unknown = error;
let typed: { message: string; type: string } | null = null;
while (current instanceof Error) {
if ('type' in current && typeof (current as { type: unknown }).type === 'string') {
typed = {
message: current.message,
type: (current as { type: string }).type,
};
}
current = (current as { cause?: unknown }).cause;
}
if (typed) {
return typed;
}
return {
message: error instanceof Error ? error.message : String(error),
type: null,
};
}
/**
* Format a structured error string from workflow catch context.
* Segments are delimited by | for multi-line rendering by WorkflowLogger.
*/
export function formatWorkflowError(error: unknown, currentPhase: string | null, currentAgent: string | null): string {
const unwrapped = unwrapActivityError(error);
// Phase context (first segment)
let phaseContext = 'Pipeline failed';
if (currentPhase && currentAgent && currentPhase !== currentAgent) {
phaseContext = `${currentPhase} failed (agent: ${currentAgent})`;
} else if (currentPhase) {
phaseContext = `${currentPhase} failed`;
}
const segments: string[] = [phaseContext];
if (unwrapped.type) {
segments.push(unwrapped.type);
}
// Sanitize pipe characters from message to preserve delimiter format
segments.push(unwrapped.message.replaceAll('|', '/'));
if (unwrapped.type) {
const hint = REMEDIATION_HINTS[unwrapped.type];
if (hint) {
segments.push(`Hint: ${hint}`);
}
}
return segments.join('|');
}
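A small, self-contained sketch of the unwrap-and-format flow above may help when reading log output. `HINTS`, `ErrorWithMeta`, `innermostTyped`, and `formatSketch` are hypothetical names, the hint text is made up, and the phase handling is simplified to a single string. It uses `split`/`join` instead of `replaceAll` only to avoid depending on a newer library target.

```typescript
// Hypothetical hint table with one made-up entry.
const HINTS: Record<string, string> = {
  AuthenticationError: 'check credentials',
};

// Duck-typed view of an error that may carry a type and a cause.
interface ErrorWithMeta extends Error {
  type?: unknown;
  cause?: unknown;
}

// Walk the cause chain and remember the innermost error with a string type.
function innermostTyped(error: unknown): { message: string; type: string } | null {
  let current: unknown = error;
  let found: { message: string; type: string } | null = null;
  while (current instanceof Error) {
    const e: ErrorWithMeta = current;
    if (typeof e.type === 'string') {
      found = { message: e.message, type: e.type };
    }
    current = e.cause;
  }
  return found;
}

// Join the segments with "|" after replacing pipes inside the message.
function formatSketch(error: unknown, phase: string): string {
  const typed = innermostTyped(error);
  const message = typed ? typed.message : error instanceof Error ? error.message : String(error);
  const segments: string[] = [`${phase} failed`];
  if (typed) {
    segments.push(typed.type);
  }
  segments.push(message.split('|').join('/'));
  const hint = typed ? HINTS[typed.type] : undefined;
  if (hint) {
    segments.push(`Hint: ${hint}`);
  }
  return segments.join('|');
}

// An outer wrapper error whose cause carries the useful type and message.
const inner = Object.assign(new Error('401 | invalid key'), { type: 'AuthenticationError' });
const outer = Object.assign(new Error('Activity task failed'), { cause: inner });

console.log(formatSketch(outer, 'recon'));
// recon failed|AuthenticationError|401 / invalid key|Hint: check credentials
console.log(formatSketch(new Error('boom'), 'preflight'));
// preflight failed|boom
```

The wrapper's own message ("Activity task failed") is dropped in favor of the inner one, which mirrors why the helper keeps the innermost typed error rather than the first.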
@@ -0,0 +1,484 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Temporal workflow for Shannon pentest pipeline.
*
* Orchestrates the penetration testing workflow:
* 1. Pre-Reconnaissance (sequential)
* 2. Reconnaissance (sequential)
* 3-4. Vulnerability + Exploitation (5 pipelined pairs in parallel)
* Each pair: vuln agent → queue check → conditional exploit
* No synchronization barrier - exploits start when their vuln finishes
* 5. Reporting (sequential)
*
* Features:
* - Queryable state via getProgress
* - Automatic retry with backoff for transient/billing errors
* - Non-retryable classification for permanent errors
* - Audit correlation via workflowId
* - Graceful failure handling: pipelines continue if one fails
*/
import { log, proxyActivities, setHandler, workflowInfo } from '@temporalio/workflow';
import type { AgentName, VulnType } from '../types/agents.js';
import { ALL_AGENTS } from '../types/agents.js';
import type * as activities from './activities.js';
import type { ActivityInput } from './activities.js';
import {
type AgentMetrics,
getProgress,
type PipelineInput,
type PipelineProgress,
type PipelineState,
type PipelineSummary,
type ResumeState,
type VulnExploitPipelineResult,
} from './shared.js';
import { toWorkflowSummary } from './summary-mapper.js';
import { formatWorkflowError } from './workflow-errors.js';
// Retry configuration for production (long intervals for billing recovery)
const PRODUCTION_RETRY = {
initialInterval: '5 minutes',
maximumInterval: '30 minutes',
backoffCoefficient: 2,
maximumAttempts: 50,
nonRetryableErrorTypes: [
'AuthenticationError',
'PermissionError',
'InvalidRequestError',
'RequestTooLargeError',
'ConfigurationError',
'InvalidTargetError',
'ExecutionLimitError',
],
};
// Retry configuration for pipeline testing (fast iteration)
const TESTING_RETRY = {
initialInterval: '10 seconds',
maximumInterval: '30 seconds',
backoffCoefficient: 2,
maximumAttempts: 5,
nonRetryableErrorTypes: PRODUCTION_RETRY.nonRetryableErrorTypes,
};
// Activity proxy with production retry configuration (default)
const acts = proxyActivities<typeof activities>({
startToCloseTimeout: '2 hours',
heartbeatTimeout: '60 minutes', // Extended for sub-agent execution (SDK blocks event loop during Task tool calls)
retry: PRODUCTION_RETRY,
});
// Activity proxy with testing retry configuration (fast)
const testActs = proxyActivities<typeof activities>({
startToCloseTimeout: '30 minutes',
heartbeatTimeout: '30 minutes', // Extended for sub-agent execution in testing
retry: TESTING_RETRY,
});
// Retry configuration for subscription plans (5h+ rolling rate limit windows)
const SUBSCRIPTION_RETRY = {
initialInterval: '5 minutes',
maximumInterval: '6 hours',
backoffCoefficient: 2,
maximumAttempts: 100,
nonRetryableErrorTypes: PRODUCTION_RETRY.nonRetryableErrorTypes,
};
// Activity proxy for subscription plan recovery (extended timeouts)
const subscriptionActs = proxyActivities<typeof activities>({
startToCloseTimeout: '8 hours',
heartbeatTimeout: '2 hours',
retry: SUBSCRIPTION_RETRY,
});
// Retry configuration for preflight validation (short timeout, few retries)
const PREFLIGHT_RETRY = {
initialInterval: '10 seconds',
maximumInterval: '1 minute',
backoffCoefficient: 2,
maximumAttempts: 3,
nonRetryableErrorTypes: PRODUCTION_RETRY.nonRetryableErrorTypes,
};
// Activity proxy for preflight validation (short timeout)
const preflightActs = proxyActivities<typeof activities>({
startToCloseTimeout: '2 minutes',
heartbeatTimeout: '2 minutes',
retry: PREFLIGHT_RETRY,
});
/**
* Compute aggregated metrics from the current pipeline state.
* Called on both success and failure to provide partial metrics.
*/
function computeSummary(state: PipelineState): PipelineSummary {
const metrics = Object.values(state.agentMetrics);
return {
totalCostUsd: metrics.reduce((sum, m) => sum + (m.costUsd ?? 0), 0),
totalDurationMs: Date.now() - state.startTime,
totalTurns: metrics.reduce((sum, m) => sum + (m.numTurns ?? 0), 0),
agentCount: state.completedAgents.length,
};
}
export async function pentestPipelineWorkflow(input: PipelineInput): Promise<PipelineState> {
const { workflowId } = workflowInfo();
// Select activity proxy based on mode: testing (fast), subscription (extended), or default
function selectActivityProxy(pipelineInput: PipelineInput) {
if (pipelineInput.pipelineTestingMode) return testActs;
if (pipelineInput.pipelineConfig?.retry_preset === 'subscription') return subscriptionActs;
return acts;
}
const a = selectActivityProxy(input);
const state: PipelineState = {
status: 'running',
currentPhase: null,
currentAgent: null,
completedAgents: [],
failedAgent: null,
error: null,
startTime: Date.now(),
agentMetrics: {},
summary: null,
};
setHandler(
getProgress,
(): PipelineProgress => ({
...state,
workflowId,
elapsedMs: Date.now() - state.startTime,
}),
);
// Build ActivityInput with required workflowId for audit correlation
// Activities require workflowId (non-optional); PipelineInput declares it as optional
// Use spread to conditionally include optional properties (exactOptionalPropertyTypes)
// sessionId is workspace name for resume, or workflowId for new runs
const sessionId = input.sessionId || input.resumeFromWorkspace || workflowId;
const activityInput: ActivityInput = {
webUrl: input.webUrl,
repoPath: input.repoPath,
workflowId,
sessionId,
...(input.configPath !== undefined && { configPath: input.configPath }),
...(input.outputPath !== undefined && { outputPath: input.outputPath }),
...(input.pipelineTestingMode !== undefined && {
pipelineTestingMode: input.pipelineTestingMode,
}),
};
let resumeState: ResumeState | null = null;
if (input.resumeFromWorkspace) {
// 1. Load resume state (validates workspace, cross-checks deliverables)
resumeState = await a.loadResumeState(input.resumeFromWorkspace, input.webUrl, input.repoPath);
// 2. Restore git workspace and clean up incomplete deliverables
const incompleteAgents = ALL_AGENTS.filter(
(agentName) => !resumeState?.completedAgents.includes(agentName),
) as AgentName[];
await a.restoreGitCheckpoint(input.repoPath, resumeState.checkpointHash, incompleteAgents);
// 3. Short-circuit if all agents already completed
if (resumeState.completedAgents.length === ALL_AGENTS.length) {
log.info(`All ${ALL_AGENTS.length} agents already completed. Nothing to resume.`);
state.status = 'completed';
state.completedAgents = [...resumeState.completedAgents];
state.summary = computeSummary(state);
return state;
}
// 4. Record this resume attempt in session.json and workflow.log
await a.recordResumeAttempt(
activityInput,
input.terminatedWorkflows || [],
resumeState.checkpointHash,
resumeState.originalWorkflowId,
resumeState.completedAgents,
);
log.info('Resume state loaded and workspace restored');
}
const shouldSkip = (agentName: string): boolean => {
return resumeState?.completedAgents.includes(agentName) ?? false;
};
// Run a sequential agent phase (pre-recon, recon)
async function runSequentialPhase(
phaseName: string,
agentName: AgentName,
runAgent: (input: ActivityInput) => Promise<AgentMetrics>,
): Promise<void> {
if (!shouldSkip(agentName)) {
state.currentPhase = phaseName;
state.currentAgent = agentName;
await a.logPhaseTransition(activityInput, phaseName, 'start');
state.agentMetrics[agentName] = await runAgent(activityInput);
state.completedAgents.push(agentName);
await a.logPhaseTransition(activityInput, phaseName, 'complete');
} else {
log.info(`Skipping ${agentName} (already complete)`);
state.completedAgents.push(agentName);
}
}
// Build pipeline configs for the 5 vuln→exploit pairs
function buildPipelineConfigs(): Array<{
vulnType: VulnType;
vulnAgent: string;
exploitAgent: string;
runVuln: () => Promise<AgentMetrics>;
runExploit: () => Promise<AgentMetrics>;
}> {
return [
{
vulnType: 'injection',
vulnAgent: 'injection-vuln',
exploitAgent: 'injection-exploit',
runVuln: () => a.runInjectionVulnAgent(activityInput),
runExploit: () => a.runInjectionExploitAgent(activityInput),
},
{
vulnType: 'xss',
vulnAgent: 'xss-vuln',
exploitAgent: 'xss-exploit',
runVuln: () => a.runXssVulnAgent(activityInput),
runExploit: () => a.runXssExploitAgent(activityInput),
},
{
vulnType: 'auth',
vulnAgent: 'auth-vuln',
exploitAgent: 'auth-exploit',
runVuln: () => a.runAuthVulnAgent(activityInput),
runExploit: () => a.runAuthExploitAgent(activityInput),
},
{
vulnType: 'ssrf',
vulnAgent: 'ssrf-vuln',
exploitAgent: 'ssrf-exploit',
runVuln: () => a.runSsrfVulnAgent(activityInput),
runExploit: () => a.runSsrfExploitAgent(activityInput),
},
{
vulnType: 'authz',
vulnAgent: 'authz-vuln',
exploitAgent: 'authz-exploit',
runVuln: () => a.runAuthzVulnAgent(activityInput),
runExploit: () => a.runAuthzExploitAgent(activityInput),
},
];
}
// Aggregate results from settled pipeline promises into workflow state
function aggregatePipelineResults(results: PromiseSettledResult<VulnExploitPipelineResult>[]): void {
const failedPipelines: string[] = [];
for (const result of results) {
if (result.status === 'fulfilled') {
const { vulnType, vulnMetrics, exploitMetrics } = result.value;
const vulnAgentName = `${vulnType}-vuln`;
if (vulnMetrics) {
state.agentMetrics[vulnAgentName] = vulnMetrics;
state.completedAgents.push(vulnAgentName);
} else if (shouldSkip(vulnAgentName)) {
state.completedAgents.push(vulnAgentName);
}
const exploitAgentName = `${vulnType}-exploit`;
if (exploitMetrics) {
state.agentMetrics[exploitAgentName] = exploitMetrics;
state.completedAgents.push(exploitAgentName);
} else if (shouldSkip(exploitAgentName)) {
state.completedAgents.push(exploitAgentName);
}
} else {
const errorMsg = result.reason instanceof Error ? result.reason.message : String(result.reason);
failedPipelines.push(errorMsg);
}
}
if (failedPipelines.length > 0) {
log.warn(`${failedPipelines.length} pipeline(s) failed`, {
failures: failedPipelines,
});
}
}
// Run thunks with a concurrency limit, returning PromiseSettledResult for each.
// When limit >= thunks.length (default), all launch concurrently — identical to Promise.allSettled.
// NOTE: Results are in completion order, not input order. Callers must key on value fields, not index.
async function runWithConcurrencyLimit(
thunks: Array<() => Promise<VulnExploitPipelineResult>>,
limit: number,
): Promise<PromiseSettledResult<VulnExploitPipelineResult>[]> {
const results: PromiseSettledResult<VulnExploitPipelineResult>[] = [];
const inFlight = new Set<Promise<void>>();
for (const thunk of thunks) {
const slot = thunk()
.then(
(value) => {
results.push({ status: 'fulfilled', value });
},
(reason: unknown) => {
results.push({ status: 'rejected', reason });
},
)
.finally(() => {
inFlight.delete(slot);
});
inFlight.add(slot);
if (inFlight.size >= limit) {
await Promise.race(inFlight);
}
}
await Promise.allSettled(inFlight);
return results;
}
try {
// === Preflight Validation ===
// Quick sanity checks before committing to expensive agent runs.
// NOT using runSequentialPhase — preflight doesn't produce AgentMetrics.
state.currentPhase = 'preflight';
state.currentAgent = null;
await preflightActs.runPreflightValidation(activityInput);
log.info('Preflight validation passed');
// === Phase 1: Pre-Reconnaissance ===
await runSequentialPhase('pre-recon', 'pre-recon', a.runPreReconAgent);
// === Phase 2: Reconnaissance ===
await runSequentialPhase('recon', 'recon', a.runReconAgent);
// === Phases 3-4: Vulnerability Analysis + Exploitation (Pipelined) ===
// Each vuln type runs as an independent pipeline:
// vuln agent → queue check → conditional exploit agent
// Exploits start immediately when their vuln finishes, not waiting for all.
state.currentPhase = 'vulnerability-exploitation';
state.currentAgent = 'pipelines';
await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'start');
// Closure over shouldSkip and activityInput by design (Temporal replay safety)
async function runVulnExploitPipeline(
vulnType: VulnType,
runVulnAgent: () => Promise<AgentMetrics>,
runExploitAgent: () => Promise<AgentMetrics>,
): Promise<VulnExploitPipelineResult> {
const vulnAgentName = `${vulnType}-vuln`;
const exploitAgentName = `${vulnType}-exploit`;
// 1. Run vulnerability analysis (or skip if resumed)
let vulnMetrics: AgentMetrics | null = null;
if (!shouldSkip(vulnAgentName)) {
vulnMetrics = await runVulnAgent();
} else {
log.info(`Skipping ${vulnAgentName} (already complete)`);
}
// 2. Check exploitation queue for actionable findings
const decision = await a.checkExploitationQueue(activityInput, vulnType);
// 3. Conditionally run exploitation agent
let exploitMetrics: AgentMetrics | null = null;
if (decision.shouldExploit) {
if (!shouldSkip(exploitAgentName)) {
exploitMetrics = await runExploitAgent();
} else {
log.info(`Skipping ${exploitAgentName} (already complete)`);
}
}
return {
vulnType,
vulnMetrics,
exploitMetrics,
exploitDecision: {
shouldExploit: decision.shouldExploit,
vulnerabilityCount: decision.vulnerabilityCount,
},
error: null,
};
}
const maxConcurrent = input.pipelineConfig?.max_concurrent_pipelines ?? 5;
const pipelineConfigs = buildPipelineConfigs();
const pipelineThunks: Array<() => Promise<VulnExploitPipelineResult>> = [];
for (const config of pipelineConfigs) {
if (!shouldSkip(config.vulnAgent) || !shouldSkip(config.exploitAgent)) {
pipelineThunks.push(() => runVulnExploitPipeline(config.vulnType, config.runVuln, config.runExploit));
} else {
log.info(`Skipping entire ${config.vulnType} pipeline (both agents complete)`);
state.completedAgents.push(config.vulnAgent, config.exploitAgent);
}
}
const pipelineResults = await runWithConcurrencyLimit(pipelineThunks, maxConcurrent);
aggregatePipelineResults(pipelineResults);
state.currentPhase = 'exploitation';
state.currentAgent = null;
await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'complete');
// === Phase 5: Reporting ===
if (!shouldSkip('report')) {
state.currentPhase = 'reporting';
state.currentAgent = 'report';
await a.logPhaseTransition(activityInput, 'reporting', 'start');
// First, assemble the concatenated report from exploitation evidence files
await a.assembleReportActivity(activityInput);
// Then run the report agent to add executive summary and clean up
state.agentMetrics.report = await a.runReportAgent(activityInput);
state.completedAgents.push('report');
// Inject model metadata into the final report
await a.injectReportMetadataActivity(activityInput);
await a.logPhaseTransition(activityInput, 'reporting', 'complete');
} else {
log.info('Skipping report (already complete)');
state.completedAgents.push('report');
}
state.status = 'completed';
state.currentPhase = null;
state.currentAgent = null;
state.summary = computeSummary(state);
// Log workflow completion summary
await a.logWorkflowComplete(activityInput, toWorkflowSummary(state, 'completed'));
return state;
} catch (error) {
state.status = 'failed';
state.failedAgent = state.currentAgent;
state.error = formatWorkflowError(error, state.currentPhase, state.currentAgent);
state.summary = computeSummary(state);
// Log workflow failure summary
await a.logWorkflowComplete(activityInput, toWorkflowSummary(state, 'failed'));
throw error;
}
}
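The completion-order behavior noted on `runWithConcurrencyLimit` is easy to miss, so here is a minimal sketch that runs in plain Node, outside Temporal. `runLimited` is a generic copy of the same loop. `task` and `completionOrder` are hypothetical helpers, and the delays are arbitrary values chosen for the demo.

```typescript
// Generic copy of the limiter loop: results are pushed as tasks settle.
async function runLimited<T>(
  thunks: Array<() => Promise<T>>,
  limit: number,
): Promise<PromiseSettledResult<T>[]> {
  const results: PromiseSettledResult<T>[] = [];
  const inFlight = new Set<Promise<void>>();
  for (const thunk of thunks) {
    const slot: Promise<void> = thunk()
      .then(
        (value) => {
          results.push({ status: 'fulfilled', value });
        },
        (reason: unknown) => {
          results.push({ status: 'rejected', reason });
        },
      )
      .finally(() => {
        inFlight.delete(slot);
      });
    inFlight.add(slot);
    // At capacity: wait for any in-flight task before launching the next.
    if (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }
  }
  await Promise.allSettled(inFlight);
  return results;
}

// Hypothetical task: resolves with its label after `ms` milliseconds.
function task(label: string, ms: number): () => Promise<string> {
  return () => new Promise<string>((resolve) => setTimeout(() => resolve(label), ms));
}

// Returns the labels in the order the limiter recorded them.
async function completionOrder(limit: number): Promise<string[]> {
  const settled = await runLimited([task('slow', 60), task('fast', 10), task('medium', 30)], limit);
  return settled.map((r) => (r.status === 'fulfilled' ? r.value : 'rejected'));
}

completionOrder(3).then((order) => console.log(order.join(','))); // fast,medium,slow
completionOrder(1).then((order) => console.log(order.join(','))); // slow,fast,medium
```

With a limit of 3 all tasks start together and the results come back fastest first. With a limit of 1 they run one at a time, so the order matches the input. This is why `aggregatePipelineResults` keys on `vulnType` rather than on array position.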
@@ -0,0 +1,174 @@
#!/usr/bin/env node
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Workspace listing tool for Shannon.
*
* Reads workspaces/ directories, parses session.json files, and displays
* a formatted table of all workspaces with status, duration, and cost.
*
* Usage:
* node dist/temporal/workspaces.js
*
* Environment:
* WORKSPACES_DIR - Override workspaces directory (default: ./workspaces)
*/
import fs from 'node:fs/promises';
import path from 'node:path';
import { WORKSPACES_DIR as DEFAULT_WORKSPACES_DIR } from '../paths.js';
interface SessionJson {
  session: {
    id: string;
    webUrl: string;
    status: 'in-progress' | 'completed' | 'failed';
    createdAt: string;
    completedAt?: string;
  };
  metrics: {
    total_cost_usd: number;
  };
}

interface WorkspaceInfo {
  name: string;
  url: string;
  status: 'in-progress' | 'completed' | 'failed';
  createdAt: Date;
  completedAt: Date | null;
  costUsd: number;
}

/** Formats a millisecond duration as `Xh Ym`, `Ym`, or `Xs`, using the largest applicable unit. */
function formatDuration(ms: number): string {
  const seconds = Math.floor(ms / 1000);
  const minutes = Math.floor(seconds / 60);
  const hours = Math.floor(minutes / 60);
  if (hours > 0) {
    return `${hours}h ${minutes % 60}m`;
  }
  if (minutes > 0) {
    return `${minutes}m`;
  }
  return `${seconds}s`;
}

/** Returns the status string unchanged. */
function getStatusDisplay(status: string): string {
  return status;
}

/** Shortens `str` to at most `maxLen` characters, ending with an ellipsis when it was cut. */
function truncate(str: string, maxLen: number): string {
  if (str.length <= maxLen) return str;
  return `${str.slice(0, maxLen - 1)}\u2026`;
}

async function listWorkspaces(): Promise<void> {
  const workspacesDir = process.env.WORKSPACES_DIR || DEFAULT_WORKSPACES_DIR;

  let entries: string[];
  try {
    entries = await fs.readdir(workspacesDir);
  } catch {
    console.log('No workspaces directory found.');
    console.log(`Expected: ${workspacesDir}`);
    return;
  }

  const workspaces: WorkspaceInfo[] = [];
  for (const entry of entries) {
    const sessionPath = path.join(workspacesDir, entry, 'session.json');
    try {
      const content = await fs.readFile(sessionPath, 'utf8');
      const data = JSON.parse(content) as SessionJson;
      const createdAt = new Date(data.session.createdAt);
      // The cast above is unchecked: skip entries whose date or cost would
      // break sorting or `toFixed` when the table is rendered
      if (Number.isNaN(createdAt.getTime()) || !Number.isFinite(data.metrics.total_cost_usd)) {
        continue;
      }
      workspaces.push({
        name: entry,
        url: data.session.webUrl,
        status: data.session.status,
        createdAt,
        completedAt: data.session.completedAt ? new Date(data.session.completedAt) : null,
        costUsd: data.metrics.total_cost_usd,
      });
    } catch {
      // Skip directories without valid session.json
    }
  }

  if (workspaces.length === 0) {
    console.log('\nNo workspaces found.');
    console.log('Run a pipeline first: ./shannon start -u <url> -r <repo>');
    return;
  }

  // Sort by creation date (most recent first)
  workspaces.sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime());

  console.log('\n=== Shannon Workspaces ===\n');

  // Column widths
  const nameWidth = 30;
  const urlWidth = 30;
  const statusWidth = 14;
  const durationWidth = 10;
  const costWidth = 10;

  // Header
  console.log(
    ' ' +
      'WORKSPACE'.padEnd(nameWidth) +
      'URL'.padEnd(urlWidth) +
      'STATUS'.padEnd(statusWidth) +
      'DURATION'.padEnd(durationWidth) +
      'COST'.padEnd(costWidth),
  );
  console.log(` ${'\u2500'.repeat(nameWidth + urlWidth + statusWidth + durationWidth + costWidth)}`);

  let resumableCount = 0;
  for (const ws of workspaces) {
    const now = new Date();
    const endTime = ws.completedAt || now;
    const durationMs = endTime.getTime() - ws.createdAt.getTime();
    const duration = formatDuration(durationMs);
    const cost = `$${ws.costUsd.toFixed(2)}`;
    const isResumable = ws.status !== 'completed';
    if (isResumable) {
      resumableCount++;
    }
    const resumeTag = isResumable ? ' (resumable)' : '';
    console.log(
      ' ' +
        truncate(ws.name, nameWidth - 2).padEnd(nameWidth) +
        truncate(ws.url, urlWidth - 2).padEnd(urlWidth) +
        getStatusDisplay(ws.status).padEnd(statusWidth) +
        duration.padEnd(durationWidth) +
        cost.padEnd(costWidth) +
        resumeTag,
    );
  }

  console.log();
  const summary = `${workspaces.length} workspace${workspaces.length === 1 ? '' : 's'} found`;
  const resumeSummary = resumableCount > 0 ? ` (${resumableCount} resumable)` : '';
  console.log(`${summary}${resumeSummary}`);
  if (resumableCount > 0) {
    console.log('\nResume with: ./shannon start -u <url> -r <repo> -w <name>');
  }
  console.log();
}

listWorkspaces().catch((err) => {
  console.error('Error listing workspaces:', err);
  process.exit(1);
});
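
Review note: `listWorkspaces` casts the result of `JSON.parse` straight to `SessionJson`, so a session.json with missing or mistyped fields is not caught by the surrounding `try`/`catch`. A minimal sketch of a runtime check follows, assuming the `SessionJson` shape declared in this file. The names `isRecord`, `isSessionJson`, and `parseSessionJson` are hypothetical and are not part of this PR.

```typescript
// Hypothetical runtime validation for session.json (illustrative only).
type SessionStatus = 'in-progress' | 'completed' | 'failed';

// Mirrors the SessionJson interface declared in workspaces.ts.
interface SessionJson {
  session: {
    id: string;
    webUrl: string;
    status: SessionStatus;
    createdAt: string;
    completedAt?: string;
  };
  metrics: {
    total_cost_usd: number;
  };
}

const STATUSES: readonly string[] = ['in-progress', 'completed', 'failed'];

// Narrows an unknown value to a plain object with string keys.
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Type guard: true only when every field the table relies on is present and well-typed.
function isSessionJson(value: unknown): value is SessionJson {
  if (!isRecord(value)) return false;
  const session = value['session'];
  const metrics = value['metrics'];
  if (!isRecord(session) || !isRecord(metrics)) return false;

  const id = session['id'];
  const webUrl = session['webUrl'];
  const status = session['status'];
  const createdAt = session['createdAt'];
  const completedAt = session['completedAt'];
  const cost = metrics['total_cost_usd'];

  return (
    typeof id === 'string' &&
    typeof webUrl === 'string' &&
    typeof status === 'string' &&
    STATUSES.includes(status) &&
    typeof createdAt === 'string' &&
    !Number.isNaN(Date.parse(createdAt)) &&
    (completedAt === undefined || typeof completedAt === 'string') &&
    typeof cost === 'number' &&
    Number.isFinite(cost)
  );
}

// Returns the parsed session, or null for malformed JSON or an unexpected shape.
function parseSessionJson(content: string): SessionJson | null {
  try {
    const parsed: unknown = JSON.parse(content);
    return isSessionJson(parsed) ? parsed : null;
  } catch {
    return null;
  }
}
```

With a helper like this, the loop body could call `parseSessionJson(content)` and skip the entry on `null`, which keeps the "skip directories without valid session.json" behavior while also covering files that parse but have the wrong shape.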