diff --git a/apps/api/package.json b/apps/api/package.json index a7c8876..8c9b6be 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -24,8 +24,7 @@ "nodemailer": "^6.9.16", "stripe": "^22.0.0", "telnyx": "^1.23.0", - "uuid": "^11.0.5", - + "uuid": "^11.1.1", "zod": "^4.3.6" }, "devDependencies": { diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 1ed08f2..4dfdb8c 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -29,6 +29,7 @@ import { devRouter } from "./routes/dev.js"; import { adminSeedRouter } from "./routes/admin/seed.js"; import { startReminderScheduler } from "./services/reminders.js"; import { webhooksRouter } from "./routes/stripe-webhooks.js"; +import { telnyxWebhooksRouter } from "./routes/webhooks/telnyx.js"; const app = new Hono(); @@ -69,6 +70,9 @@ app.route("/api/portal", portalRouter); // Public Stripe webhook endpoint — signature-verified, no auth required app.route("/api/webhooks/stripe", webhooksRouter); +// Public Telnyx messaging webhook — signature-verified, no auth required +app.route("/api/webhooks/telnyx", telnyxWebhooksRouter); + // Dev/demo routes — config is always public, users endpoint is guarded internally app.route("/api/dev", devRouter); diff --git a/apps/api/src/routes/webhooks/telnyx.ts b/apps/api/src/routes/webhooks/telnyx.ts new file mode 100644 index 0000000..369461d --- /dev/null +++ b/apps/api/src/routes/webhooks/telnyx.ts @@ -0,0 +1,59 @@ +import { Hono } from "hono"; +import { validateTelnyxSignature } from "../../services/sms.js"; +import { + handleMessageReceived, + handleMessageFinalized, + TelnyxMessageReceivedPayload, +} from "../../services/messaging/inbound.js"; + +export const telnyxWebhooksRouter = new Hono(); + +telnyxWebhooksRouter.post("/messaging", async (c) => { + const signature = c.req.header("telnyx-signature"); + + let rawBody: string; + try { + rawBody = await c.req.text(); + } catch { + return c.json({ error: "Could not read body" }, 400); + } + + if (!validateTelnyxSignature(rawBody, signature)) { + return c.json({ error: "Invalid signature" }, 401); + } + + let payload: TelnyxMessageReceivedPayload; + try { + payload = JSON.parse(rawBody) as TelnyxMessageReceivedPayload; + } catch { + return c.json({ error: "Invalid JSON" }, 400); + } + + const eventType = payload.data?.event_type; + if (!eventType) { + return c.json({ error: "Missing event_type" }, 400); + } + + if (eventType === "message.received") { + try { + await handleMessageReceived(payload); + } catch (err) { + const msg = err instanceof Error ? err.message : "Unknown error"; + if (msg.startsWith("No business owns")) { + return c.json({ error: "Unknown messaging number" }, 404); + } + return c.json({ error: msg }, 500); + } + return c.json({ received: true }); + } + + if (eventType === "message.finalized") { + const result = await handleMessageFinalized(payload); + if (result) { + return c.json({ received: true, messageId: result.messageId, status: result.newStatus }); + } + return c.json({ received: true, messageId: null }); + } + + return c.json({ received: true }); +}); diff --git a/apps/api/src/services/messaging/__tests__/inbound.test.ts b/apps/api/src/services/messaging/__tests__/inbound.test.ts new file mode 100644 index 0000000..b905824 --- /dev/null +++ b/apps/api/src/services/messaging/__tests__/inbound.test.ts @@ -0,0 +1,313 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { + findOrCreateConversation, + upsertMessage, + handleMessageReceived, + handleMessageFinalized, + TelnyxMessageReceivedPayload, +} from "../inbound.js"; +import * as schema from "@groombook/db"; + +vi.mock("@groombook/db", () => ({ + getDb: vi.fn(), + conversations: { id: "", businessId: "", clientId: "", externalNumber: "", businessNumber: "", channel: "", lastMessageAt: null, status: "", createdAt: null, updatedAt: null }, + messages: { id: "", conversationId: "", direction: "", body: "", status: "", providerMessageId: "", sentByStaffId: null, createdAt: null, deliveredAt: null, readByClientAt: null }, + businessSettings: { id: "", messagingPhoneNumber: "" }, + clients: { id: "", name: "", email: "", phone: "", status: "" }, + eq: vi.fn(), + and: vi.fn(), + sql: vi.fn(), +})); + +const mockDb = { + select: vi.fn().mockReturnThis(), + from: vi.fn().mockReturnThis(), + where: vi.fn().mockReturnThis(), + limit: vi.fn().mockReturnThis(), + insert: vi.fn().mockReturnThis(), + update: vi.fn().mockReturnThis(), + returning: vi.fn().mockReturnThis(), +}; + +vi.mocked(schema.getDb).mockReturnValue(mockDb as unknown as ReturnType); + +const makePayload = ( + eventType: "message.received" | "message.sent" | "message.finalized", + messageId: string, + fromPhone: string, + toPhone: string, + body = "Hello" +): TelnyxMessageReceivedPayload => ({ + data: { + id: "evt-1", + event_type: eventType, + payload: { + message: { + id: messageId, + from: { phone: fromPhone, carrier: "carrier" }, + to: [{ phone: toPhone }], + body, + }, + }, + }, +}); + +describe("signature validation via route", () => { + beforeEach(() => { + vi.resetModules(); + }); + + it("returns 401 when telnyx-signature header is missing", async () => { + const { telnyxWebhooksRouter } = await import("../../../routes/webhooks/telnyx.js"); + const payload = JSON.stringify(makePayload("message.received", "msg-123", "+1555111", "+1555222")); + const req = new Request("http://localhost/messaging", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: payload, + }); + const res = await telnyxWebhooksRouter.fetch(req); + expect(res.status).toBe(401); + }); + + it("returns 401 when signature does not match", async () => { + process.env.TELNYX_WEBHOOK_SECRET = "test-secret"; + const { telnyxWebhooksRouter } = await import("../../../routes/webhooks/telnyx.js"); + const payload = JSON.stringify(makePayload("message.received", "msg-123", "+1555111", "+1555222")); + const req = new Request("http://localhost/messaging", { + method: "POST", + headers: { + "Content-Type": "application/json", + "telnyx-signature": "sha256=bad", + }, + body: payload, + }); + const res = await telnyxWebhooksRouter.fetch(req); + expect(res.status).toBe(401); + }); +}); + +describe("findOrCreateConversation", () => { + beforeEach(() => { + vi.clearAllMocks(); + mockDb.select.mockReset(); + mockDb.from.mockReset(); + mockDb.where.mockReset(); + mockDb.limit.mockReset(); + mockDb.insert.mockReset(); + mockDb.update.mockReset(); + mockDb.returning.mockReset(); + }); + + it("returns existing conversation when found", async () => { + mockDb.select.mockReturnValue({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue([{ id: "conv-1", clientId: "client-1" }]), + }), + }), + }); + + const result = await findOrCreateConversation("biz-1", "+1555111", "+1555222"); + expect(result.id).toBe("conv-1"); + }); + + it("creates new conversation when none exists", async () => { + mockDb.select.mockReturnValue({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue([]), + }), + }), + }); + mockDb.insert.mockReturnValue({ + values: vi.fn().mockReturnValue({ + returning: vi.fn().mockReturnValue([{ id: "conv-2", clientId: "client-2" }]), + }), + }); + + const result = await findOrCreateConversation("biz-1", "+1555111", "+1555222"); + expect(result.id).toBe("conv-2"); + }); + + it("creates placeholder client for unknown phone then creates conversation", async () => { + mockDb.select + .mockReturnValueOnce({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue([]), + }), + }), + }) + .mockReturnValueOnce({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue([]), + }), + }), + }); + mockDb.insert.mockReturnValue({ + values: vi.fn().mockReturnValue({ + returning: vi.fn().mockReturnValue([{ id: "conv-3", clientId: "client-3" }]), + }), + }); + + const result = await findOrCreateConversation("biz-1", "+1555111", "+1555222"); + expect(result.id).toBe("conv-3"); + expect(result.clientId).toBe("client-3"); + }); +}); + +describe("upsertMessage", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("returns isNew=false when message with providerMessageId already exists", async () => { + mockDb.select.mockReturnValue({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue([{ id: "msg-existing" }]), + }), + }), + }); + + const result = await upsertMessage("msg-123", "conv-1", "inbound", "Hello", "received"); + expect(result.isNew).toBe(false); + expect(result.id).toBe("msg-existing"); + }); + + it("inserts new message and returns isNew=true", async () => { + mockDb.select.mockReturnValue({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue([]), + }), + }), + }); + mockDb.insert.mockReturnValue({ + values: vi.fn().mockReturnValue({ + returning: vi.fn().mockReturnValue([{ id: "msg-new" }]), + }), + }); + + const result = await upsertMessage("msg-new-123", "conv-1", "inbound", "New message", "queued"); + expect(result.isNew).toBe(true); + expect(result.id).toBe("msg-new"); + }); +}); + +describe("handleMessageReceived", () => { + beforeEach(() => { + vi.clearAllMocks(); + mockDb.select.mockReset(); + mockDb.from.mockReset(); + mockDb.where.mockReset(); + mockDb.limit.mockReset(); + mockDb.insert.mockReset(); + mockDb.update.mockReset(); + mockDb.returning.mockReset(); + mockDb.select.mockImplementation(() => ({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue([]), + }), + }), + })); + }); + + it("returns 404 when no business owns the to number", async () => { + const payload = makePayload("message.received", "msg-123", "+1555111", "+1555000"); + await expect(handleMessageReceived(payload)).rejects.toThrow("No business owns messaging number"); + }); + + it("creates conversation and message for valid inbound", async () => { + mockDb.select + .mockReturnValueOnce({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue([{ id: "biz-1" }]), + }), + }), + }) + .mockReturnValueOnce({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue([]), + }), + }), + }); + mockDb.insert + .mockReturnValueOnce({ + values: vi.fn().mockReturnValue({ + returning: vi.fn().mockReturnValue([{ id: "client-new" }]), + }), + }) + .mockReturnValueOnce({ + values: vi.fn().mockReturnValue({ + returning: vi.fn().mockReturnValue([{ id: "conv-new", clientId: "client-new" }]), + }), + }); + mockDb.update.mockReturnValueOnce({ + set: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({}), + }), + }); + mockDb.insert.mockReturnValueOnce({ + values: vi.fn().mockReturnValue({ + returning: vi.fn().mockReturnValue([{ id: "msg-new" }]), + }), + }); + + const payload = makePayload("message.received", "msg-abc", "+1555111", "+1555222", "Test message"); + const result = await handleMessageReceived(payload); + expect(result.messageId).toBe("msg-new"); + }); +}); + +describe("handleMessageFinalized", () => { + beforeEach(() => { + vi.clearAllMocks(); + mockDb.select.mockReset(); + mockDb.from.mockReset(); + mockDb.where.mockReset(); + mockDb.limit.mockReset(); + mockDb.insert.mockReset(); + mockDb.update.mockReset(); + mockDb.returning.mockReset(); + }); + + it("returns null when message not found", async () => { + mockDb.select.mockReturnValue({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue([]), + }), + }), + }); + + const payload = makePayload("message.finalized", "msg-unknown", "+1555111", "+1555222"); + const result = await handleMessageFinalized(payload); + expect(result).toBeNull(); + }); + + it("updates status to delivered for finalized inbound", async () => { + mockDb.select.mockReturnValue({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue([{ id: "msg-1", status: "sent" }]), + }), + }), + }); + mockDb.update.mockReturnValue({ + set: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + returning: vi.fn().mockReturnValue([{ id: "msg-1" }]), + }), + }), + }); + + const payload = makePayload("message.finalized", "msg-1", "+1555111", "+1555222"); + const result = await handleMessageFinalized(payload); + expect(result?.newStatus).toBe("delivered"); + }); +}); diff --git a/apps/api/src/services/messaging/inbound.ts b/apps/api/src/services/messaging/inbound.ts new file mode 100644 index 0000000..de9d0e4 --- /dev/null +++ b/apps/api/src/services/messaging/inbound.ts @@ -0,0 +1,200 @@ +import { getDb, conversations, messages, businessSettings, clients, eq, and } from "@groombook/db"; +import { v4 as uuidv4 } from "uuid"; + +export interface TelnyxMessageReceivedPayload { + data: { + id: string; + event_type: "message.received" | "message.sent" | "message.finalized"; + payload: { + message: { + id: string; + from: { phone: string; carrier?: string }; + to: { phone: string }[]; + body: string; + media?: Array<{ type: string; url: string }>; + }; + recording?: unknown; + leg_count?: number; + }; + }; +} + +export async function findOrCreateConversation( + businessId: string, + clientPhone: string, + businessNumber: string +): Promise<{ id: string; clientId: string }> { + const db = getDb(); + + const [existing] = await db + .select({ id: conversations.id, clientId: conversations.clientId }) + .from(conversations) + .where( + and( + eq(conversations.businessId, businessId), + eq(conversations.externalNumber, clientPhone), + eq(conversations.businessNumber, businessNumber) + ) + ) + .limit(1); + + if (existing) { + return { id: existing.id, clientId: existing.clientId }; + } + + const [existingClient] = await db + .select({ id: clients.id }) + .from(clients) + .where(eq(clients.phone, clientPhone)) + .limit(1); + + const clientId = existingClient?.id ?? uuidv4(); + + if (!existingClient) { + await db.insert(clients).values({ + id: clientId, + name: clientPhone, + email: `sms-${uuidv4()}@placeholder.local`, + phone: clientPhone, + status: "active", + }); + } + + const [created] = await db + .insert(conversations) + .values({ + id: crypto.randomUUID(), + businessId, + clientId, + channel: "sms", + externalNumber: clientPhone, + businessNumber, + lastMessageAt: new Date(), + status: "active", + }) + .returning({ id: conversations.id, clientId: conversations.clientId }); + + if (!created) throw new Error("Failed to create conversation"); + + return { id: created.id, clientId: created.clientId }; +} + +export async function upsertMessage( + providerMessageId: string, + conversationId: string, + direction: "inbound" | "outbound", + body: string, + status: "queued" | "sent" | "delivered" | "failed" | "received", + sentByStaffId?: string +): Promise<{ id: string; isNew: boolean }> { + const db = getDb(); + + const [existing] = await db + .select({ id: messages.id }) + .from(messages) + .where(eq(messages.providerMessageId, providerMessageId)) + .limit(1); + + if (existing) { + return { id: existing.id, isNew: false }; + } + + try { + const [inserted] = await db + .insert(messages) + .values({ + id: crypto.randomUUID(), + conversationId, + direction, + body, + status, + providerMessageId, + sentByStaffId: sentByStaffId ?? null, + }) + .returning({ id: messages.id }); + + if (!inserted) throw new Error("Failed to insert message"); + return { id: inserted.id, isNew: true }; + } catch (err) { + if (err instanceof Error && err.message.includes("unique")) { + const [existing] = await db + .select({ id: messages.id }) + .from(messages) + .where(eq(messages.providerMessageId, providerMessageId)) + .limit(1); + if (existing) return { id: existing.id, isNew: false }; + } + throw err; + } +} + +export async function resolveBusinessIdByMessagingNumber(toNumber: string): Promise { + const db = getDb(); + const [settings] = await db + .select({ id: businessSettings.id }) + .from(businessSettings) + .where(eq(businessSettings.messagingPhoneNumber, toNumber)) + .limit(1); + return settings?.id ?? null; +} + +export async function handleMessageReceived(payload: TelnyxMessageReceivedPayload): Promise<{ conversationId: string; messageId: string }> { + const { message } = payload.data.payload; + const fromPhone = message.from.phone; + const toPhone = message.to[0]?.phone; + + if (!toPhone) { + throw new Error("No recipient phone in payload"); + } + + const businessId = await resolveBusinessIdByMessagingNumber(toPhone); + if (!businessId) { + throw new Error(`No business owns messaging number: ${toPhone}`); + } + + const { id: conversationId } = await findOrCreateConversation(businessId, fromPhone, toPhone); + + await getDb() + .update(conversations) + .set({ lastMessageAt: new Date(), updatedAt: new Date() }) + .where(eq(conversations.id, conversationId)); + + const { id: messageId } = await upsertMessage( + message.id, + conversationId, + "inbound", + message.body, + "received" + ); + + return { conversationId, messageId }; +} + +export async function handleMessageFinalized(payload: TelnyxMessageReceivedPayload): Promise<{ messageId: string; newStatus: string } | null> { + const { message } = payload.data.payload; + + if (!message.id) return null; + + const db = getDb(); + const [existing] = await db + .select({ id: messages.id, status: messages.status }) + .from(messages) + .where(eq(messages.providerMessageId, message.id)) + .limit(1); + + if (!existing) return null; + + let newStatus = existing.status; + if (payload.data.event_type === "message.finalized") { + newStatus = "delivered"; + } + + if (newStatus !== existing.status) { + await db + .update(messages) + .set({ status: newStatus, deliveredAt: new Date() }) + .where(eq(messages.id, existing.id)); + } + + return { messageId: existing.id, newStatus }; +} diff --git a/apps/api/src/services/sms.ts b/apps/api/src/services/sms.ts index 5be4009..209e5e2 100644 --- a/apps/api/src/services/sms.ts +++ b/apps/api/src/services/sms.ts @@ -32,6 +32,35 @@ function isE164(phone: string): boolean { return /^\+[1-9]\d{7,14}$/.test(phone); } +export function validateTelnyxSignature( + rawBody: string, + signature: string | undefined | null +): boolean { + if (!signature) return false; + const secret = process.env.TELNYX_WEBHOOK_SECRET; + if (!secret) return false; + + try { + const hmac = createHmac("sha256", secret); + const expected = `sha256=${hmac.update(rawBody).digest("hex")}`; + + const sigBuf = Buffer.from(signature); + const expBuf = Buffer.from(expected); + + if (sigBuf.length !== expBuf.length) return false; + + let diff = 0; + for (let i = 0; i < sigBuf.length; i++) { + const sigByte = sigBuf[i] ?? 0; + const expByte = expBuf[i] ?? 0; + diff |= sigByte ^ expByte; + } + return diff === 0; + } catch { + return false; + } +} + export async function sendSms( to: string, body: string, @@ -74,33 +103,7 @@ export class TelnyxProvider implements SmsProvider { } validateWebhookSignature(req: Request): boolean { - const secret = process.env.TELNYX_WEBHOOK_SECRET; - if (!secret) return false; - - const signature = req.headers.get("telnyx-signature"); - if (!signature) return false; - - const payload = JSON.stringify(req.body); - - try { - const hmac = createHmac("sha256", secret); - const expected = `sha256=${hmac.update(payload).digest("hex")}`; - - const sigBuf = Buffer.from(signature); - const expBuf = Buffer.from(expected); - - if (sigBuf.length !== expBuf.length) return false; - - let diff = 0; - for (let i = 0; i < sigBuf.length; i++) { - const sigByte = sigBuf[i] ?? 0; - const expByte = expBuf[i] ?? 0; - diff |= sigByte ^ expByte; - } - return diff === 0; - } catch { - return false; - } + return validateTelnyxSignature(JSON.stringify(req.body), req.headers.get("telnyx-signature")); } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f586e98..3f69c84 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -47,7 +47,7 @@ importers: specifier: ^1.23.0 version: 1.27.0 uuid: - specifier: ^11.0.5 + specifier: ^11.1.1 version: 11.1.1 zod: specifier: ^4.3.6