feat: add workflow resume from workspace via --workspace flag

When a workflow is interrupted (VM crash, Ctrl+C, Docker restart), it can
now be resumed by passing the workspace name. The system reads session.json
to determine which agents completed, validates deliverables exist on disk,
restores the git checkpoint, and skips already-completed agents.

- Add --workspace CLI flag and auto-terminate conflicting workflows
- Add loadResumeState, restoreGitCheckpoint, recordResumeAttempt activities
- Add skip logic for all 5 pipeline phases including parallel execution
- Separate sessionId (persistent directory) from workflowId (execution ID)
- Track resume attempts in session.json for audit trail
- Derive AgentName type from ALL_AGENTS array to eliminate duplication
- Add getDeliverablePath mapping for deliverable validation
This commit is contained in:
ezl-keygraph
2026-02-13 20:26:16 +05:30
parent ce2628f6f0
commit f932fad2ed
7 changed files with 691 additions and 97 deletions
+229 -9
View File
@@ -72,9 +72,14 @@ import { getPromptNameForAgent } from '../types/agents.js';
import { AuditSession } from '../audit/index.js';
import type { WorkflowSummary } from '../audit/workflow-logger.js';
import type { AgentName } from '../types/agents.js';
import type { AgentMetrics } from './shared.js';
import { getDeliverablePath, ALL_AGENTS } from '../types/agents.js';
import type { AgentMetrics, ResumeState } from './shared.js';
import type { DistributedConfig } from '../types/config.js';
import { copyDeliverablesToAudit, type SessionMetadata } from '../audit/utils.js';
import { copyDeliverablesToAudit, type SessionMetadata, readJson, fileExists } from '../audit/utils.js';
import type { ResumeAttempt } from '../audit/metrics-tracker.js';
import { executeGitCommandWithRetry } from '../utils/git-manager.js';
import path from 'path';
import fs from 'fs/promises';
const HEARTBEAT_INTERVAL_MS = 2000; // Must be < heartbeatTimeout (10min production, 5min testing)
@@ -89,6 +94,7 @@ export interface ActivityInput {
outputPath?: string;
pipelineTestingMode?: boolean;
workflowId: string;
sessionId: string; // Workspace name (for resume) or workflowId (for new runs)
}
/**
@@ -142,8 +148,9 @@ async function runAgentActivity(
}
// 2. Build session metadata for audit
// Use sessionId (workspace name) for directory, workflowId for tracking
const sessionMetadata: SessionMetadata = {
id: workflowId,
id: input.sessionId,
webUrl,
repoPath,
...(outputPath && { outputPath }),
@@ -151,7 +158,7 @@ async function runAgentActivity(
// 3. Initialize audit session (idempotent, safe across retries)
const auditSession = new AuditSession(sessionMetadata);
await auditSession.initialize();
await auditSession.initialize(workflowId);
// 4. Load prompt
const promptName = getPromptNameForAgent(agentName);
@@ -449,6 +456,219 @@ export async function checkExploitationQueue(
};
}
// === Resume Activities ===
/**
* Session.json structure for resume state loading
*/
interface SessionJson {
session: {
id: string;
webUrl: string;
repoPath?: string;
originalWorkflowId?: string;
resumeAttempts?: ResumeAttempt[];
};
metrics: {
agents: Record<string, {
status: 'in-progress' | 'success' | 'failed';
checkpoint?: string;
}>;
};
}
/**
* Load resume state from an existing workspace.
* Validates workspace exists, URL matches, and determines which agents to skip.
*
* @throws ApplicationFailure.nonRetryable if workspace not found or URL mismatch
*/
export async function loadResumeState(
workspaceName: string,
expectedUrl: string,
expectedRepoPath: string
): Promise<ResumeState> {
const sessionPath = path.join('./audit-logs', workspaceName, 'session.json');
// Validate workspace exists
const exists = await fileExists(sessionPath);
if (!exists) {
throw ApplicationFailure.nonRetryable(
`Workspace not found: ${workspaceName}\nExpected path: ${sessionPath}`,
'WorkspaceNotFoundError'
);
}
// Load session.json
let session: SessionJson;
try {
session = await readJson<SessionJson>(sessionPath);
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
throw ApplicationFailure.nonRetryable(
`Corrupted session.json in workspace ${workspaceName}: ${errorMsg}`,
'CorruptedSessionError'
);
}
// Validate URL matches
if (session.session.webUrl !== expectedUrl) {
throw ApplicationFailure.nonRetryable(
`URL mismatch with workspace\n Workspace URL: ${session.session.webUrl}\n Provided URL: ${expectedUrl}`,
'URLMismatchError'
);
}
// Find completed agents (status === 'success' AND deliverable exists)
const completedAgents: string[] = [];
const agents = session.metrics.agents;
for (const agentName of ALL_AGENTS) {
const agentData = agents[agentName];
// Skip if agent never ran or didn't succeed
if (!agentData || agentData.status !== 'success') {
continue;
}
// Validate deliverable exists
const deliverablePath = getDeliverablePath(agentName, expectedRepoPath);
const deliverableExists = await fileExists(deliverablePath);
if (!deliverableExists) {
console.log(
chalk.yellow(`Agent ${agentName} shows success but deliverable missing, will re-run`)
);
continue;
}
// Agent completed successfully and deliverable exists
completedAgents.push(agentName);
}
// Find latest checkpoint from completed agents
const checkpoints = completedAgents
.map((name) => agents[name]?.checkpoint)
.filter((hash): hash is string => hash != null);
if (checkpoints.length === 0) {
throw ApplicationFailure.nonRetryable(
`No successful agent checkpoints found in workspace ${workspaceName}`,
'NoCheckpointsError'
);
}
// Find most recent commit among checkpoints
const checkpointHash = await findLatestCommit(expectedRepoPath, checkpoints);
const originalWorkflowId = session.session.originalWorkflowId || session.session.id;
console.log(chalk.cyan(`=== RESUME STATE ===`));
console.log(`Workspace: ${workspaceName}`);
console.log(`Completed agents: ${completedAgents.length}`);
console.log(`Checkpoint: ${checkpointHash}`);
return {
workspaceName,
originalUrl: session.session.webUrl,
completedAgents,
checkpointHash,
originalWorkflowId,
};
}
/**
* Find the most recent commit among a list of commit hashes.
* Uses git rev-list to determine which commit is newest.
*/
async function findLatestCommit(repoPath: string, commitHashes: string[]): Promise<string> {
if (commitHashes.length === 1) {
const hash = commitHashes[0];
if (!hash) {
throw new Error('Empty commit hash in array');
}
return hash;
}
// Use git rev-list to find the most recent commit among all hashes
const result = await executeGitCommandWithRetry(
['git', 'rev-list', '--max-count=1', ...commitHashes],
repoPath,
'find latest commit'
);
return result.stdout.trim();
}
/**
* Restore git workspace to a checkpoint and clean up partial deliverables.
*
* @param repoPath - Repository path
* @param checkpointHash - Git commit hash to reset to
* @param incompleteAgents - Agents that didn't complete (will have deliverables cleaned up)
*/
export async function restoreGitCheckpoint(
repoPath: string,
checkpointHash: string,
incompleteAgents: AgentName[]
): Promise<void> {
console.log(chalk.blue(`Restoring git workspace to ${checkpointHash}...`));
// Git reset to checkpoint
await executeGitCommandWithRetry(
['git', 'reset', '--hard', checkpointHash],
repoPath,
'reset to checkpoint for resume'
);
await executeGitCommandWithRetry(
['git', 'clean', '-fd'],
repoPath,
'clean untracked files for resume'
);
// Explicitly delete deliverables for incomplete agents
for (const agentName of incompleteAgents) {
const deliverablePath = getDeliverablePath(agentName, repoPath);
try {
const exists = await fileExists(deliverablePath);
if (exists) {
console.log(chalk.yellow(`Cleaning partial deliverable: ${agentName}`));
await fs.unlink(deliverablePath);
}
} catch (error) {
// Non-fatal, just log
console.log(chalk.gray(`Note: Failed to delete ${deliverablePath}: ${error}`));
}
}
console.log(chalk.green('Workspace restored to clean state'));
}
/**
* Record a resume attempt in session.json.
* Tracks the new workflow ID, terminated workflows, and checkpoint hash.
*/
export async function recordResumeAttempt(
input: ActivityInput,
terminatedWorkflows: string[],
checkpointHash: string
): Promise<void> {
const { webUrl, repoPath, outputPath, sessionId, workflowId } = input;
const sessionMetadata: SessionMetadata = {
id: sessionId,
webUrl,
repoPath,
...(outputPath && { outputPath }),
};
const auditSession = new AuditSession(sessionMetadata);
await auditSession.initialize();
await auditSession.addResumeAttempt(workflowId, terminatedWorkflows, checkpointHash);
}
/**
* Log phase transition to the unified workflow log.
* Called at phase boundaries for per-workflow logging.
@@ -458,10 +678,10 @@ export async function logPhaseTransition(
phase: string,
event: 'start' | 'complete'
): Promise<void> {
const { webUrl, repoPath, outputPath, workflowId } = input;
const { webUrl, repoPath, outputPath, sessionId } = input;
const sessionMetadata: SessionMetadata = {
id: workflowId,
id: sessionId,
webUrl,
repoPath,
...(outputPath && { outputPath }),
@@ -485,16 +705,16 @@ export async function logWorkflowComplete(
input: ActivityInput,
summary: WorkflowSummary
): Promise<void> {
const { webUrl, repoPath, outputPath, workflowId } = input;
const { webUrl, repoPath, outputPath, sessionId, workflowId } = input;
const sessionMetadata: SessionMetadata = {
id: workflowId,
id: sessionId,
webUrl,
repoPath,
...(outputPath && { outputPath }),
};
const auditSession = new AuditSession(sessionMetadata);
await auditSession.initialize();
await auditSession.initialize(workflowId);
await auditSession.logWorkflowComplete(summary);
}