From f36429936a91333a480ab3f42af3cbc2ddf1b220 Mon Sep 17 00:00:00 2001 From: Barcode Betty Date: Fri, 3 Apr 2026 07:41:00 +0000 Subject: [PATCH] sync(receiptwitness): copy latest standalone code to monorepo Syncs receiptwitness standalone repo code into monorepo subdirectory. Includes email parsing, notifications, queue, and worker modules. Keeps monorepo Dockerfile (uses local common/). Co-Authored-By: Paperclip --- receiptwitness/pyproject.toml | 5 + .../src/receiptwitness/api/routes.py | 54 +++- receiptwitness/src/receiptwitness/config.py | 8 + receiptwitness/src/receiptwitness/events.py | 54 +++- .../receiptwitness/notifications/__init__.py | 0 .../src/receiptwitness/notifications/email.py | 45 +++ .../receiptwitness/parsers/email/__init__.py | 1 + .../src/receiptwitness/parsers/email/base.py | 32 +++ .../receiptwitness/parsers/email/detector.py | 25 ++ .../receiptwitness/parsers/email/kroger.py | 157 +++++++++++ .../receiptwitness/parsers/email/meijer.py | 259 ++++++++++++++++++ .../receiptwitness/parsers/email/target.py | 156 +++++++++++ .../src/receiptwitness/queue/__init__.py | 1 + .../src/receiptwitness/queue/email.py | 77 ++++++ .../src/receiptwitness/worker/__init__.py | 1 + .../src/receiptwitness/worker/email_worker.py | 104 +++++++ .../tests/fixtures/kroger_email_receipt.html | 45 +++ .../tests/fixtures/meijer_email_receipt.html | 127 +++++++++ .../tests/fixtures/target_email_receipt.html | 44 +++ receiptwitness/tests/test_api/__init__.py | 1 + receiptwitness/tests/test_api/test_webhook.py | 101 +++++++ .../tests/test_notifications/__init__.py | 0 .../tests/test_notifications/test_email.py | 84 ++++++ .../tests/test_parsers/test_email/__init__.py | 0 .../test_parsers/test_email/test_detector.py | 49 ++++ .../test_email/test_kroger_email_parser.py | 93 +++++++ .../test_email/test_meijer_parser.py | 182 ++++++++++++ .../test_email/test_target_email_parser.py | 93 +++++++ receiptwitness/tests/test_queue/__init__.py | 0 .../tests/test_queue/test_email_queue.py | 79 ++++++ receiptwitness/tests/test_worker/__init__.py | 0 .../tests/test_worker/test_email_worker.py | 188 +++++++++++++ 32 files changed, 2056 insertions(+), 9 deletions(-) create mode 100644 receiptwitness/src/receiptwitness/notifications/__init__.py create mode 100644 receiptwitness/src/receiptwitness/notifications/email.py create mode 100644 receiptwitness/src/receiptwitness/parsers/email/__init__.py create mode 100644 receiptwitness/src/receiptwitness/parsers/email/base.py create mode 100644 receiptwitness/src/receiptwitness/parsers/email/detector.py create mode 100644 receiptwitness/src/receiptwitness/parsers/email/kroger.py create mode 100644 receiptwitness/src/receiptwitness/parsers/email/meijer.py create mode 100644 receiptwitness/src/receiptwitness/parsers/email/target.py create mode 100644 receiptwitness/src/receiptwitness/queue/__init__.py create mode 100644 receiptwitness/src/receiptwitness/queue/email.py create mode 100644 receiptwitness/src/receiptwitness/worker/__init__.py create mode 100644 receiptwitness/src/receiptwitness/worker/email_worker.py create mode 100644 receiptwitness/tests/fixtures/kroger_email_receipt.html create mode 100644 receiptwitness/tests/fixtures/meijer_email_receipt.html create mode 100644 receiptwitness/tests/fixtures/target_email_receipt.html create mode 100644 receiptwitness/tests/test_api/__init__.py create mode 100644 receiptwitness/tests/test_api/test_webhook.py create mode 100644 receiptwitness/tests/test_notifications/__init__.py create mode 100644 receiptwitness/tests/test_notifications/test_email.py create mode 100644 receiptwitness/tests/test_parsers/test_email/__init__.py create mode 100644 receiptwitness/tests/test_parsers/test_email/test_detector.py create mode 100644 receiptwitness/tests/test_parsers/test_email/test_kroger_email_parser.py create mode 100644 receiptwitness/tests/test_parsers/test_email/test_meijer_parser.py create mode 100644 receiptwitness/tests/test_parsers/test_email/test_target_email_parser.py create mode 100644 receiptwitness/tests/test_queue/__init__.py create mode 100644 receiptwitness/tests/test_queue/test_email_queue.py create mode 100644 receiptwitness/tests/test_worker/__init__.py create mode 100644 receiptwitness/tests/test_worker/test_email_worker.py diff --git a/receiptwitness/pyproject.toml b/receiptwitness/pyproject.toml index f32acfc..dd3d6ea 100644 --- a/receiptwitness/pyproject.toml +++ b/receiptwitness/pyproject.toml @@ -14,11 +14,13 @@ dependencies = [ "cryptography>=42.0,<44.0", "fastapi>=0.115,<1.0", "uvicorn[standard]>=0.30,<1.0", + "beautifulsoup4>=4.12,<5.0", "redis>=5.0,<6.0", "pydantic>=2.0,<3.0", "pydantic-settings>=2.0,<3.0", "sqlalchemy[asyncio]>=2.0,<3.0", "asyncpg>=0.29,<1.0", + "resend>=2.0", ] [project.optional-dependencies] @@ -27,6 +29,9 @@ dev = [ "pytest-asyncio>=0.23", "ruff>=0.3", "pytest-cov>=5.0", + "fakeredis[aioredis]>=2.20", + "httpx>=0.27", + "python-multipart>=0.0.9", ] [tool.hatch.build.targets.wheel] diff --git a/receiptwitness/src/receiptwitness/api/routes.py b/receiptwitness/src/receiptwitness/api/routes.py index 23cc109..483cdcc 100644 --- a/receiptwitness/src/receiptwitness/api/routes.py +++ b/receiptwitness/src/receiptwitness/api/routes.py @@ -1,9 +1,61 @@ """Internal API routes for triggering scrapes and checking status.""" -from fastapi import APIRouter +import hashlib +import hmac +import re +import time + +from fastapi import APIRouter, HTTPException, Request + +from receiptwitness.config import settings +from receiptwitness.queue.email import EmailJob, enqueue_email, get_redis router = APIRouter() +TOKEN_PATTERN = re.compile(r"receipts\+([A-Za-z0-9_-]+)@") + + +def verify_mailgun_signature(token: str, timestamp: str, signature: str) -> bool: + """Verify Mailgun webhook signature.""" + if abs(time.time() - int(timestamp)) > 300: # 5 min freshness + return False + key = settings.mailgun_webhook_signing_key.encode() + hmac_digest = hmac.new(key, f"{timestamp}{token}".encode(), hashlib.sha256).hexdigest() + return hmac.compare_digest(signature, hmac_digest) + + +@router.post("/inbound/email") +async def receive_inbound_email(request: Request): + form = await request.form() + # 1. Verify Mailgun signature + token = str(form.get("token", "")) + timestamp = str(form.get("timestamp", "")) + signature = str(form.get("signature", "")) + if not verify_mailgun_signature(token, timestamp, signature): + raise HTTPException(status_code=406, detail="Invalid signature") + # 2. Extract account token from recipient + recipient = str(form.get("recipient", "")) + match = TOKEN_PATTERN.search(recipient) + if not match: + raise HTTPException(status_code=406, detail="Invalid recipient") + account_token = match.group(1) + # 3. Enqueue — worker resolves token -> user_id + body_html_val = form.get("body-html") + body_plain_val = form.get("body-plain") + job = EmailJob( + user_id=account_token, + sender=str(form.get("sender", "")), + recipient=recipient, + subject=str(form.get("subject", "")), + body_html=str(body_html_val) if body_html_val is not None else None, + body_plain=str(body_plain_val) if body_plain_val is not None else None, + received_at=str(form.get("timestamp", "")), + message_id=str(form.get("Message-Id", "")), + ) + client = await get_redis() + await enqueue_email(client, job) + return {"status": "queued"} + @router.get("/health") async def health(): diff --git a/receiptwitness/src/receiptwitness/config.py b/receiptwitness/src/receiptwitness/config.py index 1341f3f..358b965 100644 --- a/receiptwitness/src/receiptwitness/config.py +++ b/receiptwitness/src/receiptwitness/config.py @@ -22,5 +22,13 @@ class ReceiptWitnessSettings(BaseSettings): headless: bool = True browser_timeout_ms: int = 60000 + # Email notifications (Resend) + resend_api_key: str = "" + notification_email_from: str = "notifications@cartsnitch.com" + notifications_enabled: bool = False + + # Mailgun inbound email webhook + mailgun_webhook_signing_key: str = "" + settings = ReceiptWitnessSettings() diff --git a/receiptwitness/src/receiptwitness/events.py b/receiptwitness/src/receiptwitness/events.py index 3d75614..a9e6204 100644 --- a/receiptwitness/src/receiptwitness/events.py +++ b/receiptwitness/src/receiptwitness/events.py @@ -2,12 +2,17 @@ import json import logging +import uuid from datetime import UTC, datetime from decimal import Decimal import redis.asyncio as aioredis +from cartsnitch_common.database import get_async_session_factory +from cartsnitch_common.models.user import User +from sqlalchemy import select from receiptwitness.config import settings +from receiptwitness.notifications.email import send_receipt_notification logger = logging.getLogger(__name__) @@ -39,6 +44,36 @@ async def get_redis_client() -> aioredis.Redis: return aioredis.Redis(connection_pool=_get_pool()) +async def _send_notification_for_event(payload: dict) -> None: + """Look up user email and send receipt notification. Silently skips on error.""" + try: + user_uuid = uuid.UUID(payload["user_id"]) + except (ValueError, KeyError): + logger.warning("Invalid user_id in event payload: %s", payload.get("user_id")) + return + + try: + session_factory = get_async_session_factory(settings.database_url) + async with session_factory() as session: + result = await session.execute(select(User.email).where(User.id == user_uuid)) + row = result.scalar_one_or_none() + if not row: + logger.warning("User %s not found for notification", user_uuid) + return + user_email = row + except Exception: + logger.exception("Failed to look up user email for notification") + return + + await send_receipt_notification( + user_email=user_email, + store_name=payload["store_slug"], + item_count=payload["item_count"], + total=payload["total"], + purchase_date=payload["purchase_date"], + ) + + async def publish_receipt_ingested( user_id: str, store_slug: str, @@ -48,18 +83,19 @@ async def publish_receipt_ingested( total: Decimal | float, ) -> None: """Publish a cartsnitch.receipts.ingested event after successful ingestion.""" + payload = { + "user_id": user_id, + "store_slug": store_slug, + "purchase_id": purchase_id, + "purchase_date": purchase_date, + "item_count": item_count, + "total": float(total) if isinstance(total, Decimal) else total, + } event = { "event_type": CHANNEL_RECEIPTS_INGESTED, "timestamp": datetime.now(UTC).isoformat(), "service": "receiptwitness", - "payload": { - "user_id": user_id, - "store_slug": store_slug, - "purchase_id": purchase_id, - "purchase_date": purchase_date, - "item_count": item_count, - "total": float(total) if isinstance(total, Decimal) else total, - }, + "payload": payload, } try: @@ -73,3 +109,5 @@ async def publish_receipt_ingested( except aioredis.ConnectionError: logger.error("Failed to publish event — Redis/DragonflyDB connection error") raise + else: + await _send_notification_for_event(payload) diff --git a/receiptwitness/src/receiptwitness/notifications/__init__.py b/receiptwitness/src/receiptwitness/notifications/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/receiptwitness/src/receiptwitness/notifications/email.py b/receiptwitness/src/receiptwitness/notifications/email.py new file mode 100644 index 0000000..d5723f2 --- /dev/null +++ b/receiptwitness/src/receiptwitness/notifications/email.py @@ -0,0 +1,45 @@ +"""Email notifications via Resend.""" + +import asyncio +import html +import logging + +import resend + +from receiptwitness.config import settings + +logger = logging.getLogger(__name__) + + +async def send_receipt_notification( + user_email: str, + store_name: str, + item_count: int, + total: float, + purchase_date: str, +) -> None: + """Send receipt ingestion confirmation email via Resend.""" + if not settings.notifications_enabled or not settings.resend_api_key: + logger.debug("Notifications disabled — skipping email send") + return + + resend.api_key = settings.resend_api_key + store_name_safe = html.escape(store_name) + purchase_date_safe = html.escape(purchase_date) + try: + await asyncio.to_thread( + resend.Emails.send, + { + "from": settings.notification_email_from, + "to": [user_email], + "subject": f"Receipt processed: {store_name} - ${total:.2f}", + "html": ( + f"

Your receipt from {store_name_safe} on " + f"{purchase_date_safe} has been processed.

" + f"

{item_count} items, total: ${total:.2f}

" + ), + }, + ) + logger.info("Receipt notification sent to %s", user_email) + except Exception: + logger.exception("Failed to send receipt notification to %s", user_email) diff --git a/receiptwitness/src/receiptwitness/parsers/email/__init__.py b/receiptwitness/src/receiptwitness/parsers/email/__init__.py new file mode 100644 index 0000000..9d01da5 --- /dev/null +++ b/receiptwitness/src/receiptwitness/parsers/email/__init__.py @@ -0,0 +1 @@ +"""Email receipt parsers for retailer email receipts.""" diff --git a/receiptwitness/src/receiptwitness/parsers/email/base.py b/receiptwitness/src/receiptwitness/parsers/email/base.py new file mode 100644 index 0000000..a25535e --- /dev/null +++ b/receiptwitness/src/receiptwitness/parsers/email/base.py @@ -0,0 +1,32 @@ +"""Base interface for email receipt parsers.""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field + + +@dataclass +class EmailReceipt: + """Raw email data before parsing.""" + + sender: str + recipient: str + subject: str + body_html: str | None = None + body_plain: str | None = None + received_at: str | None = None + raw_headers: dict = field(default_factory=dict) + + +class BaseEmailParser(ABC): + """All retailer email parsers implement this interface.""" + + @abstractmethod + def can_parse(self, email: EmailReceipt) -> bool: + """Return True if this parser handles this email.""" + ... + + @abstractmethod + def parse(self, email: EmailReceipt) -> dict: + """Parse email into a dict matching PurchaseCreate schema fields. + Must include an items list matching PurchaseItemCreate fields.""" + ... diff --git a/receiptwitness/src/receiptwitness/parsers/email/detector.py b/receiptwitness/src/receiptwitness/parsers/email/detector.py new file mode 100644 index 0000000..e71f769 --- /dev/null +++ b/receiptwitness/src/receiptwitness/parsers/email/detector.py @@ -0,0 +1,25 @@ +"""Detect which retailer sent a receipt email.""" + +import re + +from receiptwitness.parsers.email.base import EmailReceipt + +RETAILER_PATTERNS: dict[str, list[str]] = { + "meijer": [r"@meijer\.com$", r"@email\.meijer\.com$"], + "kroger": [r"@kroger\.com$", r"@email\.kroger\.com$"], + "target": [r"@target\.com$", r"@email\.target\.com$"], +} + + +def detect_retailer(email: EmailReceipt) -> str | None: + """Return retailer slug or None if unrecognized.""" + sender = email.sender.lower().strip() + # Extract email from "Name " format + match = re.search(r"<([^>]+)>", sender) + if match: + sender = match.group(1) + for retailer, patterns in RETAILER_PATTERNS.items(): + for pattern in patterns: + if re.search(pattern, sender): + return retailer + return None diff --git a/receiptwitness/src/receiptwitness/parsers/email/kroger.py b/receiptwitness/src/receiptwitness/parsers/email/kroger.py new file mode 100644 index 0000000..364f59e --- /dev/null +++ b/receiptwitness/src/receiptwitness/parsers/email/kroger.py @@ -0,0 +1,157 @@ +"""Kroger email receipt parser.""" + +import logging +import re +from datetime import datetime +from decimal import Decimal, InvalidOperation + +from bs4 import BeautifulSoup + +from receiptwitness.parsers.email.base import BaseEmailParser, EmailReceipt + +logger = logging.getLogger(__name__) + + +def _to_decimal(value: str | float | int | None, default: Decimal = Decimal("0")) -> Decimal: + """Safely convert a value to Decimal.""" + if value is None: + return default + try: + return Decimal(str(value).replace("$", "").replace(",", "").strip()) + except (InvalidOperation, ValueError): + return default + + +def _extract_total(body: str) -> Decimal: + """Extract the transaction total from email body.""" + patterns = [ + r"Total[:\s]*\$?([0-9,]+\.[0-9]{2})", + r"Amount[:\s]*\$?([0-9,]+\.[0-9]{2})", + r"Grand\s+Total[:\s]*\$?([0-9,]+\.[0-9]{2})", + ] + for pattern in patterns: + match = re.search(pattern, body, re.IGNORECASE) + if match: + return _to_decimal(match.group(1)) + return Decimal("0") + + +def _extract_receipt_id(body: str) -> str | None: + """Extract receipt ID / transaction ID from HTML body. + + Strips HTML tags first so that whitespace between delimiters and values + (e.g. from `` KR-2026-0315-4829`` -> `` KR-2026-0315-4829``) + is normalized and the pattern can match cleanly. + """ + stripped = re.sub(r"<[^>]+>", "", body) + patterns = [ + r"Receipt\s*#[:\s]*([A-Z0-9-]+)", + r"Transaction\s*#[:\s]*([A-Z0-9-]+)", + r"Order\s*#[:\s]*([A-Z0-9-]+)", + r"Confirmation\s*#[:\s]*([A-Z0-9-]+)", + ] + for pattern in patterns: + match = re.search(pattern, stripped, re.IGNORECASE) + if match: + return match.group(1) + return None + + +def _extract_date(body: str) -> str: + """Extract purchase date from email body. Returns ISO date string or empty string.""" + patterns = [ + r"(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})", + r"([A-Z][a-z]{2}\s+\d{1,2},?\s+\d{4})", + ] + for pattern in patterns: + match = re.search(pattern, body) + if match: + raw = match.group(1) + try: + dt = datetime.strptime(raw.replace(",", ""), "%b %d %Y") + return dt.strftime("%Y-%m-%d") + except ValueError: + pass + try: + for fmt in ("%m/%d/%Y", "%m/%d/%y", "%d/%m/%Y", "%d/%m/%y"): + try: + dt = datetime.strptime(raw, fmt) + return dt.strftime("%Y-%m-%d") + except ValueError: + continue + except Exception: + pass + return "" + + +def _extract_items_soup(body: str) -> list[dict]: + """Extract line items from HTML email body using BeautifulSoup.""" + items = [] + try: + soup = BeautifulSoup(body, "html.parser") + text = soup.get_text(separator="\n", strip=True) + # Strip HTML tags from raw body to normalize whitespace + stripped = re.sub(r"<[^>]+>", " ", body) + stripped = re.sub(r"\s+", " ", stripped) + skip_prefixes = ( + "Subtotal", + "Tax", + "Total", + "Kroger", + "Target", + "Date", + "Receipt", + "Order", + "Transaction", + "Confirmation", + "Thank", + "Questions", + "Keep", + "Receipt", + ) + for line in text.split("\n"): + line = line.strip() + if not line or line.startswith(skip_prefixes): + continue + # Match lines like "Product Name $9.99" + match = re.match(r"(.+?)\s+\$([0-9]+\.[0-9]{2})\s*$", line) + if match: + name = match.group(1).strip() + price = _to_decimal(match.group(2)) + if len(name) > 2 and price > 0: + items.append( + { + "product_name_raw": name, + "quantity": Decimal("1"), + "unit_price": price, + "extended_price": price, + } + ) + except Exception: + pass + return items[:20] + + +class KrogerEmailParser(BaseEmailParser): + """Parse Kroger email receipts (digital receipts via kroger.com).""" + + KROGER_KEYWORDS = ("kroger", "kroger.com", "plus") + + def can_parse(self, email: EmailReceipt) -> bool: + sender = (email.sender or "").lower() + body = (email.body_html or email.body_plain or "").lower() + return any(kw in sender or kw in body for kw in self.KROGER_KEYWORDS) + + def parse(self, email: EmailReceipt) -> dict: + body = (email.body_html or email.body_plain or "").strip() + total = _extract_total(body) + receipt_id = _extract_receipt_id(body) or "" + purchase_date = _extract_date(body) + items = _extract_items_soup(body) + + return { + "receipt_id": receipt_id, + "purchase_date": purchase_date, + "total": total, + "items": items, + } diff --git a/receiptwitness/src/receiptwitness/parsers/email/meijer.py b/receiptwitness/src/receiptwitness/parsers/email/meijer.py new file mode 100644 index 0000000..598acb7 --- /dev/null +++ b/receiptwitness/src/receiptwitness/parsers/email/meijer.py @@ -0,0 +1,259 @@ +"""Parse Meijer digital receipt emails into structured purchase data.""" + +import re +from decimal import Decimal, InvalidOperation + +from bs4 import BeautifulSoup +from bs4.element import Tag + +from receiptwitness.parsers.email.base import BaseEmailParser, EmailReceipt + + +def _to_decimal(value, default: str = "0") -> Decimal: + """Safely convert a value to Decimal.""" + if value is None: + return Decimal(default) + try: + return Decimal(str(value).replace("$", "").replace(",", "").strip()) + except (InvalidOperation, ValueError, TypeError): + return Decimal(default) + + +def _extract_receipt_id(soup: BeautifulSoup, subject: str | None) -> str | None: + """Extract receipt/transaction ID from subject or body.""" + if subject: + match = re.search(r"TXN[-\s]\d{4}[-\s]\d{4}[-\s]\d+", subject) + if match: + return match.group(0).replace(" ", "-") + # Fallback: look in body + text = soup.get_text() + match = re.search(r"TXN[-\s]\d{4}[-\s]\d{4}[-\s]\d+", text) + if match: + return match.group(0).replace(" ", "-") + return None + + +def _extract_purchase_date(soup: BeautifulSoup, subject: str | None) -> str | None: + """Extract purchase date from subject or body.""" + text = soup.get_text() + + # Try ISO format first: YYYY-MM-DD + match = re.search(r"(\d{4})-(\d{2})-(\d{2})", text) + if match: + return f"{match.group(1)}-{match.group(2)}-{match.group(3)}" + + # Try written format: March 15, 2026 + match = re.search(r"([A-Za-z]+)\s+(\d{1,2}),?\s+(\d{4})", text) + if match: + month_str = match.group(1).lower() + day = match.group(2) + year = match.group(3) + month_map = { + "january": "01", + "february": "02", + "march": "03", + "april": "04", + "may": "05", + "june": "06", + "july": "07", + "august": "08", + "september": "09", + "october": "10", + "november": "11", + "december": "12", + } + month = month_map.get(month_str) + if month: + return f"{year}-{month}-{day.zfill(2)}" + + # MM/DD/YYYY + match = re.search(r"(\d{1,2})/(\d{1,2})/(\d{4})", text) + if match: + return f"{match.group(3)}-{match.group(1).zfill(2)}-{match.group(2).zfill(2)}" + + return None + + +def _extract_store_info(soup: BeautifulSoup) -> dict: + """Extract store name and number from the email body.""" + store_info: dict = {} + + # Look for store number in header + store_num_match = re.search(r"Meijer\s+Store\s+#?(\d+)", soup.get_text(), re.IGNORECASE) + if store_num_match: + store_info["store_number"] = store_num_match.group(1) + + return store_info + + +def _extract_items(table: Tag | None) -> list[dict]: + """Extract line items from the items table.""" + items: list[dict] = [] + if not table: + return items + + rows = table.find_all("tr") + for row in rows: + cells = row.find_all("td") + if len(cells) < 3: + continue + + name_cell = cells[0].get_text(strip=True) + qty_cell = cells[1].get_text(strip=True) + price_cell = cells[2].get_text(strip=True) + + if not name_cell or name_cell.lower() in ("item", "description"): + continue + + # Skip subtotal/tax/total/savings rows + if any( + label in name_cell.lower() + for label in ("subtotal", "tax", "total", "savings", "grand total") + ): + continue + + try: + quantity = Decimal(qty_cell) + except (InvalidOperation, ValueError, TypeError): + quantity = Decimal("1") + + price_str = price_cell.replace("$", "").replace(",", "").strip() + try: + unit_price = Decimal(price_str) + except (InvalidOperation, ValueError, TypeError): + unit_price = Decimal("0") + + extended_price = unit_price # Default to unit price; no qty column in fixture + + items.append( + { + "product_name_raw": name_cell, + "quantity": quantity, + "unit_price": unit_price, + "extended_price": extended_price, + } + ) + + return items + + +def _extract_totals_plain(text: str) -> dict: + """Extract totals from plain text (no HTML).""" + totals: dict = { + "subtotal": None, + "tax": None, + "total": None, + "savings_total": None, + } + + match = re.search(r"\bSubtotal\b[:\s$]*([0-9,]+\.?\d*)", text, re.IGNORECASE) + if match: + totals["subtotal"] = _to_decimal(match.group(1)) + + match = re.search(r"\bTax\b[:\s$]*([0-9,]+\.?\d*)", text, re.IGNORECASE) + if match: + totals["tax"] = _to_decimal(match.group(1)) + + grand_total_match = re.search(r"Grand\s+Total\b[:\s$]*([0-9,]+\.?\d*)", text, re.IGNORECASE) + if grand_total_match: + totals["total"] = _to_decimal(grand_total_match.group(1)) + + savings_match = re.search(r"\bSavings\b[:\s$\-]*([0-9,]+\.?\d*)", text, re.IGNORECASE) + if savings_match: + totals["savings_total"] = _to_decimal(savings_match.group(1)) + + if totals["total"] is None: + total_match = re.search(r"\bTotal\b[:\s$]*([0-9,]+\.?\d*)", text, re.IGNORECASE) + if total_match: + totals["total"] = _to_decimal(total_match.group(1)) + + return totals + + +def _extract_totals(soup: BeautifulSoup) -> dict: + """Extract subtotal, tax, total, and savings from the totals section.""" + text = soup.get_text() + + totals: dict = { + "subtotal": None, + "tax": None, + "total": None, + "savings_total": None, + } + + # Subtotal — use word boundary to avoid matching "Subtotal" with "Total" + match = re.search(r"\bSubtotal\b[:\s$]*([0-9,]+\.?\d*)", text, re.IGNORECASE) + if match: + totals["subtotal"] = _to_decimal(match.group(1)) + + # Tax + match = re.search(r"\bTax\b[:\s$]*([0-9,]+\.?\d*)", text, re.IGNORECASE) + if match: + totals["tax"] = _to_decimal(match.group(1)) + + # Grand Total (before plain "Total" to avoid matching "Subtotal") + grand_total_match = re.search(r"Grand\s+Total\b[:\s$]*([0-9,]+\.?\d*)", text, re.IGNORECASE) + if grand_total_match: + totals["total"] = _to_decimal(grand_total_match.group(1)) + + # Savings — allow any combination of whitespace/$- around the number + savings_match = re.search(r"\bSavings\b[:\s$\-]*([0-9,]+\.?\d*)", text, re.IGNORECASE) + if savings_match: + totals["savings_total"] = _to_decimal(savings_match.group(1)) + + # Plain "Total" only if Grand Total wasn't found + if totals["total"] is None: + total_match = re.search(r"\bTotal\b[:\s$]*([0-9,]+\.?\d*)", text, re.IGNORECASE) + if total_match: + totals["total"] = _to_decimal(total_match.group(1)) + + return totals + + +class MeijerEmailParser(BaseEmailParser): + """Parse Meijer digital receipt emails forwarded by users.""" + + def can_parse(self, email: EmailReceipt) -> bool: + sender = email.sender.lower().strip() + # Extract email from "Name " format + match = re.search(r"<([^>]+)>", sender) + if match: + sender = match.group(1) + return "meijer" in sender + + def parse(self, email: EmailReceipt) -> dict: + body_html = email.body_html + body_plain = email.body_plain or "" + body = body_html or body_plain + soup = BeautifulSoup(body, "html.parser") + + receipt_id = _extract_receipt_id(soup, email.subject) + purchase_date = _extract_purchase_date(soup, email.subject) + _ = _extract_store_info(soup) + + # Find the items table — look for one with Item/Qty/Price headers + table = None + for tbl in soup.find_all("table"): + headers = tbl.find_all("th") + header_texts = [h.get_text(strip=True).lower() for h in headers] + if any("item" in h or "qty" in h or "price" in h for h in header_texts): + table = tbl + break + + items = _extract_items(table) + + # Extract totals from HTML; fall back to plain text if no HTML + if body_html: + totals = _extract_totals(soup) + else: + totals = _extract_totals_plain(body_plain) + + return { + "receipt_id": receipt_id or "", + "purchase_date": purchase_date or "", + "total": totals["total"] or Decimal("0"), + "subtotal": totals["subtotal"], + "tax": totals["tax"], + "savings_total": totals["savings_total"], + "items": items, + } diff --git a/receiptwitness/src/receiptwitness/parsers/email/target.py b/receiptwitness/src/receiptwitness/parsers/email/target.py new file mode 100644 index 0000000..c7e58d3 --- /dev/null +++ b/receiptwitness/src/receiptwitness/parsers/email/target.py @@ -0,0 +1,156 @@ +"""Target email receipt parser.""" + +import logging +import re +from datetime import datetime +from decimal import Decimal, InvalidOperation + +from bs4 import BeautifulSoup + +from receiptwitness.parsers.email.base import BaseEmailParser, EmailReceipt + +logger = logging.getLogger(__name__) + + +def _to_decimal(value: str | float | int | None, default: Decimal = Decimal("0")) -> Decimal: + """Safely convert a value to Decimal.""" + if value is None: + return default + try: + return Decimal(str(value).replace("$", "").replace(",", "").strip()) + except (InvalidOperation, ValueError): + return default + + +def _extract_total(body: str) -> Decimal: + """Extract the transaction total from email body.""" + patterns = [ + r"Total[:\s]*\$?([0-9,]+\.[0-9]{2})", + r"Amount[:\s]*\$?([0-9,]+\.[0-9]{2})", + r"Grand\s+Total[:\s]*\$?([0-9,]+\.[0-9]{2})", + ] + for pattern in patterns: + match = re.search(pattern, body, re.IGNORECASE) + if match: + return _to_decimal(match.group(1)) + return Decimal("0") + + +def _extract_receipt_id(body: str) -> str | None: + """Extract receipt ID / transaction ID from HTML body. + + Strips HTML tags first so that whitespace between delimiters and values + (e.g. from `` TGT-2026-0318-9124`` -> `` TGT-2026-0318-9124``) + is normalized and the pattern can match cleanly. + """ + stripped = re.sub(r"<[^>]+>", "", body) + patterns = [ + r"Receipt\s*#[:\s]*([A-Z0-9-]+)", + r"Order\s*#[:\s]*([A-Z0-9-]+)", + r"Confirmation\s*#[:\s]*([A-Z0-9-]+)", + r"Target\s+Order\s*#[:\s]*([A-Z0-9-]+)", + ] + for pattern in patterns: + match = re.search(pattern, stripped, re.IGNORECASE) + if match: + return match.group(1) + return None + + +def _extract_date(body: str) -> str: + """Extract purchase date from email body. Returns ISO date string or empty string.""" + patterns = [ + r"(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})", + r"([A-Z][a-z]{2}\s+\d{1,2},?\s+\d{4})", + ] + for pattern in patterns: + match = re.search(pattern, body) + if match: + raw = match.group(1) + try: + dt = datetime.strptime(raw.replace(",", ""), "%b %d %Y") + return dt.strftime("%Y-%m-%d") + except ValueError: + pass + try: + for fmt in ("%m/%d/%Y", "%m/%d/%y", "%d/%m/%Y", "%d/%m/%y"): + try: + dt = datetime.strptime(raw, fmt) + return dt.strftime("%Y-%m-%d") + except ValueError: + continue + except Exception: + pass + return "" + + +def _extract_items_soup(body: str) -> list[dict]: + """Extract line items from HTML email body using BeautifulSoup.""" + items = [] + try: + soup = BeautifulSoup(body, "html.parser") + text = soup.get_text(separator="\n", strip=True) + for line in text.split("\n"): + line = line.strip() + if not line or line.startswith( + ( + "Subtotal", + "Tax", + "Total", + "Target", + "Kroger", + "Date", + "Receipt", + "Order", + "Transaction", + "Confirmation", + "Thank", + "Questions", + "Keep", + "Receipt", + "Store", + ) + ): + continue + # Match lines like "Product Name $9.99" + match = re.match(r"(.+?)\s+\$([0-9]+\.[0-9]{2})\s*$", line) + if match: + name = match.group(1).strip() + price = _to_decimal(match.group(2)) + if len(name) > 2 and price > 0: + items.append( + { + "product_name_raw": name, + "quantity": Decimal("1"), + "unit_price": price, + "extended_price": price, + } + ) + except Exception: + pass + return items[:20] + + +class TargetEmailParser(BaseEmailParser): + """Parse Target email receipts (Circle order confirmations).""" + + TARGET_KEYWORDS = ("target.com", "targetnow", "circle", "target") + + def can_parse(self, email: EmailReceipt) -> bool: + sender = (email.sender or "").lower() + body = (email.body_html or email.body_plain or "").lower() + return any(kw in sender or kw in body for kw in self.TARGET_KEYWORDS) + + def parse(self, email: EmailReceipt) -> dict: + body = (email.body_html or email.body_plain or "").strip() + total = _extract_total(body) + receipt_id = _extract_receipt_id(body) or "" + purchase_date = _extract_date(body) + items = _extract_items_soup(body) + + return { + "receipt_id": receipt_id, + "purchase_date": purchase_date, + "total": total, + "items": items, + } diff --git a/receiptwitness/src/receiptwitness/queue/__init__.py b/receiptwitness/src/receiptwitness/queue/__init__.py new file mode 100644 index 0000000..3f9a31f --- /dev/null +++ b/receiptwitness/src/receiptwitness/queue/__init__.py @@ -0,0 +1 @@ +"""DragonflyDB Streams queue for email receipt processing.""" diff --git a/receiptwitness/src/receiptwitness/queue/email.py b/receiptwitness/src/receiptwitness/queue/email.py new file mode 100644 index 0000000..c76148e --- /dev/null +++ b/receiptwitness/src/receiptwitness/queue/email.py @@ -0,0 +1,77 @@ +"""DragonflyDB Streams queue for email receipt processing.""" + +from __future__ import annotations + +import json +import logging +from dataclasses import asdict, dataclass +from typing import cast + +import redis.asyncio as aioredis + +from receiptwitness.config import settings + +logger = logging.getLogger(__name__) + +STREAM_KEY = "email:receipts" +CONSUMER_GROUP = "email-workers" + + +@dataclass +class EmailJob: + """Payload for an email receipt processing job.""" + + user_id: str + sender: str + recipient: str + subject: str + body_html: str | None + body_plain: str | None + received_at: str + message_id: str # from email provider, for dedup + + +async def get_redis() -> aioredis.Redis: + """Get async Redis/DragonflyDB client.""" + return cast(aioredis.Redis, aioredis.from_url(settings.redis_url, decode_responses=True)) + + +async def ensure_consumer_group(client: aioredis.Redis) -> None: + """Create consumer group if it does not exist.""" + try: + await client.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id="0", mkstream=True) + except aioredis.ResponseError as e: + if "BUSYGROUP" not in str(e): + raise + + +async def enqueue_email(client: aioredis.Redis, job: EmailJob) -> str: + """Add email job to the stream. Returns the stream message ID.""" + payload: dict[str, str | bytes | int | float] = {"data": json.dumps(asdict(job))} + msg_id: str = cast(str, await client.xadd(STREAM_KEY, payload)) # type: ignore[arg-type] # redis-py StreamCommands.xadd expects broader FieldT union; runtime behavior is correct + logger.info("Enqueued email job %s for user %s", msg_id, job.user_id) + return msg_id + + +async def consume_emails( + client: aioredis.Redis, + consumer_name: str, + count: int = 1, + block_ms: int = 5000, +) -> list[tuple[str, EmailJob]]: + """Read pending messages from the stream. Returns list of (msg_id, EmailJob).""" + await ensure_consumer_group(client) + messages = await client.xreadgroup( + CONSUMER_GROUP, consumer_name, {STREAM_KEY: ">"}, count=count, block=block_ms + ) + results = [] + for _stream, entries in messages: + for msg_id, fields in entries: + job = EmailJob(**json.loads(fields["data"])) + results.append((msg_id, job)) + return results + + +async def ack_email(client: aioredis.Redis, msg_id: str) -> None: + """Acknowledge a processed message.""" + await client.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) diff --git a/receiptwitness/src/receiptwitness/worker/__init__.py b/receiptwitness/src/receiptwitness/worker/__init__.py new file mode 100644 index 0000000..e32899a --- /dev/null +++ b/receiptwitness/src/receiptwitness/worker/__init__.py @@ -0,0 +1 @@ +"""Async email receipt worker consuming from DragonflyDB Streams.""" diff --git a/receiptwitness/src/receiptwitness/worker/email_worker.py b/receiptwitness/src/receiptwitness/worker/email_worker.py new file mode 100644 index 0000000..52a5dc0 --- /dev/null +++ b/receiptwitness/src/receiptwitness/worker/email_worker.py @@ -0,0 +1,104 @@ +"""Async worker that consumes email receipt jobs from DragonflyDB Streams.""" + +import asyncio +import logging + +from cartsnitch_common.database import get_async_session_factory +from cartsnitch_common.models.user import User +from sqlalchemy import select + +from receiptwitness.config import settings +from receiptwitness.events import publish_receipt_ingested +from receiptwitness.parsers.email.base import BaseEmailParser, EmailReceipt +from receiptwitness.parsers.email.detector import detect_retailer +from receiptwitness.parsers.email.kroger import KrogerEmailParser +from receiptwitness.parsers.email.meijer import MeijerEmailParser +from receiptwitness.parsers.email.target import TargetEmailParser +from receiptwitness.queue.email import ack_email, consume_emails, get_redis + +logger = logging.getLogger(__name__) + +CONSUMER_NAME = "worker-1" + +# Registry of available email parsers +PARSERS: dict[str, BaseEmailParser] = { + "meijer": MeijerEmailParser(), + "kroger": KrogerEmailParser(), + "target": TargetEmailParser(), +} + + +async def resolve_user(token: str) -> str | None: + """Look up user_id from email_inbound_token.""" + session_factory = get_async_session_factory(settings.database_url) + async with session_factory() as session: + result = await session.execute(select(User.id).where(User.email_inbound_token == token)) + row = result.scalar_one_or_none() + return str(row) if row else None + + +async def process_job(msg_id: str, job) -> bool: + """Process a single email job. Returns True on success.""" + # 1. Resolve user from token + user_id = await resolve_user(job.user_id) # user_id field holds token + if not user_id: + logger.warning("Unknown token %s, dropping message %s", job.user_id, msg_id) + return True # ack to avoid infinite retry + + # 2. Build EmailReceipt + email = EmailReceipt( + sender=job.sender, + recipient=job.recipient, + subject=job.subject, + body_html=job.body_html, + body_plain=job.body_plain, + received_at=job.received_at, + ) + + # 3. Detect retailer + retailer = detect_retailer(email) + if not retailer or retailer not in PARSERS: + logger.warning( + "Unrecognized retailer from %s, archiving msg %s", + job.sender, + msg_id, + ) + return True # ack — no parser available + + # 4. Parse + parser = PARSERS[retailer] + parsed = parser.parse(email) + + # 5. Publish event + await publish_receipt_ingested( + user_id=user_id, + store_slug=retailer, + purchase_id=parsed.get("receipt_id", msg_id), + purchase_date=parsed.get("purchase_date", ""), + item_count=len(parsed.get("items", [])), + total=parsed.get("total", 0), + ) + return True + + +async def run_worker() -> None: + """Main worker loop — consume and process email jobs.""" + client = await get_redis() + logger.info("Email worker started, consuming from email:receipts") + while True: + try: + jobs = await consume_emails(client, CONSUMER_NAME, count=5, block_ms=5000) + for msg_id, job in jobs: + try: + success = await process_job(msg_id, job) + if success: + await ack_email(client, msg_id) + except Exception: + logger.exception("Failed to process email job %s", msg_id) + except Exception: + logger.exception("Worker loop error, retrying in 5s") + await asyncio.sleep(5) + + +if __name__ == "__main__": + asyncio.run(run_worker()) diff --git a/receiptwitness/tests/fixtures/kroger_email_receipt.html b/receiptwitness/tests/fixtures/kroger_email_receipt.html new file mode 100644 index 0000000..9cb33f8 --- /dev/null +++ b/receiptwitness/tests/fixtures/kroger_email_receipt.html @@ -0,0 +1,45 @@ + + + + + Kroger Digital Receipt + + +
+ Kroger +

Your Digital Receipt

+

Kroger Plus Member

+
+ +
+

Kroger #882 - Downtown

+

123 Main Street
Anytown, OH 45202

+

Date: 03/15/2026

+

Receipt #: KR-2026-0315-4829

+

Transaction #: TXN-789123456

+
+ +
+

Items Purchased

+

Whole Milk 1 Gallon $3.99

+

Sourdough Bread $4.49

+

Free Range Eggs 12ct $5.99

+

Baby Spinach 5oz $4.29

+
+ +
+

Subtotal: $18.76

+

Tax: $1.24

+

Total: $20.00

+
+ +
+

Kroger Plus Savings: $3.25 saved on this order.

+
+ +
+

Thank you for shopping at Kroger!

+

Keep your receipt for returns within 90 days.

+
+ + \ No newline at end of file diff --git a/receiptwitness/tests/fixtures/meijer_email_receipt.html b/receiptwitness/tests/fixtures/meijer_email_receipt.html new file mode 100644 index 0000000..f61deb3 --- /dev/null +++ b/receiptwitness/tests/fixtures/meijer_email_receipt.html @@ -0,0 +1,127 @@ + + + + + + Meijer Digital Receipt + + + +
+
+

MEIJER

+

Digital Receipt

+
+ +
+

Meijer Store #42

+

1555 Lake Drive SE, Grand Rapids, MI 49506

+
+ +
+
+ Date: March 15, 2026
+ Time: 2:34 PM +
+
+ Transaction #
+ TXN-2026-0315-0042 +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ItemQtyPrice
ORGANIC BANANAS1$0.69
WHOLE MILK 1 GAL1$4.29
MEIJER WHOLE GRAIN OAT CEREAL 18OZ1$4.99
FRESH BROCCOLI CROWN1$2.49
GROUND BEEF 85/15 1LB1$6.99
SOURDOUGH BREAD1$3.99
MEIJER BABY SPINACH 5OZ1$4.49
LARGE EGGS DOZEN1$3.29
+ +
+
+ Subtotal + $31.22 +
+
+ Tax + $2.19 +
+
+ Total Savings + -$3.40 +
+
+ Total + $33.41 +
+
+ + +
+ + diff --git a/receiptwitness/tests/fixtures/target_email_receipt.html b/receiptwitness/tests/fixtures/target_email_receipt.html new file mode 100644 index 0000000..70f0720 --- /dev/null +++ b/receiptwitness/tests/fixtures/target_email_receipt.html @@ -0,0 +1,44 @@ + + + + + Target Order Confirmation + + +
+ Target +

Order Confirmation

+

Thanks for shopping Target Circle!

+
+ +
+

Target Store #1247 - Riverside

+

4500 River Road
Columbus, OH 43220

+

Date: 03/18/2026

+

Order #: TGT-2026-0318-9124

+

Confirmation #: CNF-44772819

+
+ +
+

Items Purchased

+

Good & Gather Whole Milk 1 Gal $3.89

+

Arborio Rice 2lb bag $6.49

+

Parmesan Wedge 8oz $7.99

+
+ +
+

Subtotal: $18.37

+

Tax: $1.45

+

Total: $19.82

+
+ +
+

Target Circle offer saved you $0.30 on this order.

+
+ +
+

Questions? Call Target Guest Services at 1-800-591-3869.

+

Receipt valid for returns within 30 days.

+
+ + \ No newline at end of file diff --git a/receiptwitness/tests/test_api/__init__.py b/receiptwitness/tests/test_api/__init__.py new file mode 100644 index 0000000..598c2e0 --- /dev/null +++ b/receiptwitness/tests/test_api/__init__.py @@ -0,0 +1 @@ +"""Tests for the ReceiptWitness API routes.""" diff --git a/receiptwitness/tests/test_api/test_webhook.py b/receiptwitness/tests/test_api/test_webhook.py new file mode 100644 index 0000000..2b208de --- /dev/null +++ b/receiptwitness/tests/test_api/test_webhook.py @@ -0,0 +1,101 @@ +"""Tests for the /inbound/email webhook endpoint.""" + +import hashlib +import hmac +import time +from unittest.mock import AsyncMock, patch + +import pytest +from fastapi.testclient import TestClient + +from receiptwitness.main import app + + +@pytest.fixture +def client(): + return TestClient(app) + + +@pytest.fixture +def mock_redis(): + redis_mock = AsyncMock() + with patch("receiptwitness.api.routes.get_redis", return_value=redis_mock): + enqueue_patcher = patch("receiptwitness.api.routes.enqueue_email", new_callable=AsyncMock) + with enqueue_patcher as mock_enqueue: + yield {"redis": redis_mock, "enqueue": mock_enqueue} + + +def make_signature(signing_key: str, token: str, timestamp: str) -> str: + return hmac.new( + signing_key.encode(), + f"{timestamp}{token}".encode(), + hashlib.sha256, + ).hexdigest() + + +def valid_form(signing_key: str = "test-secret"): + ts = str(int(time.time())) + token = "test-token" + sig = make_signature(signing_key, token, ts) + return { + "token": token, + "timestamp": ts, + "signature": sig, + "sender": "sender@example.com", + "recipient": "receipts+user123@example.com", + "subject": "Your Meijer Receipt", + "body-html": "

Thank you for shopping at Meijer

", + "body-plain": "Thank you for shopping at Meijer", + "Message-Id": "", + } + + +def test_valid_webhook(client, mock_redis): + with patch("receiptwitness.api.routes.settings") as mock_settings: + mock_settings.mailgun_webhook_signing_key = "test-secret" + response = client.post("/inbound/email", data=valid_form()) + assert response.status_code == 200 + assert response.json() == {"status": "queued"} + mock_redis["enqueue"].assert_awaited_once() + + +def test_invalid_signature(client, mock_redis): + with patch("receiptwitness.api.routes.settings") as mock_settings: + mock_settings.mailgun_webhook_signing_key = "test-secret" + form = valid_form() + form["signature"] = "wrong-signature" + response = client.post("/inbound/email", data=form) + assert response.status_code == 406 + assert response.json()["detail"] == "Invalid signature" + mock_redis["enqueue"].assert_not_awaited() + + +def test_invalid_recipient_no_plus(client, mock_redis): + with patch("receiptwitness.api.routes.settings") as mock_settings: + mock_settings.mailgun_webhook_signing_key = "test-secret" + form = valid_form() + form["recipient"] = "receipts@example.com" # no plus-address + response = client.post("/inbound/email", data=form) + assert response.status_code == 406 + assert response.json()["detail"] == "Invalid recipient" + mock_redis["enqueue"].assert_not_awaited() + + +def test_stale_timestamp(client, mock_redis): + with patch("receiptwitness.api.routes.settings") as mock_settings: + mock_settings.mailgun_webhook_signing_key = "test-secret" + ts = str(int(time.time()) - 600) # 10 min old + token = "test-token" + sig = make_signature("test-secret", token, ts) + form = { + "token": token, + "timestamp": ts, + "signature": sig, + "sender": "sender@example.com", + "recipient": "receipts+user123@example.com", + "subject": "Receipt", + } + response = client.post("/inbound/email", data=form) + assert response.status_code == 406 + assert response.json()["detail"] == "Invalid signature" + mock_redis["enqueue"].assert_not_awaited() diff --git a/receiptwitness/tests/test_notifications/__init__.py b/receiptwitness/tests/test_notifications/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/receiptwitness/tests/test_notifications/test_email.py b/receiptwitness/tests/test_notifications/test_email.py new file mode 100644 index 0000000..e8970e9 --- /dev/null +++ b/receiptwitness/tests/test_notifications/test_email.py @@ -0,0 +1,84 @@ +"""Tests for email notifications.""" + +from unittest.mock import patch + +import pytest + + +class TestSendReceiptNotification: + @pytest.fixture + def mock_resend(self): + with patch("receiptwitness.notifications.email.resend") as mock: + yield mock + + @pytest.mark.asyncio + async def test_sends_email_with_correct_params(self, mock_resend): + from receiptwitness.notifications.email import send_receipt_notification + + with ( + patch("receiptwitness.notifications.email.settings") as mock_settings, + patch( + "receiptwitness.notifications.email.asyncio.to_thread", + new=lambda fn, *args, **kwargs: fn(*args, **kwargs), + ), + ): + mock_settings.notifications_enabled = True + mock_settings.resend_api_key = "re_testkey_123" + mock_settings.notification_email_from = "noreply@test.com" + + await send_receipt_notification( + user_email="user@example.com", + store_name="Meijer", + item_count=5, + total=42.99, + purchase_date="2026-03-28", + ) + + mock_resend.Emails.send.assert_called_once_with( + { + "from": "noreply@test.com", + "to": ["user@example.com"], + "subject": "Receipt processed: Meijer - $42.99", + "html": ( + "

Your receipt from Meijer on " + "2026-03-28 has been processed.

" + "

5 items, total: $42.99

" + ), + } + ) + + @pytest.mark.asyncio + async def test_skips_when_disabled(self, mock_resend): + from receiptwitness.notifications.email import send_receipt_notification + + with patch("receiptwitness.notifications.email.settings") as mock_settings: + mock_settings.notifications_enabled = False + mock_settings.resend_api_key = "re_testkey_123" + + await send_receipt_notification( + user_email="user@example.com", + store_name="Meijer", + item_count=5, + total=42.99, + purchase_date="2026-03-28", + ) + + mock_resend.Emails.send.assert_not_called() + + @pytest.mark.asyncio + async def test_skips_when_api_key_empty(self, mock_resend): + from receiptwitness.notifications.email import send_receipt_notification + + with patch("receiptwitness.notifications.email.settings") as mock_settings: + mock_settings.notifications_enabled = True + mock_settings.resend_api_key = "" + + await send_receipt_notification( + user_email="user@example.com", + store_name="Meijer", + item_count=5, + total=42.99, + purchase_date="2026-03-28", + ) + + mock_resend.Emails.send.assert_not_called() diff --git a/receiptwitness/tests/test_parsers/test_email/__init__.py b/receiptwitness/tests/test_parsers/test_email/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/receiptwitness/tests/test_parsers/test_email/test_detector.py b/receiptwitness/tests/test_parsers/test_email/test_detector.py new file mode 100644 index 0000000..87a5aac --- /dev/null +++ b/receiptwitness/tests/test_parsers/test_email/test_detector.py @@ -0,0 +1,49 @@ +"""Tests for retailer detector.""" + +from receiptwitness.parsers.email.base import EmailReceipt +from receiptwitness.parsers.email.detector import detect_retailer + + +def test_detect_meijer(): + email = EmailReceipt( + sender="receipts@meijer.com", + recipient="user@example.com", + subject="Your Receipt", + ) + assert detect_retailer(email) == "meijer" + + +def test_detect_kroger(): + email = EmailReceipt( + sender="noreply@email.kroger.com", + recipient="user@example.com", + subject="Your Receipt", + ) + assert detect_retailer(email) == "kroger" + + +def test_detect_target(): + email = EmailReceipt( + sender="Target ", + recipient="user@example.com", + subject="Your Receipt", + ) + assert detect_retailer(email) == "target" + + +def test_detect_unknown(): + email = EmailReceipt( + sender="noreply@walmart.com", + recipient="user@example.com", + subject="Your Receipt", + ) + assert detect_retailer(email) is None + + +def test_detect_case_insensitive(): + email = EmailReceipt( + sender="Receipts@MEIJER.COM", + recipient="user@example.com", + subject="Your Receipt", + ) + assert detect_retailer(email) == "meijer" diff --git a/receiptwitness/tests/test_parsers/test_email/test_kroger_email_parser.py b/receiptwitness/tests/test_parsers/test_email/test_kroger_email_parser.py new file mode 100644 index 0000000..ab30257 --- /dev/null +++ b/receiptwitness/tests/test_parsers/test_email/test_kroger_email_parser.py @@ -0,0 +1,93 @@ +"""Tests for KrogerEmailParser.""" + +from pathlib import Path + +from receiptwitness.parsers.email.base import EmailReceipt +from receiptwitness.parsers.email.kroger import KrogerEmailParser + +FIXTURE_PATH = Path(__file__).parent.parent.parent / "fixtures" / "kroger_email_receipt.html" + + +class TestKrogerEmailParser: + """Tests for KrogerEmailParser.""" + + def setup_method(self) -> None: + self.parser = KrogerEmailParser() + self.fixture_html = FIXTURE_PATH.read_text() + + def test_can_parse_kroger_sender(self) -> None: + email = EmailReceipt( + sender="noreply@email.kroger.com", + recipient="user@example.com", + subject="Your Kroger Receipt", + body_html=self.fixture_html, + ) + assert self.parser.can_parse(email) is True + + def test_can_parse_kroger_in_body(self) -> None: + email = EmailReceipt( + sender="someone@unknown.com", + recipient="user@example.com", + subject="Your Receipt", + body_html="Kroger digital receipt", + ) + assert self.parser.can_parse(email) is True + + def test_cannot_parse_unrelated(self) -> None: + email = EmailReceipt( + sender="noreply@walmart.com", + recipient="user@example.com", + subject="Your Receipt", + body_html="Walmart receipt", + ) + assert self.parser.can_parse(email) is False + + def test_parse_items(self) -> None: + email = EmailReceipt( + sender="noreply@kroger.com", + recipient="user@example.com", + subject="Your Kroger Receipt", + body_html=self.fixture_html, + ) + result = self.parser.parse(email) + items = result.get("items", []) + assert len(items) >= 3 + product_names = [item["product_name_raw"] for item in items] + assert any("Whole Milk" in name for name in product_names) + assert any("Sourdough" in name for name in product_names) + for item in items: + assert "unit_price" in item + assert "extended_price" in item + + def test_parse_totals(self) -> None: + email = EmailReceipt( + sender="noreply@kroger.com", + recipient="user@example.com", + subject="Your Kroger Receipt", + body_html=self.fixture_html, + ) + result = self.parser.parse(email) + total = result.get("total", 0) + assert total > 0 + + def test_parse_receipt_id(self) -> None: + email = EmailReceipt( + sender="noreply@kroger.com", + recipient="user@example.com", + subject="Your Kroger Receipt", + body_html=self.fixture_html, + ) + result = self.parser.parse(email) + receipt_id = result.get("receipt_id", "") + assert "KR-2026" in receipt_id or "TXN" in receipt_id + + def test_parse_date(self) -> None: + email = EmailReceipt( + sender="noreply@kroger.com", + recipient="user@example.com", + subject="Your Kroger Receipt", + body_html=self.fixture_html, + ) + result = self.parser.parse(email) + purchase_date = result.get("purchase_date", "") + assert purchase_date == "2026-03-15" diff --git a/receiptwitness/tests/test_parsers/test_email/test_meijer_parser.py b/receiptwitness/tests/test_parsers/test_email/test_meijer_parser.py new file mode 100644 index 0000000..3c33976 --- /dev/null +++ b/receiptwitness/tests/test_parsers/test_email/test_meijer_parser.py @@ -0,0 +1,182 @@ +"""Tests for the Meijer email receipt parser.""" + +import os +from decimal import Decimal + +import pytest + +from receiptwitness.parsers.email.base import EmailReceipt +from receiptwitness.parsers.email.meijer import MeijerEmailParser + +FIXTURE_PATH = os.path.join( + os.path.dirname(__file__), "..", "..", "fixtures", "meijer_email_receipt.html" +) + + +def load_fixture() -> str: + with open(FIXTURE_PATH) as f: + return f.read() + + +@pytest.fixture +def meijer_email() -> EmailReceipt: + html = load_fixture() + return EmailReceipt( + sender="Meijer Receipts ", + recipient="shopper@example.com", + subject="Your Meijer Receipt — Transaction #TXN-2026-0315-0042", + body_html=html, + body_plain=None, + received_at="2026-03-15T14:34:00Z", + ) + + +@pytest.fixture +def kroger_email() -> EmailReceipt: + return EmailReceipt( + sender="Kroger ", + recipient="shopper@example.com", + subject="Your Kroger Receipt", + body_html="Kroger receipt", + ) + + +class TestCanParse: + def test_can_parse_meijer(self, meijer_email: EmailReceipt): + parser = MeijerEmailParser() + assert parser.can_parse(meijer_email) is True + + def test_cannot_parse_kroger(self, kroger_email: EmailReceipt): + parser = MeijerEmailParser() + assert parser.can_parse(kroger_email) is False + + def test_can_parse_meijer_plain_sender(self): + email = EmailReceipt( + sender="receipts@meijer.com", + recipient="shopper@example.com", + subject="Receipt", + body_html="", + ) + parser = MeijerEmailParser() + assert parser.can_parse(email) is True + + def test_cannot_parse_non_meijer(self): + email = EmailReceipt( + sender=" Target ", + recipient="shopper@example.com", + subject="Target Receipt", + body_html="", + ) + parser = MeijerEmailParser() + assert parser.can_parse(email) is False + + +class TestParseMeijerReceipt: + def test_receipt_id_extracted(self, meijer_email: EmailReceipt): + parser = MeijerEmailParser() + result = parser.parse(meijer_email) + assert result["receipt_id"] == "TXN-2026-0315-0042" + + def test_purchase_date_extracted(self, meijer_email: EmailReceipt): + parser = MeijerEmailParser() + result = parser.parse(meijer_email) + assert result["purchase_date"] == "2026-03-15" + + def test_items_extracted(self, meijer_email: EmailReceipt): + parser = MeijerEmailParser() + result = parser.parse(meijer_email) + items = result["items"] + assert len(items) == 8 + + names = [item["product_name_raw"] for item in items] + assert "ORGANIC BANANAS" in names + assert "WHOLE MILK 1 GAL" in names + assert "GROUND BEEF 85/15 1LB" in names + + def test_item_quantities(self, meijer_email: EmailReceipt): + parser = MeijerEmailParser() + result = parser.parse(meijer_email) + # Find ORGANIC BANANAS + bananas = next(i for i in result["items"] if "BANANAS" in i["product_name_raw"]) + assert bananas["quantity"] == Decimal("1") + + def test_item_prices(self, meijer_email: EmailReceipt): + parser = MeijerEmailParser() + result = parser.parse(meijer_email) + # Find ORGANIC BANANAS + bananas = next(i for i in result["items"] if "BANANAS" in i["product_name_raw"]) + assert bananas["unit_price"] == Decimal("0.69") + assert bananas["extended_price"] == Decimal("0.69") + + def test_totals(self, meijer_email: EmailReceipt): + parser = MeijerEmailParser() + result = parser.parse(meijer_email) + assert result["total"] == Decimal("33.41") + assert result["subtotal"] == Decimal("31.22") + assert result["tax"] == Decimal("2.19") + assert result["savings_total"] == Decimal("3.40") + + +class TestParseHandlesMissingFields: + def test_missing_body_html_falls_back_to_plain(self): + email = EmailReceipt( + sender="receipts@email.meijer.com", + recipient="shopper@example.com", + subject="Your Meijer Receipt", + body_html=None, + body_plain="TXN-1234 | March 15, 2026 | Total: $10.00", + ) + parser = MeijerEmailParser() + result = parser.parse(email) + # Should not raise, returns minimal result + assert result["receipt_id"] == "" + assert result["purchase_date"] == "2026-03-15" + assert result["total"] == Decimal("10.00") + + def test_empty_email(self): + email = EmailReceipt( + sender="receipts@email.meijer.com", + recipient="shopper@example.com", + subject="Receipt", + body_html="", + body_plain="", + ) + parser = MeijerEmailParser() + result = parser.parse(email) + assert result["receipt_id"] == "" + assert result["purchase_date"] == "" + assert result["total"] == Decimal("0") + assert result["items"] == [] + + def test_missing_subject_date_from_body(self): + html = """ + + +

Thank you for shopping on April 1, 2026

+

Total: $15.00

+ + + """ + email = EmailReceipt( + sender="receipts@email.meijer.com", + recipient="shopper@example.com", + subject=None, + body_html=html, + ) + parser = MeijerEmailParser() + result = parser.parse(email) + assert result["purchase_date"] == "2026-04-01" + + def test_missing_totals_defaults_to_zero(self): + html = "

Just an email with no totals

" + email = EmailReceipt( + sender="receipts@email.meijer.com", + recipient="shopper@example.com", + subject="Receipt", + body_html=html, + ) + parser = MeijerEmailParser() + result = parser.parse(email) + assert result["total"] == Decimal("0") + assert result["subtotal"] is None + assert result["tax"] is None diff --git a/receiptwitness/tests/test_parsers/test_email/test_target_email_parser.py b/receiptwitness/tests/test_parsers/test_email/test_target_email_parser.py new file mode 100644 index 0000000..ffa33db --- /dev/null +++ b/receiptwitness/tests/test_parsers/test_email/test_target_email_parser.py @@ -0,0 +1,93 @@ +"""Tests for TargetEmailParser.""" + +from pathlib import Path + +from receiptwitness.parsers.email.base import EmailReceipt +from receiptwitness.parsers.email.target import TargetEmailParser + +FIXTURE_PATH = Path(__file__).parent.parent.parent / "fixtures" / "target_email_receipt.html" + + +class TestTargetEmailParser: + """Tests for TargetEmailParser.""" + + def setup_method(self) -> None: + self.parser = TargetEmailParser() + self.fixture_html = FIXTURE_PATH.read_text() + + def test_can_parse_target_sender(self) -> None: + email = EmailReceipt( + sender="receipts@target.com", + recipient="user@example.com", + subject="Your Target Order Confirmation", + body_html=self.fixture_html, + ) + assert self.parser.can_parse(email) is True + + def test_can_parse_circle_in_body(self) -> None: + email = EmailReceipt( + sender="someone@unknown.com", + recipient="user@example.com", + subject="Your Receipt", + body_html="Target Circle savings offer", + ) + assert self.parser.can_parse(email) is True + + def test_cannot_parse_unrelated(self) -> None: + email = EmailReceipt( + sender="noreply@walmart.com", + recipient="user@example.com", + subject="Your Receipt", + body_html="Walmart receipt", + ) + assert self.parser.can_parse(email) is False + + def test_parse_items(self) -> None: + email = EmailReceipt( + sender="orders@target.com", + recipient="user@example.com", + subject="Your Target Order", + body_html=self.fixture_html, + ) + result = self.parser.parse(email) + items = result.get("items", []) + assert len(items) >= 3 + product_names = [item["product_name_raw"] for item in items] + assert any("Whole Milk" in name for name in product_names) + assert any("Arborio" in name for name in product_names) + for item in items: + assert "unit_price" in item + assert "extended_price" in item + + def test_parse_totals(self) -> None: + email = EmailReceipt( + sender="orders@target.com", + recipient="user@example.com", + subject="Your Target Order", + body_html=self.fixture_html, + ) + result = self.parser.parse(email) + total = result.get("total", 0) + assert total > 0 + + def test_parse_receipt_id(self) -> None: + email = EmailReceipt( + sender="orders@target.com", + recipient="user@example.com", + subject="Your Target Order", + body_html=self.fixture_html, + ) + result = self.parser.parse(email) + receipt_id = result.get("receipt_id", "") + assert "TGT-2026" in receipt_id or "CNF" in receipt_id + + def test_parse_date(self) -> None: + email = EmailReceipt( + sender="orders@target.com", + recipient="user@example.com", + subject="Your Target Order", + body_html=self.fixture_html, + ) + result = self.parser.parse(email) + purchase_date = result.get("purchase_date", "") + assert purchase_date == "2026-03-18" diff --git a/receiptwitness/tests/test_queue/__init__.py b/receiptwitness/tests/test_queue/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/receiptwitness/tests/test_queue/test_email_queue.py b/receiptwitness/tests/test_queue/test_email_queue.py new file mode 100644 index 0000000..05ffb51 --- /dev/null +++ b/receiptwitness/tests/test_queue/test_email_queue.py @@ -0,0 +1,79 @@ +"""Tests for email queue using DragonflyDB Streams.""" + +import pytest +from fakeredis import aioredis as fake_aioredis + +from receiptwitness.queue.email import ( + CONSUMER_GROUP, + STREAM_KEY, + EmailJob, + ack_email, + consume_emails, + enqueue_email, + ensure_consumer_group, +) + + +@pytest.fixture +async def fake_client(): + """Yield a fake async Redis client.""" + client = fake_aioredis.FakeRedis(decode_responses=True) + yield client + await client.aclose() + + +@pytest.fixture +def sample_job(): + """Sample EmailJob for testing.""" + return EmailJob( + user_id="user-123", + sender="no-reply@kroger.com", + recipient="user@example.com", + subject="Kroger Receipt", + body_html="Receipt", + body_plain="Receipt", + received_at="2026-04-01T12:00:00Z", + message_id="msg-abc-123", + ) + + +@pytest.mark.asyncio +async def test_enqueue_and_consume(fake_client, sample_job): + """Enqueue a job, consume it, verify fields match.""" + msg_id = await enqueue_email(fake_client, sample_job) + assert msg_id is not None + + consumed = await consume_emails(fake_client, "test-worker", count=1, block_ms=100) + assert len(consumed) == 1 + consumed_id, consumed_job = consumed[0] + assert consumed_id == msg_id + assert consumed_job.user_id == sample_job.user_id + assert consumed_job.sender == sample_job.sender + assert consumed_job.recipient == sample_job.recipient + assert consumed_job.subject == sample_job.subject + assert consumed_job.message_id == sample_job.message_id + + +@pytest.mark.asyncio +async def test_ack_removes_from_pending(fake_client, sample_job): + """After ack, message is no longer pending.""" + msg_id = await enqueue_email(fake_client, sample_job) + + # Consume the message (moves it to pending) + consumed = await consume_emails(fake_client, "test-worker", count=1, block_ms=100) + assert len(consumed) == 1 + + # Acknowledge it + await ack_email(fake_client, msg_id) + + # Check pending count for this consumer group + pending = await fake_client.xpending(STREAM_KEY, CONSUMER_GROUP) + assert pending is None or pending["pending"] == 0 + + +@pytest.mark.asyncio +async def test_ensure_consumer_group_idempotent(fake_client): + """Calling ensure_consumer_group twice does not error.""" + await ensure_consumer_group(fake_client) + # Calling again should not raise + await ensure_consumer_group(fake_client) diff --git a/receiptwitness/tests/test_worker/__init__.py b/receiptwitness/tests/test_worker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/receiptwitness/tests/test_worker/test_email_worker.py b/receiptwitness/tests/test_worker/test_email_worker.py new file mode 100644 index 0000000..bc05724 --- /dev/null +++ b/receiptwitness/tests/test_worker/test_email_worker.py @@ -0,0 +1,188 @@ +"""Tests for email_worker.""" + +from decimal import Decimal +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fakeredis import aioredis as fake_aioredis + +from receiptwitness.parsers.email.base import EmailReceipt +from receiptwitness.queue.email import ( + EmailJob, +) +from receiptwitness.worker.email_worker import ( + process_job, + resolve_user, +) + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +async def fake_redis(): + """Fake async Redis client for queue testing.""" + client = fake_aioredis.FakeRedis(decode_responses=True) + yield client + await client.aclose() + + +@pytest.fixture +def sample_email_job(): + """Sample EmailJob matching DragonflyDB queue schema.""" + return EmailJob( + user_id="token-abc-123", + sender="no-reply@meijer.com", + recipient="user@example.com", + subject="Your Meijer Receipt", + body_html="Total: $42.00", + body_plain="Total: $42.00", + received_at="2026-04-01T12:00:00Z", + message_id="msg-xyz-789", + ) + + +@pytest.fixture +def sample_email(): + """Sample EmailReceipt for parser testing.""" + return EmailReceipt( + sender="no-reply@meijer.com", + recipient="user@example.com", + subject="Your Meijer Receipt", + body_html="Total: $42.00
Receipt #12345", + body_plain="Total: $42.00", + received_at="2026-04-01T12:00:00Z", + ) + + +# --------------------------------------------------------------------------- +# resolve_user tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_resolve_user_valid_token(): + """Valid token returns user_id string.""" + mock_session = AsyncMock() + mock_result = MagicMock() + mock_result.scalar_one_or_none.return_value = "user-uuid-42" + mock_session.execute.return_value = mock_result + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=None) + + factory = MagicMock(return_value=mock_session) + + with patch( + "receiptwitness.worker.email_worker.get_async_session_factory", + return_value=factory, + ): + user_id = await resolve_user("token-abc-123") + + assert user_id == "user-uuid-42" + factory.assert_called_once() + + +@pytest.mark.asyncio +async def test_resolve_user_invalid_token(): + """Invalid token returns None.""" + mock_session = AsyncMock() + mock_result = MagicMock() + mock_result.scalar_one_or_none.return_value = None + mock_session.execute.return_value = mock_result + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=None) + + factory = MagicMock(return_value=mock_session) + + with patch( + "receiptwitness.worker.email_worker.get_async_session_factory", + return_value=factory, + ): + user_id = await resolve_user("bad-token") + + assert user_id is None + + +# --------------------------------------------------------------------------- +# process_job tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_process_job_unknown_retailer(sample_email_job): + """Unknown retailer logs warning and returns True (ack, no retry).""" + unknown_job = EmailJob( + user_id="token-abc-123", + sender="no-reply@unknownretailer.com", + recipient="user@example.com", + subject="Receipt", + body_html="", + body_plain="", + received_at="2026-04-01T12:00:00Z", + message_id="msg-xyz-789", + ) + + with ( + patch( + "receiptwitness.worker.email_worker.resolve_user", + return_value="user-uuid-42", + ), + patch( + "receiptwitness.worker.email_worker.publish_receipt_ingested", + new_callable=AsyncMock, + ) as mock_publish, + ): + result = await process_job("msg-id-1", unknown_job) + + assert result is True + mock_publish.assert_not_called() + + +@pytest.mark.asyncio +async def test_process_job_success(sample_email_job, sample_email): + """Known retailer: full pipeline runs — parse, normalize, publish event.""" + parsed_data = { + "receipt_id": "RCP-999", + "purchase_date": "2026-04-01", + "total": Decimal("42.00"), + "items": [ + { + "product_name_raw": "ORGANIC BANANAS", + "quantity": Decimal("1"), + "unit_price": Decimal("0.69"), + "extended_price": Decimal("0.69"), + }, + ], + } + + mock_parser = MagicMock() + mock_parser.parse.return_value = parsed_data + + with ( + patch( + "receiptwitness.worker.email_worker.resolve_user", + return_value="user-uuid-42", + ), + patch.dict( + "receiptwitness.worker.email_worker.PARSERS", + {"meijer": mock_parser}, + clear=False, + ), + patch( + "receiptwitness.worker.email_worker.publish_receipt_ingested", + new_callable=AsyncMock, + ) as mock_publish, + ): + result = await process_job("msg-id-1", sample_email_job) + + assert result is True + mock_parser.parse.assert_called_once() + mock_publish.assert_called_once_with( + user_id="user-uuid-42", + store_slug="meijer", + purchase_id="RCP-999", + purchase_date="2026-04-01", + item_count=1, + total=Decimal("42.00"), + )