import { and, asc, desc, eq, inArray, isNull, or, sql } from "drizzle-orm"; import type { Db } from "@paperclipai/db"; import { activityLog, agents, documentRevisions, heartbeatRunEvents, heartbeatRuns, issueComments, issueDocuments, issues, issueWorkProducts, workspaceOperations, } from "@paperclipai/db"; import { ISSUE_CONTINUATION_SUMMARY_DOCUMENT_KEY } from "@paperclipai/shared"; import { logger } from "../middleware/logger.js"; import { classifyRunLiveness } from "./run-liveness.js"; export interface ActivityFilters { companyId: string; agentId?: string; entityType?: string; entityId?: string; limit?: number; } const DEFAULT_ACTIVITY_LIMIT = 100; const MAX_ACTIVITY_LIMIT = 500; export function normalizeActivityLimit(limit: number | undefined) { if (!Number.isFinite(limit)) return DEFAULT_ACTIVITY_LIMIT; return Math.max(1, Math.min(MAX_ACTIVITY_LIMIT, Math.floor(limit ?? DEFAULT_ACTIVITY_LIMIT))); } export function activityService(db: Db) { const scheduledLivenessBackfills = new Set(); const issueIdAsText = sql`${issues.id}::text`; const summarizedUsageJson = sql | null>` case when ${heartbeatRuns.usageJson} is null then null else jsonb_strip_nulls(jsonb_build_object( 'inputTokens', coalesce(${heartbeatRuns.usageJson} -> 'inputTokens', ${heartbeatRuns.usageJson} -> 'input_tokens'), 'input_tokens', coalesce(${heartbeatRuns.usageJson} -> 'input_tokens', ${heartbeatRuns.usageJson} -> 'inputTokens'), 'outputTokens', coalesce(${heartbeatRuns.usageJson} -> 'outputTokens', ${heartbeatRuns.usageJson} -> 'output_tokens'), 'output_tokens', coalesce(${heartbeatRuns.usageJson} -> 'output_tokens', ${heartbeatRuns.usageJson} -> 'outputTokens'), 'cachedInputTokens', coalesce( ${heartbeatRuns.usageJson} -> 'cachedInputTokens', ${heartbeatRuns.usageJson} -> 'cached_input_tokens', ${heartbeatRuns.usageJson} -> 'cache_read_input_tokens' ), 'cached_input_tokens', coalesce( ${heartbeatRuns.usageJson} -> 'cached_input_tokens', ${heartbeatRuns.usageJson} -> 'cachedInputTokens', ${heartbeatRuns.usageJson} -> 'cache_read_input_tokens' ), 'cache_read_input_tokens', coalesce( ${heartbeatRuns.usageJson} -> 'cache_read_input_tokens', ${heartbeatRuns.usageJson} -> 'cached_input_tokens', ${heartbeatRuns.usageJson} -> 'cachedInputTokens' ), 'billingType', coalesce(${heartbeatRuns.usageJson} -> 'billingType', ${heartbeatRuns.usageJson} -> 'billing_type'), 'billing_type', coalesce(${heartbeatRuns.usageJson} -> 'billing_type', ${heartbeatRuns.usageJson} -> 'billingType'), 'costUsd', coalesce( ${heartbeatRuns.usageJson} -> 'costUsd', ${heartbeatRuns.usageJson} -> 'cost_usd', ${heartbeatRuns.usageJson} -> 'total_cost_usd' ), 'cost_usd', coalesce( ${heartbeatRuns.usageJson} -> 'cost_usd', ${heartbeatRuns.usageJson} -> 'costUsd', ${heartbeatRuns.usageJson} -> 'total_cost_usd' ), 'total_cost_usd', coalesce( ${heartbeatRuns.usageJson} -> 'total_cost_usd', ${heartbeatRuns.usageJson} -> 'cost_usd', ${heartbeatRuns.usageJson} -> 'costUsd' ) )) end `.as("usageJson"); const summarizedResultJson = sql | null>` case when ${heartbeatRuns.resultJson} is null then null else jsonb_strip_nulls(jsonb_build_object( 'billingType', coalesce(${heartbeatRuns.resultJson} -> 'billingType', ${heartbeatRuns.resultJson} -> 'billing_type'), 'billing_type', coalesce(${heartbeatRuns.resultJson} -> 'billing_type', ${heartbeatRuns.resultJson} -> 'billingType'), 'costUsd', coalesce( ${heartbeatRuns.resultJson} -> 'costUsd', ${heartbeatRuns.resultJson} -> 'cost_usd', ${heartbeatRuns.resultJson} -> 'total_cost_usd' ), 'cost_usd', coalesce( ${heartbeatRuns.resultJson} -> 'cost_usd', ${heartbeatRuns.resultJson} -> 'costUsd', ${heartbeatRuns.resultJson} -> 'total_cost_usd' ), 'total_cost_usd', coalesce( ${heartbeatRuns.resultJson} -> 'total_cost_usd', ${heartbeatRuns.resultJson} -> 'cost_usd', ${heartbeatRuns.resultJson} -> 'costUsd' ), 'stopReason', ${heartbeatRuns.resultJson} -> 'stopReason', 'effectiveTimeoutSec', ${heartbeatRuns.resultJson} -> 'effectiveTimeoutSec', 'effectiveTimeoutMs', ${heartbeatRuns.resultJson} -> 'effectiveTimeoutMs', 'timeoutConfigured', ${heartbeatRuns.resultJson} -> 'timeoutConfigured', 'timeoutSource', ${heartbeatRuns.resultJson} -> 'timeoutSource', 'timeoutFired', ${heartbeatRuns.resultJson} -> 'timeoutFired' )) end `.as("resultJson"); function countValue(value: unknown) { const parsed = Number(value ?? 0); return Number.isFinite(parsed) ? Math.max(0, Math.floor(parsed)) : 0; } function dateValue(value: unknown) { if (value instanceof Date) return Number.isNaN(value.getTime()) ? null : value; if (typeof value === "string" || typeof value === "number") { const parsed = new Date(value); return Number.isNaN(parsed.getTime()) ? null : parsed; } return null; } function latestDate(...values: unknown[]) { let latest: Date | null = null; for (const value of values) { const parsed = dateValue(value); if (!parsed) continue; if (!latest || parsed.getTime() > latest.getTime()) latest = parsed; } return latest; } function asRecord(value: unknown): Record | null { if (!value || typeof value !== "object" || Array.isArray(value)) return null; return value as Record; } function readNumber(value: unknown) { return typeof value === "number" && Number.isFinite(value) ? value : null; } async function backfillMissingRunLivenessForIssue(companyId: string, issueId: string) { const runs = await db .select({ id: heartbeatRuns.id, companyId: heartbeatRuns.companyId, status: heartbeatRuns.status, contextSnapshot: heartbeatRuns.contextSnapshot, resultJson: heartbeatRuns.resultJson, stdoutExcerpt: heartbeatRuns.stdoutExcerpt, stderrExcerpt: heartbeatRuns.stderrExcerpt, error: heartbeatRuns.error, errorCode: heartbeatRuns.errorCode, continuationAttempt: heartbeatRuns.continuationAttempt, }) .from(heartbeatRuns) .where( and( eq(heartbeatRuns.companyId, companyId), isNull(heartbeatRuns.livenessState), sql`${heartbeatRuns.status} not in ('queued', 'running')`, or( sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issueId}`, sql`exists ( select 1 from ${activityLog} where ${activityLog.companyId} = ${companyId} and ${activityLog.entityType} = 'issue' and ${activityLog.entityId} = ${issueId} and ${activityLog.runId} = ${heartbeatRuns.id} )`, ), ), ) .limit(20); if (runs.length === 0) return; const issue = await db .select({ status: issues.status, title: issues.title, description: issues.description, }) .from(issues) .where(and(eq(issues.companyId, companyId), eq(issues.id, issueId))) .then((rows) => rows[0] ?? null); for (const run of runs) { const context = asRecord(run.contextSnapshot); const continuationAttempt = readNumber(context?.continuationAttempt) ?? readNumber(context?.livenessContinuationAttempt) ?? run.continuationAttempt ?? 0; const [commentStats] = await db .select({ count: sql`count(*)::int`, latestAt: sql`max(${issueComments.createdAt})`, }) .from(issueComments) .where( and( eq(issueComments.companyId, companyId), eq(issueComments.issueId, issueId), eq(issueComments.createdByRunId, run.id), ), ); const [documentStats] = await db .select({ count: sql`count(*)::int`, planCount: sql`count(*) filter (where ${issueDocuments.key} = 'plan')::int`, latestAt: sql`max(${documentRevisions.createdAt})`, }) .from(documentRevisions) .innerJoin(issueDocuments, eq(documentRevisions.documentId, issueDocuments.documentId)) .where( and( eq(documentRevisions.companyId, companyId), eq(documentRevisions.createdByRunId, run.id), eq(issueDocuments.companyId, companyId), eq(issueDocuments.issueId, issueId), sql`${issueDocuments.key} != ${ISSUE_CONTINUATION_SUMMARY_DOCUMENT_KEY}`, ), ); const [workProductStats] = await db .select({ count: sql`count(*)::int`, latestAt: sql`max(${issueWorkProducts.createdAt})`, }) .from(issueWorkProducts) .where( and( eq(issueWorkProducts.companyId, companyId), eq(issueWorkProducts.issueId, issueId), eq(issueWorkProducts.createdByRunId, run.id), ), ); const [workspaceOperationStats] = await db .select({ count: sql`count(*)::int`, latestAt: sql`max(${workspaceOperations.startedAt})`, }) .from(workspaceOperations) .where(and(eq(workspaceOperations.companyId, companyId), eq(workspaceOperations.heartbeatRunId, run.id))); const [activityStats] = await db .select({ count: sql`count(*)::int`, latestAt: sql`max(${activityLog.createdAt})`, }) .from(activityLog) .where(and(eq(activityLog.companyId, companyId), eq(activityLog.runId, run.id))); const [eventStats] = await db .select({ count: sql`count(*) filter (where ${heartbeatRunEvents.eventType} not in ('lifecycle', 'adapter.invoke', 'error'))::int`, latestAt: sql`max(${heartbeatRunEvents.createdAt}) filter (where ${heartbeatRunEvents.eventType} not in ('lifecycle', 'adapter.invoke', 'error'))`, }) .from(heartbeatRunEvents) .where(and(eq(heartbeatRunEvents.companyId, companyId), eq(heartbeatRunEvents.runId, run.id))); const classification = classifyRunLiveness({ runStatus: run.status, issue, resultJson: asRecord(run.resultJson), stdoutExcerpt: run.stdoutExcerpt, stderrExcerpt: run.stderrExcerpt, error: run.error, errorCode: run.errorCode, continuationAttempt, evidence: { issueCommentsCreated: countValue(commentStats?.count), documentRevisionsCreated: countValue(documentStats?.count), planDocumentRevisionsCreated: countValue(documentStats?.planCount), workProductsCreated: countValue(workProductStats?.count), workspaceOperationsCreated: countValue(workspaceOperationStats?.count), activityEventsCreated: countValue(activityStats?.count), toolOrActionEventsCreated: countValue(eventStats?.count), latestEvidenceAt: latestDate( commentStats?.latestAt, documentStats?.latestAt, workProductStats?.latestAt, workspaceOperationStats?.latestAt, activityStats?.latestAt, eventStats?.latestAt, ), }, }); await db .update(heartbeatRuns) .set({ livenessState: classification.livenessState, livenessReason: classification.livenessReason, continuationAttempt: classification.continuationAttempt, lastUsefulActionAt: classification.lastUsefulActionAt, nextAction: classification.nextAction, updatedAt: new Date(), }) .where(and(eq(heartbeatRuns.id, run.id), isNull(heartbeatRuns.livenessState))); } } function scheduleRunLivenessBackfill(companyId: string, issueId: string) { const key = `${companyId}:${issueId}`; if (scheduledLivenessBackfills.has(key)) return; scheduledLivenessBackfills.add(key); void backfillMissingRunLivenessForIssue(companyId, issueId) .catch((err: unknown) => { logger.warn({ err, companyId, issueId }, "run liveness backfill failed"); }) .finally(() => { scheduledLivenessBackfills.delete(key); }); } return { list: (filters: ActivityFilters) => { const conditions = [eq(activityLog.companyId, filters.companyId)]; const limit = normalizeActivityLimit(filters.limit); if (filters.agentId) { conditions.push(eq(activityLog.agentId, filters.agentId)); } if (filters.entityType) { conditions.push(eq(activityLog.entityType, filters.entityType)); } if (filters.entityId) { conditions.push(eq(activityLog.entityId, filters.entityId)); } return db .select({ activityLog }) .from(activityLog) .leftJoin( issues, and( eq(activityLog.entityType, sql`'issue'`), eq(activityLog.entityId, issueIdAsText), ), ) .where( and( ...conditions, or( sql`${activityLog.entityType} != 'issue'`, isNull(issues.hiddenAt), ), ), ) .orderBy(desc(activityLog.createdAt)) .limit(limit) .then((rows) => rows.map((r) => r.activityLog)); }, forIssue: (issueId: string) => db .select() .from(activityLog) .where( and( eq(activityLog.entityType, "issue"), eq(activityLog.entityId, issueId), ), ) .orderBy(desc(activityLog.createdAt)), runsForIssue: async (companyId: string, issueId: string) => { scheduleRunLivenessBackfill(companyId, issueId); const runs = await db .select({ runId: heartbeatRuns.id, status: heartbeatRuns.status, agentId: heartbeatRuns.agentId, adapterType: agents.adapterType, startedAt: heartbeatRuns.startedAt, finishedAt: heartbeatRuns.finishedAt, createdAt: heartbeatRuns.createdAt, invocationSource: heartbeatRuns.invocationSource, usageJson: summarizedUsageJson, resultJson: summarizedResultJson, logBytes: heartbeatRuns.logBytes, retryOfRunId: heartbeatRuns.retryOfRunId, scheduledRetryAt: heartbeatRuns.scheduledRetryAt, scheduledRetryAttempt: heartbeatRuns.scheduledRetryAttempt, scheduledRetryReason: heartbeatRuns.scheduledRetryReason, livenessState: heartbeatRuns.livenessState, livenessReason: heartbeatRuns.livenessReason, continuationAttempt: heartbeatRuns.continuationAttempt, lastUsefulActionAt: heartbeatRuns.lastUsefulActionAt, nextAction: heartbeatRuns.nextAction, }) .from(heartbeatRuns) .innerJoin( agents, and( eq(agents.id, heartbeatRuns.agentId), eq(agents.companyId, heartbeatRuns.companyId), ), ) .where( and( eq(heartbeatRuns.companyId, companyId), or( sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issueId}`, sql`exists ( select 1 from ${activityLog} where ${activityLog.companyId} = ${companyId} and ${activityLog.entityType} = 'issue' and ${activityLog.entityId} = ${issueId} and ${activityLog.runId} = ${heartbeatRuns.id} )`, ), ), ) .orderBy(desc(heartbeatRuns.createdAt)); if (runs.length === 0) return runs; const exhaustionRows = await db .select({ runId: heartbeatRunEvents.runId, message: heartbeatRunEvents.message, }) .from(heartbeatRunEvents) .where( and( inArray(heartbeatRunEvents.runId, runs.map((run) => run.runId)), eq(heartbeatRunEvents.eventType, "lifecycle"), sql`${heartbeatRunEvents.message} like 'Bounded retry exhausted%'`, ), ) .orderBy(asc(heartbeatRunEvents.runId), desc(heartbeatRunEvents.id)); const retryExhaustedReasonByRunId = new Map(); for (const row of exhaustionRows) { if (!row.message || retryExhaustedReasonByRunId.has(row.runId)) continue; retryExhaustedReasonByRunId.set(row.runId, row.message); } return runs.map((run) => ({ ...run, retryExhaustedReason: retryExhaustedReasonByRunId.get(run.runId) ?? null, })); }, issuesForRun: async (runId: string) => { const run = await db .select({ companyId: heartbeatRuns.companyId, contextSnapshot: heartbeatRuns.contextSnapshot, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); if (!run) return []; const fromActivity = await db .selectDistinctOn([issueIdAsText], { issueId: issues.id, identifier: issues.identifier, title: issues.title, status: issues.status, priority: issues.priority, }) .from(activityLog) .innerJoin(issues, eq(activityLog.entityId, issueIdAsText)) .where( and( eq(activityLog.companyId, run.companyId), eq(activityLog.runId, runId), eq(activityLog.entityType, "issue"), isNull(issues.hiddenAt), ), ) .orderBy(issueIdAsText); const context = run.contextSnapshot; const contextIssueId = context && typeof context === "object" && typeof (context as Record).issueId === "string" ? ((context as Record).issueId as string) : null; if (!contextIssueId) return fromActivity; if (fromActivity.some((issue) => issue.issueId === contextIssueId)) return fromActivity; const fromContext = await db .select({ issueId: issues.id, identifier: issues.identifier, title: issues.title, status: issues.status, priority: issues.priority, }) .from(issues) .where( and( eq(issues.companyId, run.companyId), eq(issues.id, contextIssueId), isNull(issues.hiddenAt), ), ) .then((rows) => rows[0] ?? null); if (!fromContext) return fromActivity; return [fromContext, ...fromActivity]; }, create: (data: typeof activityLog.$inferInsert) => db .insert(activityLog) .values(data) .returning() .then((rows) => rows[0]), }; }