diff --git a/receiptwitness/pyproject.toml b/receiptwitness/pyproject.toml
index f32acfc..dd3d6ea 100644
--- a/receiptwitness/pyproject.toml
+++ b/receiptwitness/pyproject.toml
@@ -14,11 +14,13 @@ dependencies = [
"cryptography>=42.0,<44.0",
"fastapi>=0.115,<1.0",
"uvicorn[standard]>=0.30,<1.0",
+ "beautifulsoup4>=4.12,<5.0",
"redis>=5.0,<6.0",
"pydantic>=2.0,<3.0",
"pydantic-settings>=2.0,<3.0",
"sqlalchemy[asyncio]>=2.0,<3.0",
"asyncpg>=0.29,<1.0",
+ "resend>=2.0",
]
[project.optional-dependencies]
@@ -27,6 +29,9 @@ dev = [
"pytest-asyncio>=0.23",
"ruff>=0.3",
"pytest-cov>=5.0",
+ "fakeredis[aioredis]>=2.20",
+ "httpx>=0.27",
+ "python-multipart>=0.0.9",
]
[tool.hatch.build.targets.wheel]
diff --git a/receiptwitness/src/receiptwitness/api/routes.py b/receiptwitness/src/receiptwitness/api/routes.py
index 23cc109..483cdcc 100644
--- a/receiptwitness/src/receiptwitness/api/routes.py
+++ b/receiptwitness/src/receiptwitness/api/routes.py
@@ -1,9 +1,61 @@
"""Internal API routes for triggering scrapes and checking status."""
-from fastapi import APIRouter
+import hashlib
+import hmac
+import re
+import time
+
+from fastapi import APIRouter, HTTPException, Request
+
+from receiptwitness.config import settings
+from receiptwitness.queue.email import EmailJob, enqueue_email, get_redis
router = APIRouter()
+# Captures the account token from a plus-addressed recipient such as
+# "receipts+TOKEN@..."; it is applied with .search(), so it can match anywhere
+# inside the recipient value.
+TOKEN_PATTERN = re.compile(r"receipts\+([A-Za-z0-9_-]+)@")
+
+
+def verify_mailgun_signature(token: str, timestamp: str, signature: str) -> bool:
+    """Verify the signature Mailgun attaches to a webhook request.
+
+    The expected value is the hex HMAC-SHA256 of ``timestamp`` concatenated with
+    ``token``, keyed with ``settings.mailgun_webhook_signing_key``.
+
+    Args:
+        token: The ``token`` form field from the webhook.
+        timestamp: The ``timestamp`` form field (Unix seconds, as text).
+        signature: The ``signature`` form field (hex digest).
+
+    Returns:
+        True only if a signing key is configured, the timestamp is an integer
+        within five minutes of now, and the signature matches. Malformed input
+        yields False instead of an exception.
+    """
+    signing_key = settings.mailgun_webhook_signing_key
+    if not signing_key:
+        # Fail closed: with an empty key anyone could compute a "valid" signature.
+        return False
+    try:
+        age_seconds = abs(time.time() - int(timestamp))
+    except (TypeError, ValueError, OverflowError):
+        # Missing, non-numeric or absurdly large timestamp: reject, do not raise.
+        return False
+    if age_seconds > 300:  # 5 min freshness
+        return False
+    expected = hmac.new(
+        signing_key.encode(), f"{timestamp}{token}".encode(), hashlib.sha256
+    ).hexdigest()
+    # Compare as bytes: compare_digest() raises TypeError for non-ASCII str input.
+    return hmac.compare_digest(signature.encode(), expected.encode())
+
+
+@router.post("/inbound/email")
+async def receive_inbound_email(request: Request):
+    """Accept a Mailgun inbound-email webhook and queue it for the email worker.
+
+    Steps: verify the Mailgun signature, pull the account token out of the
+    ``recipient`` address, then push an ``EmailJob`` onto the stream.
+
+    Returns:
+        ``{"status": "queued"}`` once the job has been enqueued.
+
+    Raises:
+        HTTPException: 406 when the signature is invalid or the recipient does
+            not contain an account token.
+    """
+    form = await request.form()
+
+    # 1. Verify Mailgun signature
+    token = str(form.get("token", ""))
+    timestamp = str(form.get("timestamp", ""))
+    signature = str(form.get("signature", ""))
+    if not verify_mailgun_signature(token, timestamp, signature):
+        raise HTTPException(status_code=406, detail="Invalid signature")
+
+    # 2. Extract account token from recipient
+    recipient = str(form.get("recipient", ""))
+    match = TOKEN_PATTERN.search(recipient)
+    if not match:
+        raise HTTPException(status_code=406, detail="Invalid recipient")
+    account_token = match.group(1)
+
+    # 3. Enqueue — worker resolves token -> user_id
+    body_html_val = form.get("body-html")
+    body_plain_val = form.get("body-plain")
+    job = EmailJob(
+        user_id=account_token,  # holds the token until the worker resolves it
+        sender=str(form.get("sender", "")),
+        recipient=recipient,
+        subject=str(form.get("subject", "")),
+        body_html=str(body_html_val) if body_html_val is not None else None,
+        body_plain=str(body_plain_val) if body_plain_val is not None else None,
+        received_at=timestamp,
+        message_id=str(form.get("Message-Id", "")),
+    )
+    client = await get_redis()
+    try:
+        await enqueue_email(client, job)
+    finally:
+        # get_redis() builds a fresh client on every call; release its
+        # connections so each webhook request does not leak one.
+        await client.aclose()
+    return {"status": "queued"}
+
@router.get("/health")
async def health():
diff --git a/receiptwitness/src/receiptwitness/config.py b/receiptwitness/src/receiptwitness/config.py
index 1341f3f..358b965 100644
--- a/receiptwitness/src/receiptwitness/config.py
+++ b/receiptwitness/src/receiptwitness/config.py
@@ -22,5 +22,13 @@ class ReceiptWitnessSettings(BaseSettings):
headless: bool = True
browser_timeout_ms: int = 60000
+ # Email notifications (Resend)
+ resend_api_key: str = ""
+ notification_email_from: str = "notifications@cartsnitch.com"
+ notifications_enabled: bool = False
+
+ # Mailgun inbound email webhook
+ mailgun_webhook_signing_key: str = ""
+
settings = ReceiptWitnessSettings()
diff --git a/receiptwitness/src/receiptwitness/events.py b/receiptwitness/src/receiptwitness/events.py
index 3d75614..a9e6204 100644
--- a/receiptwitness/src/receiptwitness/events.py
+++ b/receiptwitness/src/receiptwitness/events.py
@@ -2,12 +2,17 @@
import json
import logging
+import uuid
from datetime import UTC, datetime
from decimal import Decimal
import redis.asyncio as aioredis
+from cartsnitch_common.database import get_async_session_factory
+from cartsnitch_common.models.user import User
+from sqlalchemy import select
from receiptwitness.config import settings
+from receiptwitness.notifications.email import send_receipt_notification
logger = logging.getLogger(__name__)
@@ -39,6 +44,36 @@ async def get_redis_client() -> aioredis.Redis:
return aioredis.Redis(connection_pool=_get_pool())
+async def _send_notification_for_event(payload: dict) -> None:
+    """Email the user a confirmation for a receipts-ingested event (best effort).
+
+    Looks up the user's email by ``payload["user_id"]`` and calls
+    ``send_receipt_notification``. Every failure is logged and swallowed so a
+    notification problem never propagates to the caller that published the event.
+
+    Args:
+        payload: Event payload; reads ``user_id``, ``store_slug``, ``item_count``,
+            ``total`` and ``purchase_date``.
+    """
+    try:
+        user_uuid = uuid.UUID(payload["user_id"])
+    except (ValueError, KeyError, TypeError, AttributeError):
+        # TypeError/AttributeError cover a None or non-string user_id.
+        logger.warning("Invalid user_id in event payload: %s", payload.get("user_id"))
+        return
+
+    try:
+        session_factory = get_async_session_factory(settings.database_url)
+        async with session_factory() as session:
+            result = await session.execute(select(User.email).where(User.id == user_uuid))
+            user_email = result.scalar_one_or_none()
+    except Exception:
+        logger.exception("Failed to look up user email for notification")
+        return
+    if not user_email:
+        logger.warning("User %s not found for notification", user_uuid)
+        return
+
+    try:
+        await send_receipt_notification(
+            user_email=user_email,
+            store_name=payload["store_slug"],
+            item_count=payload["item_count"],
+            total=payload["total"],
+            purchase_date=payload["purchase_date"],
+        )
+    except Exception:
+        # Honors the best-effort contract, e.g. when a payload key is missing.
+        logger.exception("Failed to send receipt notification for user %s", user_uuid)
+
+
async def publish_receipt_ingested(
user_id: str,
store_slug: str,
@@ -48,18 +83,19 @@ async def publish_receipt_ingested(
total: Decimal | float,
) -> None:
"""Publish a cartsnitch.receipts.ingested event after successful ingestion."""
+ payload = {
+ "user_id": user_id,
+ "store_slug": store_slug,
+ "purchase_id": purchase_id,
+ "purchase_date": purchase_date,
+ "item_count": item_count,
+ "total": float(total) if isinstance(total, Decimal) else total,
+ }
event = {
"event_type": CHANNEL_RECEIPTS_INGESTED,
"timestamp": datetime.now(UTC).isoformat(),
"service": "receiptwitness",
- "payload": {
- "user_id": user_id,
- "store_slug": store_slug,
- "purchase_id": purchase_id,
- "purchase_date": purchase_date,
- "item_count": item_count,
- "total": float(total) if isinstance(total, Decimal) else total,
- },
+ "payload": payload,
}
try:
@@ -73,3 +109,5 @@ async def publish_receipt_ingested(
except aioredis.ConnectionError:
logger.error("Failed to publish event — Redis/DragonflyDB connection error")
raise
+ else:
+ await _send_notification_for_event(payload)
diff --git a/receiptwitness/src/receiptwitness/notifications/__init__.py b/receiptwitness/src/receiptwitness/notifications/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/receiptwitness/src/receiptwitness/notifications/email.py b/receiptwitness/src/receiptwitness/notifications/email.py
new file mode 100644
index 0000000..d5723f2
--- /dev/null
+++ b/receiptwitness/src/receiptwitness/notifications/email.py
@@ -0,0 +1,45 @@
+"""Email notifications via Resend."""
+
+import asyncio
+import html
+import logging
+
+import resend
+
+from receiptwitness.config import settings
+
+logger = logging.getLogger(__name__)
+
+
+async def send_receipt_notification(
+ user_email: str,
+ store_name: str,
+ item_count: int,
+ total: float,
+ purchase_date: str,
+) -> None:
+ """Send receipt ingestion confirmation email via Resend.
+
+ Returns without sending when ``settings.notifications_enabled`` is false or
+ ``settings.resend_api_key`` is empty. Failures raised while building or
+ sending the message are logged and swallowed, so this never raises for a
+ delivery error.
+
+ Args:
+ user_email: Recipient address.
+ store_name: Store label; HTML-escaped for the body, used raw in the subject.
+ item_count: Number of items shown in the body.
+ total: Receipt total, formatted with two decimals.
+ purchase_date: Purchase date text; HTML-escaped for the body.
+ """
+ if not settings.notifications_enabled or not settings.resend_api_key:
+ logger.debug("Notifications disabled — skipping email send")
+ return
+
+ # The Resend SDK reads its key from this module-level attribute.
+ resend.api_key = settings.resend_api_key
+ store_name_safe = html.escape(store_name)
+ purchase_date_safe = html.escape(purchase_date)
+ try:
+ # resend.Emails.send is a blocking call, so it is run in a worker thread.
+ await asyncio.to_thread(
+ resend.Emails.send,
+ {
+ "from": settings.notification_email_from,
+ "to": [user_email],
+ "subject": f"Receipt processed: {store_name} - ${total:.2f}",
+ # NOTE(review): the HTML literal below looks like it lost its markup in
+ # this view of the diff — confirm against the repository copy.
+ "html": (
+ f"
Your receipt from {store_name_safe} on "
+ f"{purchase_date_safe} has been processed.
"
+ f"{item_count} items, total: ${total:.2f}
"
+ ),
+ },
+ )
+ logger.info("Receipt notification sent to %s", user_email)
+ except Exception:
+ logger.exception("Failed to send receipt notification to %s", user_email)
diff --git a/receiptwitness/src/receiptwitness/parsers/email/__init__.py b/receiptwitness/src/receiptwitness/parsers/email/__init__.py
new file mode 100644
index 0000000..9d01da5
--- /dev/null
+++ b/receiptwitness/src/receiptwitness/parsers/email/__init__.py
@@ -0,0 +1 @@
+"""Email receipt parsers for retailer email receipts."""
diff --git a/receiptwitness/src/receiptwitness/parsers/email/base.py b/receiptwitness/src/receiptwitness/parsers/email/base.py
new file mode 100644
index 0000000..a25535e
--- /dev/null
+++ b/receiptwitness/src/receiptwitness/parsers/email/base.py
@@ -0,0 +1,32 @@
+"""Base interface for email receipt parsers."""
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+
+
+@dataclass
+class EmailReceipt:
+ """Raw email data before parsing.
+
+ This is the input handed to the retailer parsers; only ``sender``,
+ ``recipient`` and ``subject`` are required.
+ """
+
+ # Sender value as received; may be a bare address or a "Name <address>" form.
+ sender: str
+ # Address the message was delivered to.
+ recipient: str
+ subject: str
+ # HTML body, when the message had one.
+ body_html: str | None = None
+ # Plain-text body, when the message had one.
+ body_plain: str | None = None
+ # Receive time as text; format is whatever the producer supplied.
+ received_at: str | None = None
+ # Optional raw headers; defaults to a fresh empty dict per instance.
+ raw_headers: dict = field(default_factory=dict)
+
+
+class BaseEmailParser(ABC):
+ """All retailer email parsers implement this interface.
+
+ Subclasses must provide both ``can_parse`` and ``parse``; the class cannot
+ be instantiated until they do.
+ """
+
+ @abstractmethod
+ def can_parse(self, email: EmailReceipt) -> bool:
+ """Return True if this parser handles this email."""
+ ...
+
+ @abstractmethod
+ def parse(self, email: EmailReceipt) -> dict:
+ """Parse email into a dict matching PurchaseCreate schema fields.
+ Must include an items list matching PurchaseItemCreate fields."""
+ ...
diff --git a/receiptwitness/src/receiptwitness/parsers/email/detector.py b/receiptwitness/src/receiptwitness/parsers/email/detector.py
new file mode 100644
index 0000000..e71f769
--- /dev/null
+++ b/receiptwitness/src/receiptwitness/parsers/email/detector.py
@@ -0,0 +1,25 @@
+"""Detect which retailer sent a receipt email."""
+
+import re
+
+from receiptwitness.parsers.email.base import EmailReceipt
+
+RETAILER_PATTERNS: dict[str, list[str]] = {
+    "meijer": [r"@meijer\.com$", r"@email\.meijer\.com$"],
+    "kroger": [r"@kroger\.com$", r"@email\.kroger\.com$"],
+    "target": [r"@target\.com$", r"@email\.target\.com$"],
+}
+
+
+def detect_retailer(email: EmailReceipt) -> str | None:
+    """Return the retailer slug for the email's sender, or None if unrecognized.
+
+    The sender is lower-cased, and when it has the ``Name <address>`` form only
+    the bracketed address is tested against ``RETAILER_PATTERNS``.
+    """
+    sender = email.sender.lower().strip()
+    bracketed = re.search(r"<([^>]+)>", sender)
+    if bracketed:
+        # Strip again: padding inside the brackets would defeat the "$" anchors.
+        sender = bracketed.group(1).strip()
+    for retailer, patterns in RETAILER_PATTERNS.items():
+        if any(re.search(pattern, sender) for pattern in patterns):
+            return retailer
+    return None
diff --git a/receiptwitness/src/receiptwitness/parsers/email/kroger.py b/receiptwitness/src/receiptwitness/parsers/email/kroger.py
new file mode 100644
index 0000000..364f59e
--- /dev/null
+++ b/receiptwitness/src/receiptwitness/parsers/email/kroger.py
@@ -0,0 +1,157 @@
+"""Kroger email receipt parser."""
+
+import logging
+import re
+from datetime import datetime
+from decimal import Decimal, InvalidOperation
+
+from bs4 import BeautifulSoup
+
+from receiptwitness.parsers.email.base import BaseEmailParser, EmailReceipt
+
+logger = logging.getLogger(__name__)
+
+
+def _to_decimal(value: str | float | int | None, default: Decimal = Decimal("0")) -> Decimal:
+    """Convert *value* to Decimal, ignoring ``$`` and thousands commas.
+
+    Returns *default* when *value* is None or cannot be parsed.
+    """
+    if value is None:
+        return default
+    try:
+        cleaned = str(value)
+        for symbol in ("$", ","):
+            cleaned = cleaned.replace(symbol, "")
+        return Decimal(cleaned.strip())
+    except (InvalidOperation, ValueError):
+        return default
+
+
+def _extract_total(body: str) -> Decimal:
+    """Extract the transaction total from an email body (HTML or plain text).
+
+    Tags are replaced with spaces first so a label and its amount may sit in
+    adjacent elements. "Grand Total" is preferred, then a standalone "Total"
+    (never the tail of "Subtotal"), then "Amount".
+
+    Returns:
+        The amount as a Decimal, or ``Decimal("0")`` when nothing matches.
+    """
+    text = re.sub(r"<[^>]+>", " ", body)
+    patterns = [
+        r"Grand\s+Total[:\s]*\$?\s*([0-9,]+\.[0-9]{2})",
+        # The lookbehind keeps "Subtotal" from being read as the total.
+        r"(?<![A-Za-z])Total[:\s]*\$?\s*([0-9,]+\.[0-9]{2})",
+        r"(?<![A-Za-z])Amount[:\s]*\$?\s*([0-9,]+\.[0-9]{2})",
+    ]
+    for pattern in patterns:
+        match = re.search(pattern, text, re.IGNORECASE)
+        if match:
+            return _to_decimal(match.group(1))
+    return Decimal("0")
+
+
+def _extract_receipt_id(body: str) -> str | None:
+    """Extract a receipt / transaction / order / confirmation number from the body.
+
+    HTML tags are replaced with a space before matching, so a label and its
+    value can live in separate elements and text from a following element is
+    never glued onto the captured ID.
+
+    Returns:
+        The first ID found, trying the labels in the order Receipt, Transaction,
+        Order, Confirmation; None when no label is followed by an ID.
+    """
+    text = re.sub(r"<[^>]+>", " ", body)
+    patterns = [
+        r"Receipt\s*#[:\s]*([A-Z0-9-]+)",
+        r"Transaction\s*#[:\s]*([A-Z0-9-]+)",
+        r"Order\s*#[:\s]*([A-Z0-9-]+)",
+        r"Confirmation\s*#[:\s]*([A-Z0-9-]+)",
+    ]
+    for pattern in patterns:
+        match = re.search(pattern, text, re.IGNORECASE)
+        if match:
+            return match.group(1)
+    return None
+
+
+def _extract_date(body: str) -> str:
+    """Extract the purchase date from an email body as an ISO ``YYYY-MM-DD`` string.
+
+    Formats are tried in this order, each over every candidate in the body:
+
+    1. Numeric ``M/D/Y`` with ``/`` or ``-`` separators (month-first is tried
+       before day-first, with four- and two-digit years).
+    2. Month names, abbreviated or full (``Mar 15, 2026``, ``March 15 2026``).
+    3. ISO ``YYYY-MM-DD``.
+
+    Returns:
+        The first date that parses, or an empty string when none does.
+    """
+    # Lookarounds keep this from matching a slice of a longer digit run, such
+    # as the tail of an ISO date or a URL path.
+    numeric = r"(?<![\d/-])(\d{1,2})[/-](\d{1,2})[/-](\d{2,4})(?![\d/-])"
+    for match in re.finditer(numeric, body):
+        raw = "/".join(match.groups())  # normalise "-" separators to "/"
+        for fmt in ("%m/%d/%Y", "%m/%d/%y", "%d/%m/%Y", "%d/%m/%y"):
+            try:
+                return datetime.strptime(raw, fmt).strftime("%Y-%m-%d")
+            except ValueError:
+                continue
+
+    named = r"([A-Z][a-z]{2,8})\.?\s+(\d{1,2}),?\s+(\d{4})(?!\d)"
+    for match in re.finditer(named, body):
+        raw = " ".join(match.groups())
+        for fmt in ("%b %d %Y", "%B %d %Y"):
+            try:
+                return datetime.strptime(raw, fmt).strftime("%Y-%m-%d")
+            except ValueError:
+                continue
+
+    iso = r"(?<![\d-])(\d{4})-(\d{1,2})-(\d{1,2})(?![\d-])"
+    for match in re.finditer(iso, body):
+        try:
+            return datetime.strptime("-".join(match.groups()), "%Y-%m-%d").strftime("%Y-%m-%d")
+        except ValueError:
+            continue
+    return ""
+
+
+def _extract_items_soup(body: str) -> list[dict]:
+    """Extract line items from an HTML email body using BeautifulSoup.
+
+    The body is flattened to text and every line shaped like
+    ``Product Name $9.99`` becomes an item with quantity 1. Lines starting with
+    a known non-item label (totals, headers, footers) are skipped.
+
+    Returns:
+        At most 20 item dicts with ``product_name_raw``, ``quantity``,
+        ``unit_price`` and ``extended_price``; an empty list if the body cannot
+        be parsed.
+    """
+    skip_prefixes = (
+        "Subtotal",
+        "Tax",
+        "Total",
+        "Kroger",
+        "Target",
+        "Date",
+        "Receipt",
+        "Order",
+        "Transaction",
+        "Confirmation",
+        "Thank",
+        "Questions",
+        "Keep",
+    )
+    # Matches lines like "Product Name $9.99"
+    line_item = re.compile(r"(.+?)\s+\$([0-9]+\.[0-9]{2})\s*$")
+    try:
+        # NOTE(review): get_text(separator="\n") puts each text node on its own
+        # line, so a name and price held in separate elements will not match —
+        # confirm against real receipt markup.
+        text = BeautifulSoup(body, "html.parser").get_text(separator="\n", strip=True)
+    except Exception:
+        # Best effort: a body that cannot be parsed yields no items, but say so.
+        logger.exception("Failed to parse email body while extracting line items")
+        return []
+
+    items: list[dict] = []
+    for line in text.split("\n"):
+        line = line.strip()
+        if not line or line.startswith(skip_prefixes):
+            continue
+        match = line_item.match(line)
+        if not match:
+            continue
+        name = match.group(1).strip()
+        price = _to_decimal(match.group(2))
+        if len(name) > 2 and price > 0:
+            items.append(
+                {
+                    "product_name_raw": name,
+                    "quantity": Decimal("1"),
+                    "unit_price": price,
+                    "extended_price": price,
+                }
+            )
+    return items[:20]
+
+
+class KrogerEmailParser(BaseEmailParser):
+    """Parser for Kroger digital receipt emails."""
+
+    # NOTE(review): "plus" is a common English word and will match many
+    # unrelated bodies — confirm this keyword is intended.
+    KROGER_KEYWORDS = ("kroger", "kroger.com", "plus")
+
+    def can_parse(self, email: EmailReceipt) -> bool:
+        """Return True when any Kroger keyword occurs in the sender or the body."""
+        sender = (email.sender or "").lower()
+        body = (email.body_html or email.body_plain or "").lower()
+        for keyword in self.KROGER_KEYWORDS:
+            if keyword in sender or keyword in body:
+                return True
+        return False
+
+    def parse(self, email: EmailReceipt) -> dict:
+        """Parse the email (HTML body preferred, else plain text) into a purchase dict.
+
+        Returns a dict with ``receipt_id``, ``purchase_date``, ``total`` and ``items``.
+        """
+        body = (email.body_html or email.body_plain or "").strip()
+        return {
+            "receipt_id": _extract_receipt_id(body) or "",
+            "purchase_date": _extract_date(body),
+            "total": _extract_total(body),
+            "items": _extract_items_soup(body),
+        }
diff --git a/receiptwitness/src/receiptwitness/parsers/email/meijer.py b/receiptwitness/src/receiptwitness/parsers/email/meijer.py
new file mode 100644
index 0000000..598acb7
--- /dev/null
+++ b/receiptwitness/src/receiptwitness/parsers/email/meijer.py
@@ -0,0 +1,259 @@
+"""Parse Meijer digital receipt emails into structured purchase data."""
+
+import re
+from decimal import Decimal, InvalidOperation
+
+from bs4 import BeautifulSoup
+from bs4.element import Tag
+
+from receiptwitness.parsers.email.base import BaseEmailParser, EmailReceipt
+
+
+def _to_decimal(value, default: str = "0") -> Decimal:
+    """Convert *value* to Decimal, ignoring ``$`` and thousands commas.
+
+    Returns ``Decimal(default)`` when *value* is None or cannot be parsed.
+    """
+    if value is None:
+        return Decimal(default)
+    try:
+        cleaned = str(value)
+        for symbol in ("$", ","):
+            cleaned = cleaned.replace(symbol, "")
+        return Decimal(cleaned.strip())
+    except (InvalidOperation, ValueError, TypeError):
+        return Decimal(default)
+
+
+def _extract_receipt_id(soup: BeautifulSoup, subject: str | None) -> str | None:
+    """Find a ``TXN-####-####-#...`` transaction ID, preferring the subject line.
+
+    The body text is only consulted when the subject has no match. Spaces used
+    as separators in the match are normalised to hyphens.
+    """
+    txn_pattern = re.compile(r"TXN[-\s]\d{4}[-\s]\d{4}[-\s]\d+")
+    found = txn_pattern.search(subject) if subject else None
+    if found is None:
+        # Fallback: look in body
+        found = txn_pattern.search(soup.get_text())
+    if found is None:
+        return None
+    return found.group(0).replace(" ", "-")
+
+
+def _extract_purchase_date(soup: BeautifulSoup, subject: str | None) -> str | None:
+    """Extract the purchase date as ``YYYY-MM-DD`` from the body, else the subject.
+
+    The body text is searched first; the subject is only used when the body
+    yields nothing. Within a text the formats are tried in this order: ISO
+    ``YYYY-MM-DD``, written ``March 15, 2026`` / ``Mar 15, 2026``, then
+    ``MM/DD/YYYY``. Candidates that are not real calendar dates are skipped.
+
+    Returns:
+        The ISO date string, or None when no valid date is found.
+    """
+    from datetime import date  # local import: only this function needs it
+
+    month_names = (
+        "january",
+        "february",
+        "march",
+        "april",
+        "may",
+        "june",
+        "july",
+        "august",
+        "september",
+        "october",
+        "november",
+        "december",
+    )
+    month_numbers: dict[str, int] = {}
+    for number, name in enumerate(month_names, start=1):
+        month_numbers[name] = number
+        month_numbers[name[:3]] = number  # common three-letter abbreviation
+    month_numbers["sept"] = 9
+
+    def _iso(year: str, month: int | str, day: str) -> str | None:
+        """Return the ISO date, or None if the parts are not a real date."""
+        try:
+            return date(int(year), int(month), int(day)).isoformat()
+        except ValueError:
+            return None
+
+    def _scan(text: str) -> str | None:
+        """Return the first valid date in *text*, trying each format in turn."""
+        for match in re.finditer(r"(\d{4})-(\d{2})-(\d{2})", text):
+            found = _iso(match.group(1), match.group(2), match.group(3))
+            if found:
+                return found
+        for match in re.finditer(r"([A-Za-z]+)\.?\s+(\d{1,2}),?\s+(\d{4})", text):
+            month = month_numbers.get(match.group(1).lower())
+            if month is None:
+                continue  # a word followed by numbers that is not a month
+            found = _iso(match.group(3), month, match.group(2))
+            if found:
+                return found
+        for match in re.finditer(r"(\d{1,2})/(\d{1,2})/(\d{4})", text):
+            found = _iso(match.group(3), match.group(1), match.group(2))
+            if found:
+                return found
+        return None
+
+    for text in (soup.get_text(), subject or ""):
+        found = _scan(text)
+        if found:
+            return found
+    return None
+
+
+def _extract_store_info(soup: BeautifulSoup) -> dict:
+    """Return ``{"store_number": ...}`` when the body names a Meijer store, else ``{}``.
+
+    The number is taken from text like ``Meijer Store #123`` (case-insensitive,
+    ``#`` optional).
+    """
+    found = re.search(r"Meijer\s+Store\s+#?(\d+)", soup.get_text(), re.IGNORECASE)
+    if found is None:
+        return {}
+    return {"store_number": found.group(1)}
+
+
+def _extract_items(table: Tag | None) -> list[dict]:
+    """Turn the rows of the items table into line-item dicts.
+
+    Each row needs at least three cells, read as name, quantity and price.
+    Header rows and summary rows (subtotal, tax, total, savings) are skipped.
+    An unparseable quantity becomes 1 and an unparseable price becomes 0.
+
+    Returns:
+        A list of dicts with ``product_name_raw``, ``quantity``, ``unit_price``
+        and ``extended_price``; empty when *table* is missing.
+    """
+    if not table:
+        return []
+
+    header_names = ("item", "description")
+    summary_labels = ("subtotal", "tax", "total", "savings", "grand total")
+    line_items: list[dict] = []
+
+    for row in table.find_all("tr"):
+        cells = row.find_all("td")
+        if len(cells) < 3:
+            continue
+        name, qty_text, price_text = (cell.get_text(strip=True) for cell in cells[:3])
+
+        lowered = name.lower()
+        if not name or lowered in header_names:
+            continue
+        # Skip subtotal/tax/total/savings rows
+        if any(label in lowered for label in summary_labels):
+            continue
+
+        try:
+            quantity = Decimal(qty_text)
+        except (InvalidOperation, ValueError, TypeError):
+            quantity = Decimal("1")
+
+        try:
+            unit_price = Decimal(price_text.replace("$", "").replace(",", "").strip())
+        except (InvalidOperation, ValueError, TypeError):
+            unit_price = Decimal("0")
+
+        line_items.append(
+            {
+                "product_name_raw": name,
+                "quantity": quantity,
+                "unit_price": unit_price,
+                # The price cell is used as-is; it is not multiplied by quantity.
+                "extended_price": unit_price,
+            }
+        )
+
+    return line_items
+
+
+def _extract_totals_plain(text: str) -> dict:
+    """Pull subtotal, tax, total and savings amounts out of plain text.
+
+    Returns a dict with the keys ``subtotal``, ``tax``, ``total`` and
+    ``savings_total``; a value stays None when its label is not found.
+    "Grand Total" wins over a plain "Total".
+    """
+    totals: dict = dict.fromkeys(("subtotal", "tax", "total", "savings_total"))
+
+    labelled_patterns = (
+        ("subtotal", r"\bSubtotal\b[:\s$]*([0-9,]+\.?\d*)"),
+        ("tax", r"\bTax\b[:\s$]*([0-9,]+\.?\d*)"),
+        ("total", r"Grand\s+Total\b[:\s$]*([0-9,]+\.?\d*)"),
+        ("savings_total", r"\bSavings\b[:\s$\-]*([0-9,]+\.?\d*)"),
+    )
+    for key, pattern in labelled_patterns:
+        found = re.search(pattern, text, re.IGNORECASE)
+        if found:
+            totals[key] = _to_decimal(found.group(1))
+
+    # A plain "Total" is only a fallback for a missing "Grand Total".
+    if totals["total"] is None:
+        found = re.search(r"\bTotal\b[:\s$]*([0-9,]+\.?\d*)", text, re.IGNORECASE)
+        if found:
+            totals["total"] = _to_decimal(found.group(1))
+
+    return totals
+
+
+def _extract_totals(soup: BeautifulSoup) -> dict:
+    """Extract subtotal, tax, total and savings from a parsed HTML body.
+
+    The document is flattened with ``soup.get_text()`` and handed to
+    ``_extract_totals_plain``, so HTML and plain-text emails share one set of
+    matching rules instead of two copies that can drift apart.
+
+    Returns:
+        A dict with ``subtotal``, ``tax``, ``total`` and ``savings_total``;
+        a value is None when its label is not found.
+    """
+    return _extract_totals_plain(soup.get_text())
+
+
+class MeijerEmailParser(BaseEmailParser):
+    """Parser for Meijer digital receipt emails."""
+
+    def can_parse(self, email: EmailReceipt) -> bool:
+        """Return True when the sender address contains "meijer".
+
+        For a ``Name <address>`` sender only the bracketed address is checked.
+        """
+        sender = email.sender.lower().strip()
+        bracketed = re.search(r"<([^>]+)>", sender)
+        address = bracketed.group(1) if bracketed else sender
+        return "meijer" in address
+
+    def parse(self, email: EmailReceipt) -> dict:
+        """Parse the email into a purchase dict.
+
+        Returns a dict with ``receipt_id``, ``purchase_date``, ``total``,
+        ``subtotal``, ``tax``, ``savings_total`` and ``items``. A missing ID or
+        date becomes an empty string and a missing total becomes 0.
+        """
+        html_body = email.body_html
+        plain_body = email.body_plain or ""
+        soup = BeautifulSoup(html_body or plain_body, "html.parser")
+
+        receipt_id = _extract_receipt_id(soup, email.subject)
+        purchase_date = _extract_purchase_date(soup, email.subject)
+        _ = _extract_store_info(soup)  # result is currently not part of the output
+
+        def _has_item_headers(candidate) -> bool:
+            """True when a header cell mentions item, qty or price."""
+            labels = [th.get_text(strip=True).lower() for th in candidate.find_all("th")]
+            return any("item" in text or "qty" in text or "price" in text for text in labels)
+
+        # The items table is the first one with Item/Qty/Price headers.
+        items_table = next(
+            (tbl for tbl in soup.find_all("table") if _has_item_headers(tbl)), None
+        )
+        items = _extract_items(items_table)
+
+        # Totals come from the HTML when present, otherwise from the plain text.
+        totals = _extract_totals(soup) if html_body else _extract_totals_plain(plain_body)
+
+        return {
+            "receipt_id": receipt_id or "",
+            "purchase_date": purchase_date or "",
+            "total": totals["total"] or Decimal("0"),
+            "subtotal": totals["subtotal"],
+            "tax": totals["tax"],
+            "savings_total": totals["savings_total"],
+            "items": items,
+        }
diff --git a/receiptwitness/src/receiptwitness/parsers/email/target.py b/receiptwitness/src/receiptwitness/parsers/email/target.py
new file mode 100644
index 0000000..c7e58d3
--- /dev/null
+++ b/receiptwitness/src/receiptwitness/parsers/email/target.py
@@ -0,0 +1,156 @@
+"""Target email receipt parser."""
+
+import logging
+import re
+from datetime import datetime
+from decimal import Decimal, InvalidOperation
+
+from bs4 import BeautifulSoup
+
+from receiptwitness.parsers.email.base import BaseEmailParser, EmailReceipt
+
+logger = logging.getLogger(__name__)
+
+
+def _to_decimal(value: str | float | int | None, default: Decimal = Decimal("0")) -> Decimal:
+    """Convert *value* to Decimal, ignoring ``$`` and thousands commas.
+
+    Returns *default* when *value* is None or cannot be parsed.
+    """
+    if value is None:
+        return default
+    try:
+        cleaned = str(value)
+        for symbol in ("$", ","):
+            cleaned = cleaned.replace(symbol, "")
+        return Decimal(cleaned.strip())
+    except (InvalidOperation, ValueError):
+        return default
+
+
+def _extract_total(body: str) -> Decimal:
+    """Extract the transaction total from an email body (HTML or plain text).
+
+    Tags are replaced with spaces first so a label and its amount may sit in
+    adjacent elements. "Grand Total" is preferred, then a standalone "Total"
+    (never the tail of "Subtotal"), then "Amount".
+
+    Returns:
+        The amount as a Decimal, or ``Decimal("0")`` when nothing matches.
+    """
+    text = re.sub(r"<[^>]+>", " ", body)
+    patterns = [
+        r"Grand\s+Total[:\s]*\$?\s*([0-9,]+\.[0-9]{2})",
+        # The lookbehind keeps "Subtotal" from being read as the total.
+        r"(?<![A-Za-z])Total[:\s]*\$?\s*([0-9,]+\.[0-9]{2})",
+        r"(?<![A-Za-z])Amount[:\s]*\$?\s*([0-9,]+\.[0-9]{2})",
+    ]
+    for pattern in patterns:
+        match = re.search(pattern, text, re.IGNORECASE)
+        if match:
+            return _to_decimal(match.group(1))
+    return Decimal("0")
+
+
+def _extract_receipt_id(body: str) -> str | None:
+    """Extract a receipt / order / confirmation number from the body.
+
+    HTML tags are replaced with a space before matching, so a label and its
+    value can live in separate elements and text from a following element is
+    never glued onto the captured ID.
+
+    Returns:
+        The first ID found, trying the labels in the order Receipt, Order,
+        Confirmation, Target Order; None when no label is followed by an ID.
+    """
+    text = re.sub(r"<[^>]+>", " ", body)
+    patterns = [
+        r"Receipt\s*#[:\s]*([A-Z0-9-]+)",
+        r"Order\s*#[:\s]*([A-Z0-9-]+)",
+        r"Confirmation\s*#[:\s]*([A-Z0-9-]+)",
+        r"Target\s+Order\s*#[:\s]*([A-Z0-9-]+)",
+    ]
+    for pattern in patterns:
+        match = re.search(pattern, text, re.IGNORECASE)
+        if match:
+            return match.group(1)
+    return None
+
+
+def _extract_date(body: str) -> str:
+    """Extract the purchase date from an email body as an ISO ``YYYY-MM-DD`` string.
+
+    Formats are tried in this order, each over every candidate in the body:
+
+    1. Numeric ``M/D/Y`` with ``/`` or ``-`` separators (month-first is tried
+       before day-first, with four- and two-digit years).
+    2. Month names, abbreviated or full (``Mar 18, 2026``, ``March 18 2026``).
+    3. ISO ``YYYY-MM-DD``.
+
+    Returns:
+        The first date that parses, or an empty string when none does.
+    """
+    # Lookarounds keep this from matching a slice of a longer digit run, such
+    # as the tail of an ISO date or a URL path.
+    numeric = r"(?<![\d/-])(\d{1,2})[/-](\d{1,2})[/-](\d{2,4})(?![\d/-])"
+    for match in re.finditer(numeric, body):
+        raw = "/".join(match.groups())  # normalise "-" separators to "/"
+        for fmt in ("%m/%d/%Y", "%m/%d/%y", "%d/%m/%Y", "%d/%m/%y"):
+            try:
+                return datetime.strptime(raw, fmt).strftime("%Y-%m-%d")
+            except ValueError:
+                continue
+
+    named = r"([A-Z][a-z]{2,8})\.?\s+(\d{1,2}),?\s+(\d{4})(?!\d)"
+    for match in re.finditer(named, body):
+        raw = " ".join(match.groups())
+        for fmt in ("%b %d %Y", "%B %d %Y"):
+            try:
+                return datetime.strptime(raw, fmt).strftime("%Y-%m-%d")
+            except ValueError:
+                continue
+
+    iso = r"(?<![\d-])(\d{4})-(\d{1,2})-(\d{1,2})(?![\d-])"
+    for match in re.finditer(iso, body):
+        try:
+            return datetime.strptime("-".join(match.groups()), "%Y-%m-%d").strftime("%Y-%m-%d")
+        except ValueError:
+            continue
+    return ""
+
+
+def _extract_items_soup(body: str) -> list[dict]:
+    """Extract line items from an HTML email body using BeautifulSoup.
+
+    The body is flattened to text and every line shaped like
+    ``Product Name $9.99`` becomes an item with quantity 1. Lines starting with
+    a known non-item label (totals, headers, footers) are skipped.
+
+    Returns:
+        At most 20 item dicts with ``product_name_raw``, ``quantity``,
+        ``unit_price`` and ``extended_price``; an empty list if the body cannot
+        be parsed.
+    """
+    skip_prefixes = (
+        "Subtotal",
+        "Tax",
+        "Total",
+        "Target",
+        "Kroger",
+        "Date",
+        "Receipt",
+        "Order",
+        "Transaction",
+        "Confirmation",
+        "Thank",
+        "Questions",
+        "Keep",
+        "Store",
+    )
+    # Matches lines like "Product Name $9.99"
+    line_item = re.compile(r"(.+?)\s+\$([0-9]+\.[0-9]{2})\s*$")
+    try:
+        # NOTE(review): get_text(separator="\n") puts each text node on its own
+        # line, so a name and price held in separate elements will not match —
+        # confirm against real receipt markup.
+        text = BeautifulSoup(body, "html.parser").get_text(separator="\n", strip=True)
+    except Exception:
+        # Best effort: a body that cannot be parsed yields no items, but say so.
+        logger.exception("Failed to parse email body while extracting line items")
+        return []
+
+    items: list[dict] = []
+    for line in text.split("\n"):
+        line = line.strip()
+        if not line or line.startswith(skip_prefixes):
+            continue
+        match = line_item.match(line)
+        if not match:
+            continue
+        name = match.group(1).strip()
+        price = _to_decimal(match.group(2))
+        if len(name) > 2 and price > 0:
+            items.append(
+                {
+                    "product_name_raw": name,
+                    "quantity": Decimal("1"),
+                    "unit_price": price,
+                    "extended_price": price,
+                }
+            )
+    return items[:20]
+
+
+class TargetEmailParser(BaseEmailParser):
+    """Parser for Target receipt and order-confirmation emails."""
+
+    # NOTE(review): "circle" and "target" are common English words and will
+    # match many unrelated bodies — confirm these keywords are intended.
+    TARGET_KEYWORDS = ("target.com", "targetnow", "circle", "target")
+
+    def can_parse(self, email: EmailReceipt) -> bool:
+        """Return True when any Target keyword occurs in the sender or the body."""
+        sender = (email.sender or "").lower()
+        body = (email.body_html or email.body_plain or "").lower()
+        for keyword in self.TARGET_KEYWORDS:
+            if keyword in sender or keyword in body:
+                return True
+        return False
+
+    def parse(self, email: EmailReceipt) -> dict:
+        """Parse the email (HTML body preferred, else plain text) into a purchase dict.
+
+        Returns a dict with ``receipt_id``, ``purchase_date``, ``total`` and ``items``.
+        """
+        body = (email.body_html or email.body_plain or "").strip()
+        return {
+            "receipt_id": _extract_receipt_id(body) or "",
+            "purchase_date": _extract_date(body),
+            "total": _extract_total(body),
+            "items": _extract_items_soup(body),
+        }
diff --git a/receiptwitness/src/receiptwitness/queue/__init__.py b/receiptwitness/src/receiptwitness/queue/__init__.py
new file mode 100644
index 0000000..3f9a31f
--- /dev/null
+++ b/receiptwitness/src/receiptwitness/queue/__init__.py
@@ -0,0 +1 @@
+"""DragonflyDB Streams queue for email receipt processing."""
diff --git a/receiptwitness/src/receiptwitness/queue/email.py b/receiptwitness/src/receiptwitness/queue/email.py
new file mode 100644
index 0000000..c76148e
--- /dev/null
+++ b/receiptwitness/src/receiptwitness/queue/email.py
@@ -0,0 +1,77 @@
+"""DragonflyDB Streams queue for email receipt processing."""
+
+from __future__ import annotations
+
+import json
+import logging
+from dataclasses import asdict, dataclass
+from typing import cast
+
+import redis.asyncio as aioredis
+
+from receiptwitness.config import settings
+
+logger = logging.getLogger(__name__)
+
+# Stream that email jobs are appended to (XADD) and read from (XREADGROUP).
+STREAM_KEY = "email:receipts"
+# Consumer group used by the workers that read STREAM_KEY.
+CONSUMER_GROUP = "email-workers"
+
+
+@dataclass
+class EmailJob:
+ """Payload for an email receipt processing job.
+
+ Instances are serialised to JSON with ``asdict`` when enqueued and rebuilt
+ from that JSON when consumed, so every field must be JSON-serialisable.
+ """
+
+ # NOTE(review): the webhook route stores the account token from the
+ # recipient address here; the worker resolves it to the real user id.
+ user_id: str
+ sender: str
+ recipient: str
+ subject: str
+ # Either body may be None when the message did not include it.
+ body_html: str | None
+ body_plain: str | None
+ received_at: str
+ message_id: str # from email provider, for dedup
+
+
+async def get_redis() -> aioredis.Redis:
+    """Build an async Redis/DragonflyDB client for ``settings.redis_url``.
+
+    A new client is created on every call, with responses decoded to ``str``.
+    """
+    client = aioredis.from_url(settings.redis_url, decode_responses=True)
+    return cast(aioredis.Redis, client)
+
+
+async def ensure_consumer_group(client: aioredis.Redis) -> None:
+    """Create the consumer group (and the stream) unless the group already exists.
+
+    Raises:
+        aioredis.ResponseError: For any server error other than BUSYGROUP.
+    """
+    try:
+        await client.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id="0", mkstream=True)
+    except aioredis.ResponseError as exc:
+        group_already_exists = "BUSYGROUP" in str(exc)
+        if group_already_exists:
+            return
+        raise
+
+
+async def enqueue_email(client: aioredis.Redis, job: EmailJob) -> str:
+    """Append *job* to the stream as a single JSON ``data`` field.
+
+    Returns:
+        The stream message ID assigned by the server.
+    """
+    serialized = json.dumps(asdict(job))
+    payload: dict[str, str | bytes | int | float] = {"data": serialized}
+    # redis-py StreamCommands.xadd expects a broader FieldT union; runtime
+    # behavior is correct.
+    stream_id = await client.xadd(STREAM_KEY, payload)  # type: ignore[arg-type]
+    msg_id: str = cast(str, stream_id)
+    logger.info("Enqueued email job %s for user %s", msg_id, job.user_id)
+    return msg_id
+
+
+async def consume_emails(
+    client: aioredis.Redis,
+    consumer_name: str,
+    count: int = 1,
+    block_ms: int = 5000,
+) -> list[tuple[str, EmailJob]]:
+    """Read new messages from the stream for this consumer.
+
+    Ensures the consumer group exists, then reads up to *count* entries that
+    have not yet been delivered to the group, blocking up to *block_ms*.
+
+    Entries whose payload cannot be decoded into an ``EmailJob`` are logged and
+    skipped (and left unacknowledged) so one bad entry cannot discard the rest
+    of the batch.
+
+    Args:
+        client: Async Redis/DragonflyDB client.
+        consumer_name: Name of this consumer within the group.
+        count: Maximum number of entries to read.
+        block_ms: How long to wait for new entries, in milliseconds.
+
+    Returns:
+        A list of ``(msg_id, EmailJob)`` pairs; empty when nothing arrived.
+    """
+    await ensure_consumer_group(client)
+    messages = await client.xreadgroup(
+        CONSUMER_GROUP, consumer_name, {STREAM_KEY: ">"}, count=count, block=block_ms
+    )
+    results: list[tuple[str, EmailJob]] = []
+    for _stream, entries in messages or []:
+        for msg_id, fields in entries:
+            try:
+                job = EmailJob(**json.loads(fields["data"]))
+            except (KeyError, TypeError, ValueError):
+                # Missing "data", invalid JSON, or JSON that does not fit EmailJob.
+                logger.exception("Skipping malformed email job %s", msg_id)
+                continue
+            results.append((msg_id, job))
+    return results
+
+
+async def ack_email(client: aioredis.Redis, msg_id: str) -> None:
+    """Mark *msg_id* as processed for the consumer group (XACK)."""
+    stream, group = STREAM_KEY, CONSUMER_GROUP
+    await client.xack(stream, group, msg_id)
diff --git a/receiptwitness/src/receiptwitness/worker/__init__.py b/receiptwitness/src/receiptwitness/worker/__init__.py
new file mode 100644
index 0000000..e32899a
--- /dev/null
+++ b/receiptwitness/src/receiptwitness/worker/__init__.py
@@ -0,0 +1 @@
+"""Async email receipt worker consuming from DragonflyDB Streams."""
diff --git a/receiptwitness/src/receiptwitness/worker/email_worker.py b/receiptwitness/src/receiptwitness/worker/email_worker.py
new file mode 100644
index 0000000..52a5dc0
--- /dev/null
+++ b/receiptwitness/src/receiptwitness/worker/email_worker.py
@@ -0,0 +1,104 @@
+"""Async worker that consumes email receipt jobs from DragonflyDB Streams."""
+
+import asyncio
+import logging
+
+from cartsnitch_common.database import get_async_session_factory
+from cartsnitch_common.models.user import User
+from sqlalchemy import select
+
+from receiptwitness.config import settings
+from receiptwitness.events import publish_receipt_ingested
+from receiptwitness.parsers.email.base import BaseEmailParser, EmailReceipt
+from receiptwitness.parsers.email.detector import detect_retailer
+from receiptwitness.parsers.email.kroger import KrogerEmailParser
+from receiptwitness.parsers.email.meijer import MeijerEmailParser
+from receiptwitness.parsers.email.target import TargetEmailParser
+from receiptwitness.queue.email import ack_email, consume_emails, get_redis
+
+logger = logging.getLogger(__name__)
+
+# Consumer name this worker passes to consume_emails.
+# NOTE(review): the name is fixed, so several worker processes would presumably
+# share one consumer identity — confirm whether multiple replicas are planned.
+CONSUMER_NAME = "worker-1"
+
+# Registry of available email parsers, keyed by the retailer slug that
+# process_job looks up after calling detect_retailer.
+PARSERS: dict[str, BaseEmailParser] = {
+ "meijer": MeijerEmailParser(),
+ "kroger": KrogerEmailParser(),
+ "target": TargetEmailParser(),
+}
+
+
+async def resolve_user(token: str) -> str | None:
+    """Return the ID of the user owning ``token``, or None when no user matches.
+
+    Looks the token up in ``User.email_inbound_token`` and stringifies the
+    matching ``User.id``.
+    """
+    make_session = get_async_session_factory(settings.database_url)
+    async with make_session() as session:
+        lookup = select(User.id).where(User.email_inbound_token == token)
+        found = (await session.execute(lookup)).scalar_one_or_none()
+        if not found:
+            return None
+        return str(found)
+
+
+async def process_job(msg_id: str, job) -> bool:
+    """Process a single email job and report whether it may be acknowledged.
+
+    Steps: resolve the user from the job's token, build an ``EmailReceipt``,
+    detect the retailer, parse with the matching parser, and publish a
+    receipt-ingested event.
+
+    Args:
+        msg_id: Stream message ID; used in log messages and as the fallback
+            purchase ID when the parser yields no receipt ID.
+        job: Queued email job. Its ``user_id`` attribute holds the inbound
+            token, not a resolved user ID.
+
+    Returns:
+        True when the message should be acknowledged. Unknown tokens and
+        unrecognized retailers also return True so they are acked rather than
+        left pending; parse or publish failures propagate as exceptions.
+    """
+    # 1. Resolve user from token
+    user_id = await resolve_user(job.user_id)  # user_id field holds token
+    if not user_id:
+        logger.warning("Unknown token %s, dropping message %s", job.user_id, msg_id)
+        return True  # ack to avoid infinite retry
+
+    # 2. Build EmailReceipt
+    email = EmailReceipt(
+        sender=job.sender,
+        recipient=job.recipient,
+        subject=job.subject,
+        body_html=job.body_html,
+        body_plain=job.body_plain,
+        received_at=job.received_at,
+    )
+
+    # 3. Detect retailer
+    retailer = detect_retailer(email)
+    if not retailer or retailer not in PARSERS:
+        logger.warning(
+            "Unrecognized retailer from %s, archiving msg %s",
+            job.sender,
+            msg_id,
+        )
+        return True  # ack — no parser available
+
+    # 4. Parse
+    parser = PARSERS[retailer]
+    parsed = parser.parse(email)
+
+    # 5. Publish event
+    await publish_receipt_ingested(
+        user_id=user_id,
+        store_slug=retailer,
+        # A parser can return the key with an empty string, which a plain
+        # .get() default would not replace; `or` covers missing and empty.
+        purchase_id=parsed.get("receipt_id") or msg_id,
+        purchase_date=parsed.get("purchase_date", ""),
+        item_count=len(parsed.get("items", [])),
+        total=parsed.get("total", 0),
+    )
+    return True
+
+
+async def run_worker() -> None:
+ """Main worker loop — consume and process email jobs.
+
+ Loops forever: reads up to 5 jobs per call as ``CONSUMER_NAME``, runs
+ ``process_job`` on each, and acks the ones for which it returns True.
+ """
+ client = await get_redis()
+ logger.info("Email worker started, consuming from email:receipts")
+ while True:
+ try:
+ jobs = await consume_emails(client, CONSUMER_NAME, count=5, block_ms=5000)
+ for msg_id, job in jobs:
+ try:
+ success = await process_job(msg_id, job)
+ if success:
+ await ack_email(client, msg_id)
+ except Exception:
+ # A failing job is logged and left unacknowledged; the loop moves on.
+ # NOTE(review): consume_emails reads with ">" only, so this entry is
+ # presumably never re-read by this loop — confirm a retry/claim path exists.
+ logger.exception("Failed to process email job %s", msg_id)
+ except Exception:
+ # Errors outside per-job handling (e.g. from consume_emails) land here.
+ logger.exception("Worker loop error, retrying in 5s")
+ await asyncio.sleep(5)
+
+
+# Script entry point: run the worker loop when this module is executed directly.
+if __name__ == "__main__":
+ asyncio.run(run_worker())
diff --git a/receiptwitness/tests/fixtures/kroger_email_receipt.html b/receiptwitness/tests/fixtures/kroger_email_receipt.html
new file mode 100644
index 0000000..9cb33f8
--- /dev/null
+++ b/receiptwitness/tests/fixtures/kroger_email_receipt.html
@@ -0,0 +1,45 @@
+
+
+
+
+ Kroger Digital Receipt
+
+
+
+

+
Your Digital Receipt
+
Kroger Plus Member
+
+
+
+
Kroger #882 - Downtown
+
123 Main Street
Anytown, OH 45202
+
Date: 03/15/2026
+
Receipt #: KR-2026-0315-4829
+
Transaction #: TXN-789123456
+
+
+
+
Items Purchased
+
Whole Milk 1 Gallon $3.99
+
Sourdough Bread $4.49
+
Free Range Eggs 12ct $5.99
+
Baby Spinach 5oz $4.29
+
+
+
+
Subtotal: $18.76
+
Tax: $1.24
+
Total: $20.00
+
+
+
+
Kroger Plus Savings: $3.25 saved on this order.
+
+
+
+
Thank you for shopping at Kroger!
+
Keep your receipt for returns within 90 days.
+
+
+
\ No newline at end of file
diff --git a/receiptwitness/tests/fixtures/meijer_email_receipt.html b/receiptwitness/tests/fixtures/meijer_email_receipt.html
new file mode 100644
index 0000000..f61deb3
--- /dev/null
+++ b/receiptwitness/tests/fixtures/meijer_email_receipt.html
@@ -0,0 +1,127 @@
+
+
+
+
+
+ Meijer Digital Receipt
+
+
+
+
+
+
+
+
Meijer Store #42
+
1555 Lake Drive SE, Grand Rapids, MI 49506
+
+
+
+
+
+
+
+ | Item |
+ Qty |
+ Price |
+
+
+
+
+ | ORGANIC BANANAS |
+ 1 |
+ $0.69 |
+
+
+ | WHOLE MILK 1 GAL |
+ 1 |
+ $4.29 |
+
+
+ | MEIJER WHOLE GRAIN OAT CEREAL 18OZ |
+ 1 |
+ $4.99 |
+
+
+ | FRESH BROCCOLI CROWN |
+ 1 |
+ $2.49 |
+
+
+ | GROUND BEEF 85/15 1LB |
+ 1 |
+ $6.99 |
+
+
+ | SOURDOUGH BREAD |
+ 1 |
+ $3.99 |
+
+
+ | MEIJER BABY SPINACH 5OZ |
+ 1 |
+ $4.49 |
+
+
+ | LARGE EGGS DOZEN |
+ 1 |
+ $3.29 |
+
+
+
+
+
+
+ Subtotal
+ $31.22
+
+
+ Tax
+ $2.19
+
+
+ Total Savings
+ -$3.40
+
+
+ Total
+ $33.41
+
+
+
+
+
+
+
diff --git a/receiptwitness/tests/fixtures/target_email_receipt.html b/receiptwitness/tests/fixtures/target_email_receipt.html
new file mode 100644
index 0000000..70f0720
--- /dev/null
+++ b/receiptwitness/tests/fixtures/target_email_receipt.html
@@ -0,0 +1,44 @@
+
+
+
+
+ Target Order Confirmation
+
+
+
+

+
Order Confirmation
+
Thanks for shopping Target Circle!
+
+
+
+
Target Store #1247 - Riverside
+
4500 River Road
Columbus, OH 43220
+
Date: 03/18/2026
+
Order #: TGT-2026-0318-9124
+
Confirmation #: CNF-44772819
+
+
+
+
Items Purchased
+
Good & Gather Whole Milk 1 Gal $3.89
+
Arborio Rice 2lb bag $6.49
+
Parmesan Wedge 8oz $7.99
+
+
+
+
Subtotal: $18.37
+
Tax: $1.45
+
Total: $19.82
+
+
+
+
Target Circle offer saved you $0.30 on this order.
+
+
+
+
Questions? Call Target Guest Services at 1-800-591-3869.
+
Receipt valid for returns within 30 days.
+
+
+
\ No newline at end of file
diff --git a/receiptwitness/tests/test_api/__init__.py b/receiptwitness/tests/test_api/__init__.py
new file mode 100644
index 0000000..598c2e0
--- /dev/null
+++ b/receiptwitness/tests/test_api/__init__.py
@@ -0,0 +1 @@
+"""Tests for the ReceiptWitness API routes."""
diff --git a/receiptwitness/tests/test_api/test_webhook.py b/receiptwitness/tests/test_api/test_webhook.py
new file mode 100644
index 0000000..2b208de
--- /dev/null
+++ b/receiptwitness/tests/test_api/test_webhook.py
@@ -0,0 +1,101 @@
+"""Tests for the /inbound/email webhook endpoint."""
+
+import hashlib
+import hmac
+import time
+from unittest.mock import AsyncMock, patch
+
+import pytest
+from fastapi.testclient import TestClient
+
+from receiptwitness.main import app
+
+
+@pytest.fixture
+def client():
+ """Return a TestClient wrapping the imported ``app``."""
+ return TestClient(app)
+
+
+@pytest.fixture
+def mock_redis():
+    """Patch the routes module's ``get_redis`` and ``enqueue_email``.
+
+    Yields a dict with the fake Redis client under ``"redis"`` and the
+    enqueue mock under ``"enqueue"``.
+    """
+    fake_redis = AsyncMock()
+    with (
+        patch("receiptwitness.api.routes.get_redis", return_value=fake_redis),
+        patch("receiptwitness.api.routes.enqueue_email", new_callable=AsyncMock) as fake_enqueue,
+    ):
+        yield {"redis": fake_redis, "enqueue": fake_enqueue}
+
+
+def make_signature(signing_key: str, token: str, timestamp: str) -> str:
+    """Return the hex HMAC-SHA256 of ``timestamp + token`` keyed by ``signing_key``."""
+    message = (timestamp + token).encode()
+    mac = hmac.new(signing_key.encode(), message, hashlib.sha256)
+    return mac.hexdigest()
+
+
+def valid_form(signing_key: str = "test-secret"):
+ """Build form fields for a webhook request signed with ``signing_key``.
+
+ The timestamp is the current time, and the signature is computed over
+ that timestamp and the fixed token via ``make_signature``.
+ """
+ ts = str(int(time.time()))
+ token = "test-token"
+ sig = make_signature(signing_key, token, ts)
+ return {
+ "token": token,
+ "timestamp": ts,
+ "signature": sig,
+ "sender": "sender@example.com",
+ "recipient": "receipts+user123@example.com",
+ "subject": "Your Meijer Receipt",
+ # NOTE(review): the next literal spans two lines and appears to have lost
+ # its markup — confirm against the original test file.
+ "body-html": "Thank you for shopping at Meijer
",
+ "body-plain": "Thank you for shopping at Meijer",
+ "Message-Id": "",
+ }
+
+
+def test_valid_webhook(client, mock_redis):
+    """A correctly signed request returns 200 and enqueues exactly once."""
+    with patch("receiptwitness.api.routes.settings") as fake_settings:
+        fake_settings.mailgun_webhook_signing_key = "test-secret"
+        payload = valid_form()
+        resp = client.post("/inbound/email", data=payload)
+        assert resp.status_code == 200
+        assert resp.json() == {"status": "queued"}
+        mock_redis["enqueue"].assert_awaited_once()
+
+
+def test_invalid_signature(client, mock_redis):
+    """A request with a wrong signature is rejected with 406 and not enqueued."""
+    with patch("receiptwitness.api.routes.settings") as fake_settings:
+        fake_settings.mailgun_webhook_signing_key = "test-secret"
+        payload = {**valid_form(), "signature": "wrong-signature"}
+        resp = client.post("/inbound/email", data=payload)
+        assert resp.status_code == 406
+        assert resp.json()["detail"] == "Invalid signature"
+        mock_redis["enqueue"].assert_not_awaited()
+
+
+def test_invalid_recipient_no_plus(client, mock_redis):
+    """A recipient without a plus-address token is rejected with 406."""
+    with patch("receiptwitness.api.routes.settings") as fake_settings:
+        fake_settings.mailgun_webhook_signing_key = "test-secret"
+        # no plus-address
+        payload = {**valid_form(), "recipient": "receipts@example.com"}
+        resp = client.post("/inbound/email", data=payload)
+        assert resp.status_code == 406
+        assert resp.json()["detail"] == "Invalid recipient"
+        mock_redis["enqueue"].assert_not_awaited()
+
+
+def test_stale_timestamp(client, mock_redis):
+    """A validly signed request whose timestamp is 10 minutes old is rejected."""
+    with patch("receiptwitness.api.routes.settings") as fake_settings:
+        fake_settings.mailgun_webhook_signing_key = "test-secret"
+        stale_ts = str(int(time.time()) - 600)  # 10 min old
+        token = "test-token"
+        payload = {
+            "token": token,
+            "timestamp": stale_ts,
+            "signature": make_signature("test-secret", token, stale_ts),
+            "sender": "sender@example.com",
+            "recipient": "receipts+user123@example.com",
+            "subject": "Receipt",
+        }
+        resp = client.post("/inbound/email", data=payload)
+        assert resp.status_code == 406
+        assert resp.json()["detail"] == "Invalid signature"
+        mock_redis["enqueue"].assert_not_awaited()
diff --git a/receiptwitness/tests/test_notifications/__init__.py b/receiptwitness/tests/test_notifications/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/receiptwitness/tests/test_notifications/test_email.py b/receiptwitness/tests/test_notifications/test_email.py
new file mode 100644
index 0000000..e8970e9
--- /dev/null
+++ b/receiptwitness/tests/test_notifications/test_email.py
@@ -0,0 +1,84 @@
+"""Tests for email notifications."""
+
+from unittest.mock import patch
+
+import pytest
+
+
+class TestSendReceiptNotification:
+ """Tests for send_receipt_notification with the ``resend`` module mocked."""
+
+ @pytest.fixture
+ def mock_resend(self):
+ # Replace the ``resend`` name in the notifications module for the test's duration.
+ with patch("receiptwitness.notifications.email.resend") as mock:
+ yield mock
+
+ @pytest.mark.asyncio
+ async def test_sends_email_with_correct_params(self, mock_resend):
+ # Enabled notifications with an API key should send exactly one email
+ # carrying the expected payload.
+ from receiptwitness.notifications.email import send_receipt_notification
+
+ with (
+ patch("receiptwitness.notifications.email.settings") as mock_settings,
+ # Run the function passed to to_thread inline instead of in a thread.
+ # NOTE(review): the replacement returns fn's result directly rather than
+ # an awaitable — confirm send_receipt_notification tolerates that.
+ patch(
+ "receiptwitness.notifications.email.asyncio.to_thread",
+ new=lambda fn, *args, **kwargs: fn(*args, **kwargs),
+ ),
+ ):
+ mock_settings.notifications_enabled = True
+ mock_settings.resend_api_key = "re_testkey_123"
+ mock_settings.notification_email_from = "noreply@test.com"
+
+ await send_receipt_notification(
+ user_email="user@example.com",
+ store_name="Meijer",
+ item_count=5,
+ total=42.99,
+ purchase_date="2026-03-28",
+ )
+
+ # NOTE(review): the "html" literal below appears to have lost its markup
+ # and line structure — confirm against the original test file.
+ mock_resend.Emails.send.assert_called_once_with(
+ {
+ "from": "noreply@test.com",
+ "to": ["user@example.com"],
+ "subject": "Receipt processed: Meijer - $42.99",
+ "html": (
+ "Your receipt from Meijer on "
+ "2026-03-28 has been processed.
"
+ "5 items, total: $42.99
"
+ ),
+ }
+ )
+
+ @pytest.mark.asyncio
+ async def test_skips_when_disabled(self, mock_resend):
+ # With notifications_enabled False, nothing is sent even though a key is set.
+ from receiptwitness.notifications.email import send_receipt_notification
+
+ with patch("receiptwitness.notifications.email.settings") as mock_settings:
+ mock_settings.notifications_enabled = False
+ mock_settings.resend_api_key = "re_testkey_123"
+
+ await send_receipt_notification(
+ user_email="user@example.com",
+ store_name="Meijer",
+ item_count=5,
+ total=42.99,
+ purchase_date="2026-03-28",
+ )
+
+ mock_resend.Emails.send.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_skips_when_api_key_empty(self, mock_resend):
+ # With an empty API key, nothing is sent even though notifications are enabled.
+ from receiptwitness.notifications.email import send_receipt_notification
+
+ with patch("receiptwitness.notifications.email.settings") as mock_settings:
+ mock_settings.notifications_enabled = True
+ mock_settings.resend_api_key = ""
+
+ await send_receipt_notification(
+ user_email="user@example.com",
+ store_name="Meijer",
+ item_count=5,
+ total=42.99,
+ purchase_date="2026-03-28",
+ )
+
+ mock_resend.Emails.send.assert_not_called()
diff --git a/receiptwitness/tests/test_parsers/test_email/__init__.py b/receiptwitness/tests/test_parsers/test_email/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/receiptwitness/tests/test_parsers/test_email/test_detector.py b/receiptwitness/tests/test_parsers/test_email/test_detector.py
new file mode 100644
index 0000000..87a5aac
--- /dev/null
+++ b/receiptwitness/tests/test_parsers/test_email/test_detector.py
@@ -0,0 +1,49 @@
+"""Tests for retailer detector."""
+
+from receiptwitness.parsers.email.base import EmailReceipt
+from receiptwitness.parsers.email.detector import detect_retailer
+
+
+def test_detect_meijer():
+    """A meijer.com sender is detected as "meijer"."""
+    receipt = EmailReceipt(
+        sender="receipts@meijer.com",
+        recipient="user@example.com",
+        subject="Your Receipt",
+    )
+    detected = detect_retailer(receipt)
+    assert detected == "meijer"
+
+
+def test_detect_kroger():
+    """An email.kroger.com sender is detected as "kroger"."""
+    receipt = EmailReceipt(
+        sender="noreply@email.kroger.com",
+        recipient="user@example.com",
+        subject="Your Receipt",
+    )
+    detected = detect_retailer(receipt)
+    assert detected == "kroger"
+
+
+def test_detect_target():
+    """A Target sender is detected as "target"."""
+    receipt = EmailReceipt(
+        sender="Target ",
+        recipient="user@example.com",
+        subject="Your Receipt",
+    )
+    detected = detect_retailer(receipt)
+    assert detected == "target"
+
+
+def test_detect_unknown():
+    """A sender from an unsupported retailer yields None."""
+    receipt = EmailReceipt(
+        sender="noreply@walmart.com",
+        recipient="user@example.com",
+        subject="Your Receipt",
+    )
+    detected = detect_retailer(receipt)
+    assert detected is None
+
+
+def test_detect_case_insensitive():
+    """Sender matching ignores letter case."""
+    receipt = EmailReceipt(
+        sender="Receipts@MEIJER.COM",
+        recipient="user@example.com",
+        subject="Your Receipt",
+    )
+    detected = detect_retailer(receipt)
+    assert detected == "meijer"
diff --git a/receiptwitness/tests/test_parsers/test_email/test_kroger_email_parser.py b/receiptwitness/tests/test_parsers/test_email/test_kroger_email_parser.py
new file mode 100644
index 0000000..ab30257
--- /dev/null
+++ b/receiptwitness/tests/test_parsers/test_email/test_kroger_email_parser.py
@@ -0,0 +1,93 @@
+"""Tests for KrogerEmailParser."""
+
+from pathlib import Path
+
+from receiptwitness.parsers.email.base import EmailReceipt
+from receiptwitness.parsers.email.kroger import KrogerEmailParser
+
+FIXTURE_PATH = Path(__file__).parent.parent.parent / "fixtures" / "kroger_email_receipt.html"
+
+
+class TestKrogerEmailParser:
+    """Tests for KrogerEmailParser."""
+
+    def setup_method(self) -> None:
+        self.parser = KrogerEmailParser()
+        # Explicit encoding keeps the fixture read independent of the platform locale.
+        self.fixture_html = FIXTURE_PATH.read_text(encoding="utf-8")
+
+    def _fixture_email(self, sender: str = "noreply@kroger.com") -> EmailReceipt:
+        """Build a Kroger receipt email whose HTML body is the fixture file."""
+        return EmailReceipt(
+            sender=sender,
+            recipient="user@example.com",
+            subject="Your Kroger Receipt",
+            body_html=self.fixture_html,
+        )
+
+    def test_can_parse_kroger_sender(self) -> None:
+        email = self._fixture_email(sender="noreply@email.kroger.com")
+        assert self.parser.can_parse(email) is True
+
+    def test_can_parse_kroger_in_body(self) -> None:
+        # Sender is unrelated; only the body mentions Kroger.
+        email = EmailReceipt(
+            sender="someone@unknown.com",
+            recipient="user@example.com",
+            subject="Your Receipt",
+            body_html="Kroger digital receipt",
+        )
+        assert self.parser.can_parse(email) is True
+
+    def test_cannot_parse_unrelated(self) -> None:
+        email = EmailReceipt(
+            sender="noreply@walmart.com",
+            recipient="user@example.com",
+            subject="Your Receipt",
+            body_html="Walmart receipt",
+        )
+        assert self.parser.can_parse(email) is False
+
+    def test_parse_items(self) -> None:
+        result = self.parser.parse(self._fixture_email())
+        items = result.get("items", [])
+        assert len(items) >= 3
+        product_names = [item["product_name_raw"] for item in items]
+        assert any("Whole Milk" in name for name in product_names)
+        assert any("Sourdough" in name for name in product_names)
+        for item in items:
+            assert "unit_price" in item
+            assert "extended_price" in item
+
+    def test_parse_totals(self) -> None:
+        result = self.parser.parse(self._fixture_email())
+        total = result.get("total", 0)
+        assert total > 0
+
+    def test_parse_receipt_id(self) -> None:
+        result = self.parser.parse(self._fixture_email())
+        receipt_id = result.get("receipt_id", "")
+        assert "KR-2026" in receipt_id or "TXN" in receipt_id
+
+    def test_parse_date(self) -> None:
+        result = self.parser.parse(self._fixture_email())
+        purchase_date = result.get("purchase_date", "")
+        assert purchase_date == "2026-03-15"
diff --git a/receiptwitness/tests/test_parsers/test_email/test_meijer_parser.py b/receiptwitness/tests/test_parsers/test_email/test_meijer_parser.py
new file mode 100644
index 0000000..3c33976
--- /dev/null
+++ b/receiptwitness/tests/test_parsers/test_email/test_meijer_parser.py
@@ -0,0 +1,182 @@
+"""Tests for the Meijer email receipt parser."""
+
+import os
+from decimal import Decimal
+
+import pytest
+
+from receiptwitness.parsers.email.base import EmailReceipt
+from receiptwitness.parsers.email.meijer import MeijerEmailParser
+
+FIXTURE_PATH = os.path.join(
+ os.path.dirname(__file__), "..", "..", "fixtures", "meijer_email_receipt.html"
+)
+
+
+def load_fixture() -> str:
+    """Return the contents of the Meijer HTML receipt fixture at ``FIXTURE_PATH``."""
+    # Explicit encoding keeps the read independent of the platform locale.
+    with open(FIXTURE_PATH, encoding="utf-8") as f:
+        return f.read()
+
+
+@pytest.fixture
+def meijer_email() -> EmailReceipt:
+    """Meijer receipt email whose HTML body is the fixture file."""
+    return EmailReceipt(
+        sender="Meijer Receipts ",
+        recipient="shopper@example.com",
+        subject="Your Meijer Receipt — Transaction #TXN-2026-0315-0042",
+        body_html=load_fixture(),
+        body_plain=None,
+        received_at="2026-03-15T14:34:00Z",
+    )
+
+
+@pytest.fixture
+def kroger_email() -> EmailReceipt:
+    """Minimal Kroger email used as a negative case for the Meijer parser."""
+    fields = {
+        "sender": "Kroger ",
+        "recipient": "shopper@example.com",
+        "subject": "Your Kroger Receipt",
+        "body_html": "Kroger receipt",
+    }
+    return EmailReceipt(**fields)
+
+
+class TestCanParse:
+    """``MeijerEmailParser.can_parse`` accepts Meijer senders and rejects others."""
+
+    def test_can_parse_meijer(self, meijer_email: EmailReceipt):
+        assert MeijerEmailParser().can_parse(meijer_email) is True
+
+    def test_cannot_parse_kroger(self, kroger_email: EmailReceipt):
+        assert MeijerEmailParser().can_parse(kroger_email) is False
+
+    def test_can_parse_meijer_plain_sender(self):
+        # Bare address with an empty body.
+        plain = EmailReceipt(
+            sender="receipts@meijer.com",
+            recipient="shopper@example.com",
+            subject="Receipt",
+            body_html="",
+        )
+        accepted = MeijerEmailParser().can_parse(plain)
+        assert accepted is True
+
+    def test_cannot_parse_non_meijer(self):
+        other = EmailReceipt(
+            sender=" Target ",
+            recipient="shopper@example.com",
+            subject="Target Receipt",
+            body_html="",
+        )
+        accepted = MeijerEmailParser().can_parse(other)
+        assert accepted is False
+
+
+class TestParseMeijerReceipt:
+    """Field-level checks on the result of parsing the Meijer fixture email."""
+
+    def test_receipt_id_extracted(self, meijer_email: EmailReceipt):
+        parsed = MeijerEmailParser().parse(meijer_email)
+        assert parsed["receipt_id"] == "TXN-2026-0315-0042"
+
+    def test_purchase_date_extracted(self, meijer_email: EmailReceipt):
+        parsed = MeijerEmailParser().parse(meijer_email)
+        assert parsed["purchase_date"] == "2026-03-15"
+
+    def test_items_extracted(self, meijer_email: EmailReceipt):
+        parsed = MeijerEmailParser().parse(meijer_email)
+        line_items = parsed["items"]
+        assert len(line_items) == 8
+
+        raw_names = [entry["product_name_raw"] for entry in line_items]
+        for expected in ("ORGANIC BANANAS", "WHOLE MILK 1 GAL", "GROUND BEEF 85/15 1LB"):
+            assert expected in raw_names
+
+    def test_item_quantities(self, meijer_email: EmailReceipt):
+        parsed = MeijerEmailParser().parse(meijer_email)
+        # Find ORGANIC BANANAS
+        banana_line = next(
+            entry for entry in parsed["items"] if "BANANAS" in entry["product_name_raw"]
+        )
+        assert banana_line["quantity"] == Decimal("1")
+
+    def test_item_prices(self, meijer_email: EmailReceipt):
+        parsed = MeijerEmailParser().parse(meijer_email)
+        # Find ORGANIC BANANAS
+        banana_line = next(
+            entry for entry in parsed["items"] if "BANANAS" in entry["product_name_raw"]
+        )
+        assert banana_line["unit_price"] == Decimal("0.69")
+        assert banana_line["extended_price"] == Decimal("0.69")
+
+    def test_totals(self, meijer_email: EmailReceipt):
+        parsed = MeijerEmailParser().parse(meijer_email)
+        assert parsed["total"] == Decimal("33.41")
+        assert parsed["subtotal"] == Decimal("31.22")
+        assert parsed["tax"] == Decimal("2.19")
+        assert parsed["savings_total"] == Decimal("3.40")
+
+
+class TestParseHandlesMissingFields:
+ """Parsing emails with absent or empty fields returns defaults instead of raising."""
+
+ def test_missing_body_html_falls_back_to_plain(self):
+ # No HTML body: date and total are expected to come from the plain-text body.
+ email = EmailReceipt(
+ sender="receipts@email.meijer.com",
+ recipient="shopper@example.com",
+ subject="Your Meijer Receipt",
+ body_html=None,
+ body_plain="TXN-1234 | March 15, 2026 | Total: $10.00",
+ )
+ parser = MeijerEmailParser()
+ result = parser.parse(email)
+ # Should not raise, returns minimal result
+ assert result["receipt_id"] == ""
+ assert result["purchase_date"] == "2026-03-15"
+ assert result["total"] == Decimal("10.00")
+
+ def test_empty_email(self):
+ # Empty bodies: every field falls back to its empty/zero default.
+ email = EmailReceipt(
+ sender="receipts@email.meijer.com",
+ recipient="shopper@example.com",
+ subject="Receipt",
+ body_html="",
+ body_plain="",
+ )
+ parser = MeijerEmailParser()
+ result = parser.parse(email)
+ assert result["receipt_id"] == ""
+ assert result["purchase_date"] == ""
+ assert result["total"] == Decimal("0")
+ assert result["items"] == []
+
+ def test_missing_subject_date_from_body(self):
+ # No subject: the purchase date is expected to come from the body text.
+ # NOTE(review): the HTML literal below appears to have lost its markup —
+ # confirm against the original test file.
+ html = """
+
+
+ Thank you for shopping on April 1, 2026
+ Total: $15.00
+
+
+ """
+ email = EmailReceipt(
+ sender="receipts@email.meijer.com",
+ recipient="shopper@example.com",
+ subject=None,
+ body_html=html,
+ )
+ parser = MeijerEmailParser()
+ result = parser.parse(email)
+ assert result["purchase_date"] == "2026-04-01"
+
+ def test_missing_totals_defaults_to_zero(self):
+ # No totals in the body: total is zero, subtotal and tax are None.
+ # NOTE(review): the literal below spans two lines and appears to have lost
+ # its markup — confirm against the original test file.
+ html = "Just an email with no totals
"
+ email = EmailReceipt(
+ sender="receipts@email.meijer.com",
+ recipient="shopper@example.com",
+ subject="Receipt",
+ body_html=html,
+ )
+ parser = MeijerEmailParser()
+ result = parser.parse(email)
+ assert result["total"] == Decimal("0")
+ assert result["subtotal"] is None
+ assert result["tax"] is None
diff --git a/receiptwitness/tests/test_parsers/test_email/test_target_email_parser.py b/receiptwitness/tests/test_parsers/test_email/test_target_email_parser.py
new file mode 100644
index 0000000..ffa33db
--- /dev/null
+++ b/receiptwitness/tests/test_parsers/test_email/test_target_email_parser.py
@@ -0,0 +1,93 @@
+"""Tests for TargetEmailParser."""
+
+from pathlib import Path
+
+from receiptwitness.parsers.email.base import EmailReceipt
+from receiptwitness.parsers.email.target import TargetEmailParser
+
+FIXTURE_PATH = Path(__file__).parent.parent.parent / "fixtures" / "target_email_receipt.html"
+
+
+class TestTargetEmailParser:
+    """Tests for TargetEmailParser."""
+
+    def setup_method(self) -> None:
+        self.parser = TargetEmailParser()
+        # Explicit encoding keeps the fixture read independent of the platform locale.
+        self.fixture_html = FIXTURE_PATH.read_text(encoding="utf-8")
+
+    def _fixture_email(
+        self,
+        sender: str = "orders@target.com",
+        subject: str = "Your Target Order",
+    ) -> EmailReceipt:
+        """Build a Target order email whose HTML body is the fixture file."""
+        return EmailReceipt(
+            sender=sender,
+            recipient="user@example.com",
+            subject=subject,
+            body_html=self.fixture_html,
+        )
+
+    def test_can_parse_target_sender(self) -> None:
+        email = self._fixture_email(
+            sender="receipts@target.com",
+            subject="Your Target Order Confirmation",
+        )
+        assert self.parser.can_parse(email) is True
+
+    def test_can_parse_circle_in_body(self) -> None:
+        # Sender is unrelated; only the body mentions Target Circle.
+        email = EmailReceipt(
+            sender="someone@unknown.com",
+            recipient="user@example.com",
+            subject="Your Receipt",
+            body_html="Target Circle savings offer",
+        )
+        assert self.parser.can_parse(email) is True
+
+    def test_cannot_parse_unrelated(self) -> None:
+        email = EmailReceipt(
+            sender="noreply@walmart.com",
+            recipient="user@example.com",
+            subject="Your Receipt",
+            body_html="Walmart receipt",
+        )
+        assert self.parser.can_parse(email) is False
+
+    def test_parse_items(self) -> None:
+        result = self.parser.parse(self._fixture_email())
+        items = result.get("items", [])
+        assert len(items) >= 3
+        product_names = [item["product_name_raw"] for item in items]
+        assert any("Whole Milk" in name for name in product_names)
+        assert any("Arborio" in name for name in product_names)
+        for item in items:
+            assert "unit_price" in item
+            assert "extended_price" in item
+
+    def test_parse_totals(self) -> None:
+        result = self.parser.parse(self._fixture_email())
+        total = result.get("total", 0)
+        assert total > 0
+
+    def test_parse_receipt_id(self) -> None:
+        result = self.parser.parse(self._fixture_email())
+        receipt_id = result.get("receipt_id", "")
+        assert "TGT-2026" in receipt_id or "CNF" in receipt_id
+
+    def test_parse_date(self) -> None:
+        result = self.parser.parse(self._fixture_email())
+        purchase_date = result.get("purchase_date", "")
+        assert purchase_date == "2026-03-18"
diff --git a/receiptwitness/tests/test_queue/__init__.py b/receiptwitness/tests/test_queue/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/receiptwitness/tests/test_queue/test_email_queue.py b/receiptwitness/tests/test_queue/test_email_queue.py
new file mode 100644
index 0000000..05ffb51
--- /dev/null
+++ b/receiptwitness/tests/test_queue/test_email_queue.py
@@ -0,0 +1,79 @@
+"""Tests for email queue using DragonflyDB Streams."""
+
+import pytest
+from fakeredis import aioredis as fake_aioredis
+
+from receiptwitness.queue.email import (
+ CONSUMER_GROUP,
+ STREAM_KEY,
+ EmailJob,
+ ack_email,
+ consume_emails,
+ enqueue_email,
+ ensure_consumer_group,
+)
+
+
+@pytest.fixture
+async def fake_client():
+    """Provide a FakeRedis client with decoded responses; closed after the test."""
+    redis = fake_aioredis.FakeRedis(decode_responses=True)
+    yield redis
+    await redis.aclose()
+
+
+@pytest.fixture
+def sample_job():
+    """EmailJob with every field populated, for queue round-trip tests."""
+    fields = {
+        "user_id": "user-123",
+        "sender": "no-reply@kroger.com",
+        "recipient": "user@example.com",
+        "subject": "Kroger Receipt",
+        "body_html": "Receipt",
+        "body_plain": "Receipt",
+        "received_at": "2026-04-01T12:00:00Z",
+        "message_id": "msg-abc-123",
+    }
+    return EmailJob(**fields)
+
+
+@pytest.mark.asyncio
+async def test_enqueue_and_consume(fake_client, sample_job):
+    """An enqueued job comes back from consume with the same ID and fields."""
+    enqueued_id = await enqueue_email(fake_client, sample_job)
+    assert enqueued_id is not None
+
+    batch = await consume_emails(fake_client, "test-worker", count=1, block_ms=100)
+    assert len(batch) == 1
+    received_id, received_job = batch[0]
+    assert received_id == enqueued_id
+    for attr in ("user_id", "sender", "recipient", "subject", "message_id"):
+        assert getattr(received_job, attr) == getattr(sample_job, attr)
+
+
+@pytest.mark.asyncio
+async def test_ack_removes_from_pending(fake_client, sample_job):
+    """Once a consumed message is acked, the group reports nothing pending."""
+    enqueued_id = await enqueue_email(fake_client, sample_job)
+
+    # Reading through the group moves the message to pending.
+    batch = await consume_emails(fake_client, "test-worker", count=1, block_ms=100)
+    assert len(batch) == 1
+
+    await ack_email(fake_client, enqueued_id)
+
+    # Pending summary for the consumer group should now be empty.
+    summary = await fake_client.xpending(STREAM_KEY, CONSUMER_GROUP)
+    assert summary is None or summary["pending"] == 0
+
+
+@pytest.mark.asyncio
+async def test_ensure_consumer_group_idempotent(fake_client):
+    """Creating the consumer group a second time must not raise."""
+    for _attempt in range(2):
+        await ensure_consumer_group(fake_client)
diff --git a/receiptwitness/tests/test_worker/__init__.py b/receiptwitness/tests/test_worker/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/receiptwitness/tests/test_worker/test_email_worker.py b/receiptwitness/tests/test_worker/test_email_worker.py
new file mode 100644
index 0000000..bc05724
--- /dev/null
+++ b/receiptwitness/tests/test_worker/test_email_worker.py
@@ -0,0 +1,188 @@
+"""Tests for email_worker."""
+
+from decimal import Decimal
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from fakeredis import aioredis as fake_aioredis
+
+from receiptwitness.parsers.email.base import EmailReceipt
+from receiptwitness.queue.email import (
+ EmailJob,
+)
+from receiptwitness.worker.email_worker import (
+ process_job,
+ resolve_user,
+)
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture
+async def fake_redis():
+ """Yield a fakeredis async client (decode_responses=True) and close it on teardown."""
+ client = fake_aioredis.FakeRedis(decode_responses=True)
+ yield client
+ await client.aclose()
+
+
+@pytest.fixture
+def sample_email_job():
+ """Sample EmailJob (Meijer receipt email) matching the DragonflyDB queue schema."""
+ return EmailJob(
+ user_id="token-abc-123",
+ sender="no-reply@meijer.com",
+ recipient="user@example.com",
+ subject="Your Meijer Receipt",
+ body_html="Total: $42.00",
+ body_plain="Total: $42.00",
+ received_at="2026-04-01T12:00:00Z",
+ message_id="msg-xyz-789",
+ )
+
+
+@pytest.fixture
+def sample_email():
+ """Sample EmailReceipt (Meijer receipt email) for parser testing."""
+ return EmailReceipt(
+ sender="no-reply@meijer.com",
+ recipient="user@example.com",
+ subject="Your Meijer Receipt",
+ body_html="Total: $42.00\nReceipt #12345",  # NOTE(review): separator assumed newline; confirm
+ body_plain="Total: $42.00",
+ received_at="2026-04-01T12:00:00Z",
+ )
+
+
+# ---------------------------------------------------------------------------
+# resolve_user tests
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_resolve_user_valid_token():
+ """resolve_user returns the value the mocked query's scalar_one_or_none() yields."""
+ mock_session = AsyncMock()
+ mock_result = MagicMock()
+ mock_result.scalar_one_or_none.return_value = "user-uuid-42"
+ mock_session.execute.return_value = mock_result
+ mock_session.__aenter__ = AsyncMock(return_value=mock_session)
+ mock_session.__aexit__ = AsyncMock(return_value=None)
+
+ factory = MagicMock(return_value=mock_session)  # calling the factory yields the mock session
+
+ with patch(
+ "receiptwitness.worker.email_worker.get_async_session_factory",
+ return_value=factory,
+ ):
+ user_id = await resolve_user("token-abc-123")
+
+ assert user_id == "user-uuid-42"
+ factory.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_resolve_user_invalid_token():
+ """resolve_user returns None when the mocked query's scalar_one_or_none() is None."""
+ mock_session = AsyncMock()
+ mock_result = MagicMock()
+ mock_result.scalar_one_or_none.return_value = None
+ mock_session.execute.return_value = mock_result
+ mock_session.__aenter__ = AsyncMock(return_value=mock_session)
+ mock_session.__aexit__ = AsyncMock(return_value=None)
+
+ factory = MagicMock(return_value=mock_session)  # calling the factory yields the mock session
+
+ with patch(
+ "receiptwitness.worker.email_worker.get_async_session_factory",
+ return_value=factory,
+ ):
+ user_id = await resolve_user("bad-token")
+
+ assert user_id is None
+
+
+# ---------------------------------------------------------------------------
+# process_job tests
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_process_job_unknown_retailer():
+ """Unknown retailer: process_job returns True (ack, no retry) and publishes no event."""
+ unknown_job = EmailJob(
+ user_id="token-abc-123",
+ sender="no-reply@unknownretailer.com",
+ recipient="user@example.com",
+ subject="Receipt",
+ body_html="",
+ body_plain="",
+ received_at="2026-04-01T12:00:00Z",
+ message_id="msg-xyz-789",
+ )
+
+ with (
+ patch(
+ "receiptwitness.worker.email_worker.resolve_user",
+ return_value="user-uuid-42",
+ ),
+ patch(
+ "receiptwitness.worker.email_worker.publish_receipt_ingested",
+ new_callable=AsyncMock,
+ ) as mock_publish,
+ ):
+ result = await process_job("msg-id-1", unknown_job)
+
+ assert result is True
+ mock_publish.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_process_job_success(sample_email_job):
+ """Known retailer: the parser runs once and one receipt-ingested event is published."""
+ parsed_data = {
+ "receipt_id": "RCP-999",
+ "purchase_date": "2026-04-01",
+ "total": Decimal("42.00"),
+ "items": [
+ {
+ "product_name_raw": "ORGANIC BANANAS",
+ "quantity": Decimal("1"),
+ "unit_price": Decimal("0.69"),
+ "extended_price": Decimal("0.69"),
+ },
+ ],
+ }
+
+ mock_parser = MagicMock()
+ mock_parser.parse.return_value = parsed_data
+
+ with (
+ patch(
+ "receiptwitness.worker.email_worker.resolve_user",
+ return_value="user-uuid-42",
+ ),
+ patch.dict(
+ "receiptwitness.worker.email_worker.PARSERS",
+ {"meijer": mock_parser},
+ clear=False,
+ ),
+ patch(
+ "receiptwitness.worker.email_worker.publish_receipt_ingested",
+ new_callable=AsyncMock,
+ ) as mock_publish,
+ ):
+ result = await process_job("msg-id-1", sample_email_job)
+
+ assert result is True
+ mock_parser.parse.assert_called_once()
+ mock_publish.assert_called_once_with(
+ user_id="user-uuid-42",
+ store_slug="meijer",
+ purchase_id="RCP-999",
+ purchase_date="2026-04-01",
+ item_count=1,
+ total=Decimal("42.00"),
+ )