forked from farhoodlabs/paperclip
Add blocker relations and dependency wakeups
Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -13,6 +13,7 @@ import {
|
||||
issueAttachments,
|
||||
issueInboxArchives,
|
||||
issueLabels,
|
||||
issueRelations,
|
||||
issueComments,
|
||||
issueDocuments,
|
||||
issueReadStates,
|
||||
@@ -21,6 +22,7 @@ import {
|
||||
projectWorkspaces,
|
||||
projects,
|
||||
} from "@paperclipai/db";
|
||||
import type { IssueRelationIssueSummary } from "@paperclipai/shared";
|
||||
import { extractAgentMentionIds, extractProjectMentionIds, isUuidLike } from "@paperclipai/shared";
|
||||
import { conflict, notFound, unprocessable } from "../errors.js";
|
||||
import {
|
||||
@@ -114,8 +116,13 @@ type ProjectGoalReader = Pick<Db, "select">;
|
||||
type DbReader = Pick<Db, "select">;
|
||||
type IssueCreateInput = Omit<typeof issues.$inferInsert, "companyId"> & {
|
||||
labelIds?: string[];
|
||||
blockedByIssueIds?: string[];
|
||||
inheritExecutionWorkspaceFromIssueId?: string | null;
|
||||
};
|
||||
type IssueRelationSummaryMap = {
|
||||
blockedBy: IssueRelationIssueSummary[];
|
||||
blocks: IssueRelationIssueSummary[];
|
||||
};
|
||||
|
||||
function sameRunLock(checkoutRunId: string | null, actorRunId: string | null) {
|
||||
if (actorRunId) return checkoutRunId === actorRunId;
|
||||
@@ -675,6 +682,177 @@ export function issueService(db: Db) {
|
||||
);
|
||||
}
|
||||
|
||||
async function getIssueRelationSummaryMap(
|
||||
companyId: string,
|
||||
issueIds: string[],
|
||||
dbOrTx: DbReader = db,
|
||||
): Promise<Map<string, IssueRelationSummaryMap>> {
|
||||
const uniqueIssueIds = [...new Set(issueIds)];
|
||||
const empty = new Map<string, IssueRelationSummaryMap>();
|
||||
for (const issueId of uniqueIssueIds) {
|
||||
empty.set(issueId, { blockedBy: [], blocks: [] });
|
||||
}
|
||||
if (uniqueIssueIds.length === 0) return empty;
|
||||
|
||||
const [blockedByRows, blockingRows] = await Promise.all([
|
||||
dbOrTx
|
||||
.select({
|
||||
currentIssueId: issueRelations.relatedIssueId,
|
||||
relatedId: issues.id,
|
||||
identifier: issues.identifier,
|
||||
title: issues.title,
|
||||
status: issues.status,
|
||||
priority: issues.priority,
|
||||
assigneeAgentId: issues.assigneeAgentId,
|
||||
assigneeUserId: issues.assigneeUserId,
|
||||
})
|
||||
.from(issueRelations)
|
||||
.innerJoin(issues, eq(issueRelations.issueId, issues.id))
|
||||
.where(
|
||||
and(
|
||||
eq(issueRelations.companyId, companyId),
|
||||
eq(issueRelations.type, "blocks"),
|
||||
inArray(issueRelations.relatedIssueId, uniqueIssueIds),
|
||||
),
|
||||
),
|
||||
dbOrTx
|
||||
.select({
|
||||
currentIssueId: issueRelations.issueId,
|
||||
relatedId: issues.id,
|
||||
identifier: issues.identifier,
|
||||
title: issues.title,
|
||||
status: issues.status,
|
||||
priority: issues.priority,
|
||||
assigneeAgentId: issues.assigneeAgentId,
|
||||
assigneeUserId: issues.assigneeUserId,
|
||||
})
|
||||
.from(issueRelations)
|
||||
.innerJoin(issues, eq(issueRelations.relatedIssueId, issues.id))
|
||||
.where(
|
||||
and(
|
||||
eq(issueRelations.companyId, companyId),
|
||||
eq(issueRelations.type, "blocks"),
|
||||
inArray(issueRelations.issueId, uniqueIssueIds),
|
||||
),
|
||||
),
|
||||
]);
|
||||
|
||||
for (const row of blockedByRows) {
|
||||
empty.get(row.currentIssueId)?.blockedBy.push({
|
||||
id: row.relatedId,
|
||||
identifier: row.identifier,
|
||||
title: row.title,
|
||||
status: row.status as IssueRelationIssueSummary["status"],
|
||||
priority: row.priority as IssueRelationIssueSummary["priority"],
|
||||
assigneeAgentId: row.assigneeAgentId,
|
||||
assigneeUserId: row.assigneeUserId,
|
||||
});
|
||||
}
|
||||
for (const row of blockingRows) {
|
||||
empty.get(row.currentIssueId)?.blocks.push({
|
||||
id: row.relatedId,
|
||||
identifier: row.identifier,
|
||||
title: row.title,
|
||||
status: row.status as IssueRelationIssueSummary["status"],
|
||||
priority: row.priority as IssueRelationIssueSummary["priority"],
|
||||
assigneeAgentId: row.assigneeAgentId,
|
||||
assigneeUserId: row.assigneeUserId,
|
||||
});
|
||||
}
|
||||
|
||||
for (const relations of empty.values()) {
|
||||
relations.blockedBy.sort((a, b) => a.title.localeCompare(b.title));
|
||||
relations.blocks.sort((a, b) => a.title.localeCompare(b.title));
|
||||
}
|
||||
|
||||
return empty;
|
||||
}
|
||||
|
||||
async function assertNoBlockingCycles(
|
||||
companyId: string,
|
||||
issueId: string,
|
||||
blockerIssueIds: string[],
|
||||
dbOrTx: DbReader = db,
|
||||
) {
|
||||
if (blockerIssueIds.length === 0) return;
|
||||
|
||||
const rows = await dbOrTx
|
||||
.select({
|
||||
blockerIssueId: issueRelations.issueId,
|
||||
blockedIssueId: issueRelations.relatedIssueId,
|
||||
})
|
||||
.from(issueRelations)
|
||||
.where(and(eq(issueRelations.companyId, companyId), eq(issueRelations.type, "blocks")));
|
||||
|
||||
const adjacency = new Map<string, string[]>();
|
||||
for (const row of rows) {
|
||||
const list = adjacency.get(row.blockerIssueId) ?? [];
|
||||
list.push(row.blockedIssueId);
|
||||
adjacency.set(row.blockerIssueId, list);
|
||||
}
|
||||
|
||||
for (const blockerIssueId of blockerIssueIds) {
|
||||
const queue = [...(adjacency.get(issueId) ?? [])];
|
||||
const visited = new Set<string>([issueId]);
|
||||
while (queue.length > 0) {
|
||||
const current = queue.shift()!;
|
||||
if (current === blockerIssueId) {
|
||||
throw unprocessable("Blocking relations cannot contain cycles");
|
||||
}
|
||||
if (visited.has(current)) continue;
|
||||
visited.add(current);
|
||||
queue.push(...(adjacency.get(current) ?? []));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function syncBlockedByIssueIds(
|
||||
issueId: string,
|
||||
companyId: string,
|
||||
blockedByIssueIds: string[],
|
||||
actor: { agentId?: string | null; userId?: string | null } = {},
|
||||
dbOrTx: any = db,
|
||||
) {
|
||||
const deduped = [...new Set(blockedByIssueIds)];
|
||||
if (deduped.some((candidate) => candidate === issueId)) {
|
||||
throw unprocessable("Issue cannot be blocked by itself");
|
||||
}
|
||||
|
||||
if (deduped.length > 0) {
|
||||
const relatedIssues = await dbOrTx
|
||||
.select({ id: issues.id })
|
||||
.from(issues)
|
||||
.where(and(eq(issues.companyId, companyId), inArray(issues.id, deduped)));
|
||||
if (relatedIssues.length !== deduped.length) {
|
||||
throw unprocessable("Blocked-by issues must belong to the same company");
|
||||
}
|
||||
await assertNoBlockingCycles(companyId, issueId, deduped, dbOrTx);
|
||||
}
|
||||
|
||||
await dbOrTx
|
||||
.delete(issueRelations)
|
||||
.where(
|
||||
and(
|
||||
eq(issueRelations.companyId, companyId),
|
||||
eq(issueRelations.relatedIssueId, issueId),
|
||||
eq(issueRelations.type, "blocks"),
|
||||
),
|
||||
);
|
||||
|
||||
if (deduped.length === 0) return;
|
||||
|
||||
await dbOrTx.insert(issueRelations).values(
|
||||
deduped.map((blockerIssueId) => ({
|
||||
companyId,
|
||||
issueId: blockerIssueId,
|
||||
relatedIssueId: issueId,
|
||||
type: "blocks",
|
||||
createdByAgentId: actor.agentId ?? null,
|
||||
createdByUserId: actor.userId ?? null,
|
||||
})),
|
||||
);
|
||||
}
|
||||
|
||||
async function isTerminalOrMissingHeartbeatRun(runId: string) {
|
||||
const run = await db
|
||||
.select({ status: heartbeatRuns.status })
|
||||
@@ -1076,11 +1254,125 @@ export function issueService(db: Db) {
|
||||
return getIssueByIdentifier(identifier);
|
||||
},
|
||||
|
||||
getRelationSummaries: async (issueId: string) => {
|
||||
const issue = await db
|
||||
.select({ id: issues.id, companyId: issues.companyId })
|
||||
.from(issues)
|
||||
.where(eq(issues.id, issueId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!issue) throw notFound("Issue not found");
|
||||
const relations = await getIssueRelationSummaryMap(issue.companyId, [issueId], db);
|
||||
return relations.get(issueId) ?? { blockedBy: [], blocks: [] };
|
||||
},
|
||||
|
||||
listWakeableBlockedDependents: async (blockerIssueId: string) => {
|
||||
const blockerIssue = await db
|
||||
.select({ id: issues.id, companyId: issues.companyId })
|
||||
.from(issues)
|
||||
.where(eq(issues.id, blockerIssueId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!blockerIssue) return [];
|
||||
|
||||
const candidates = await db
|
||||
.select({
|
||||
id: issues.id,
|
||||
assigneeAgentId: issues.assigneeAgentId,
|
||||
status: issues.status,
|
||||
})
|
||||
.from(issueRelations)
|
||||
.innerJoin(issues, eq(issueRelations.relatedIssueId, issues.id))
|
||||
.where(
|
||||
and(
|
||||
eq(issueRelations.companyId, blockerIssue.companyId),
|
||||
eq(issueRelations.type, "blocks"),
|
||||
eq(issueRelations.issueId, blockerIssueId),
|
||||
),
|
||||
);
|
||||
if (candidates.length === 0) return [];
|
||||
|
||||
const candidateIds = candidates.map((candidate) => candidate.id);
|
||||
const blockerRows = await db
|
||||
.select({
|
||||
issueId: issueRelations.relatedIssueId,
|
||||
blockerIssueId: issueRelations.issueId,
|
||||
blockerStatus: issues.status,
|
||||
})
|
||||
.from(issueRelations)
|
||||
.innerJoin(issues, eq(issueRelations.issueId, issues.id))
|
||||
.where(
|
||||
and(
|
||||
eq(issueRelations.companyId, blockerIssue.companyId),
|
||||
eq(issueRelations.type, "blocks"),
|
||||
inArray(issueRelations.relatedIssueId, candidateIds),
|
||||
),
|
||||
);
|
||||
|
||||
const blockersByIssueId = new Map<string, Array<{ blockerIssueId: string; blockerStatus: string }>>();
|
||||
for (const row of blockerRows) {
|
||||
const list = blockersByIssueId.get(row.issueId) ?? [];
|
||||
list.push({ blockerIssueId: row.blockerIssueId, blockerStatus: row.blockerStatus });
|
||||
blockersByIssueId.set(row.issueId, list);
|
||||
}
|
||||
|
||||
return candidates
|
||||
.filter((candidate) => candidate.assigneeAgentId && !["backlog", "done", "cancelled"].includes(candidate.status))
|
||||
.map((candidate) => {
|
||||
const blockers = blockersByIssueId.get(candidate.id) ?? [];
|
||||
return {
|
||||
...candidate,
|
||||
blockerIssueIds: blockers.map((blocker) => blocker.blockerIssueId),
|
||||
allBlockersDone: blockers.length > 0 && blockers.every((blocker) => blocker.blockerStatus === "done"),
|
||||
};
|
||||
})
|
||||
.filter((candidate) => candidate.allBlockersDone)
|
||||
.map((candidate) => ({
|
||||
id: candidate.id,
|
||||
assigneeAgentId: candidate.assigneeAgentId!,
|
||||
blockerIssueIds: candidate.blockerIssueIds,
|
||||
}));
|
||||
},
|
||||
|
||||
getWakeableParentAfterChildCompletion: async (parentIssueId: string) => {
|
||||
const parent = await db
|
||||
.select({
|
||||
id: issues.id,
|
||||
assigneeAgentId: issues.assigneeAgentId,
|
||||
status: issues.status,
|
||||
companyId: issues.companyId,
|
||||
})
|
||||
.from(issues)
|
||||
.where(eq(issues.id, parentIssueId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!parent || !parent.assigneeAgentId || ["backlog", "done", "cancelled"].includes(parent.status)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const children = await db
|
||||
.select({ id: issues.id, status: issues.status })
|
||||
.from(issues)
|
||||
.where(and(eq(issues.companyId, parent.companyId), eq(issues.parentId, parentIssueId)));
|
||||
if (children.length === 0) return null;
|
||||
if (!children.every((child) => child.status === "done" || child.status === "cancelled")) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
id: parent.id,
|
||||
assigneeAgentId: parent.assigneeAgentId,
|
||||
childIssueIds: children.map((child) => child.id),
|
||||
};
|
||||
},
|
||||
|
||||
create: async (
|
||||
companyId: string,
|
||||
data: IssueCreateInput,
|
||||
) => {
|
||||
const { labelIds: inputLabelIds, inheritExecutionWorkspaceFromIssueId, ...issueData } = data;
|
||||
const {
|
||||
labelIds: inputLabelIds,
|
||||
blockedByIssueIds,
|
||||
inheritExecutionWorkspaceFromIssueId,
|
||||
...issueData
|
||||
} = data;
|
||||
const isolatedWorkspacesEnabled = (await instanceSettings.getExperimental()).enableIsolatedWorkspaces;
|
||||
if (!isolatedWorkspacesEnabled) {
|
||||
delete issueData.executionWorkspaceId;
|
||||
@@ -1223,12 +1515,32 @@ export function issueService(db: Db) {
|
||||
if (inputLabelIds) {
|
||||
await syncIssueLabels(issue.id, companyId, inputLabelIds, tx);
|
||||
}
|
||||
if (blockedByIssueIds !== undefined) {
|
||||
await syncBlockedByIssueIds(
|
||||
issue.id,
|
||||
companyId,
|
||||
blockedByIssueIds,
|
||||
{
|
||||
agentId: issueData.createdByAgentId ?? null,
|
||||
userId: issueData.createdByUserId ?? null,
|
||||
},
|
||||
tx,
|
||||
);
|
||||
}
|
||||
const [enriched] = await withIssueLabels(tx, [issue]);
|
||||
return enriched;
|
||||
});
|
||||
},
|
||||
|
||||
update: async (id: string, data: Partial<typeof issues.$inferInsert> & { labelIds?: string[] }) => {
|
||||
update: async (
|
||||
id: string,
|
||||
data: Partial<typeof issues.$inferInsert> & {
|
||||
labelIds?: string[];
|
||||
blockedByIssueIds?: string[];
|
||||
actorAgentId?: string | null;
|
||||
actorUserId?: string | null;
|
||||
},
|
||||
) => {
|
||||
const existing = await db
|
||||
.select()
|
||||
.from(issues)
|
||||
@@ -1236,7 +1548,13 @@ export function issueService(db: Db) {
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!existing) return null;
|
||||
|
||||
const { labelIds: nextLabelIds, ...issueData } = data;
|
||||
const {
|
||||
labelIds: nextLabelIds,
|
||||
blockedByIssueIds,
|
||||
actorAgentId,
|
||||
actorUserId,
|
||||
...issueData
|
||||
} = data;
|
||||
const isolatedWorkspacesEnabled = (await instanceSettings.getExperimental()).enableIsolatedWorkspaces;
|
||||
if (!isolatedWorkspacesEnabled) {
|
||||
delete issueData.executionWorkspaceId;
|
||||
@@ -1328,6 +1646,18 @@ export function issueService(db: Db) {
|
||||
if (nextLabelIds !== undefined) {
|
||||
await syncIssueLabels(updated.id, existing.companyId, nextLabelIds, tx);
|
||||
}
|
||||
if (blockedByIssueIds !== undefined) {
|
||||
await syncBlockedByIssueIds(
|
||||
updated.id,
|
||||
existing.companyId,
|
||||
blockedByIssueIds,
|
||||
{
|
||||
agentId: actorAgentId ?? null,
|
||||
userId: actorUserId ?? null,
|
||||
},
|
||||
tx,
|
||||
);
|
||||
}
|
||||
const [enriched] = await withIssueLabels(tx, [updated]);
|
||||
return enriched;
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user