Squashed 'receiptwitness/' content from commit e8d374a

git-subtree-dir: receiptwitness
git-subtree-split: e8d374a89ed8978f429598e02d31b1c5963efe22
This commit is contained in:
Coupon Carl
2026-03-28 02:24:22 +00:00
commit 342906c9d1
53 changed files with 7300 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
"""ReceiptWitness — CartSnitch receipt ingestion service."""
+1
View File
@@ -0,0 +1 @@
"""Internal API for ReceiptWitness service."""
+10
View File
@@ -0,0 +1,10 @@
"""Internal API routes for triggering scrapes and checking status."""
from fastapi import APIRouter
router = APIRouter()
@router.get("/health")
async def health():
return {"status": "ok", "service": "receiptwitness"}
+26
View File
@@ -0,0 +1,26 @@
"""Service-specific configuration for ReceiptWitness."""
from pydantic_settings import BaseSettings
class ReceiptWitnessSettings(BaseSettings):
model_config = {"env_prefix": "RW_"}
# Inherited from cartsnitch-common
database_url: str = "postgresql+asyncpg://cartsnitch:cartsnitch@localhost:5432/cartsnitch"
redis_url: str = "redis://localhost:6379/0"
# Session encryption
session_encryption_key: str = ""
# Scraping defaults
scrape_interval_seconds: int = 86400 # 24 hours
min_request_delay_ms: int = 1000
max_request_delay_ms: int = 5000
# Playwright
headless: bool = True
browser_timeout_ms: int = 60000
settings = ReceiptWitnessSettings()
+75
View File
@@ -0,0 +1,75 @@
"""Publish receipt ingestion events to Redis/DragonflyDB pub/sub."""
import json
import logging
from datetime import UTC, datetime
from decimal import Decimal
import redis.asyncio as aioredis
from receiptwitness.config import settings
logger = logging.getLogger(__name__)
CHANNEL_RECEIPTS_INGESTED = "cartsnitch.receipts.ingested"
# Module-level connection pool — shared across all publish calls
_pool: aioredis.ConnectionPool | None = None
class _DecimalEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, Decimal):
return float(o)
return super().default(o)
def _get_pool() -> aioredis.ConnectionPool:
"""Get or create the shared Redis connection pool."""
global _pool
if _pool is None:
_pool = aioredis.ConnectionPool.from_url(
settings.redis_url, decode_responses=True, max_connections=10
)
return _pool
async def get_redis_client() -> aioredis.Redis:
"""Create an async Redis/DragonflyDB client with connection pooling."""
return aioredis.Redis(connection_pool=_get_pool())
async def publish_receipt_ingested(
user_id: str,
store_slug: str,
purchase_id: str,
purchase_date: str,
item_count: int,
total: Decimal | float,
) -> None:
"""Publish a cartsnitch.receipts.ingested event after successful ingestion."""
event = {
"event_type": CHANNEL_RECEIPTS_INGESTED,
"timestamp": datetime.now(UTC).isoformat(),
"service": "receiptwitness",
"payload": {
"user_id": user_id,
"store_slug": store_slug,
"purchase_id": purchase_id,
"purchase_date": purchase_date,
"item_count": item_count,
"total": float(total) if isinstance(total, Decimal) else total,
},
}
try:
client = await get_redis_client()
await client.publish(CHANNEL_RECEIPTS_INGESTED, json.dumps(event, cls=_DecimalEncoder))
logger.info(
"Published %s event for purchase %s",
CHANNEL_RECEIPTS_INGESTED,
purchase_id,
)
except aioredis.ConnectionError:
logger.error("Failed to publish event — Redis/DragonflyDB connection error")
raise
+8
View File
@@ -0,0 +1,8 @@
"""FastAPI app entrypoint for ReceiptWitness."""
from fastapi import FastAPI
from receiptwitness.api.routes import router
app = FastAPI(title="ReceiptWitness", version="0.1.0")
app.include_router(router)
+1
View File
@@ -0,0 +1 @@
"""Receipt parsers for each retailer."""
+148
View File
@@ -0,0 +1,148 @@
"""Kroger receipt parser.
Transforms raw Kroger receipt JSON into the common PurchaseCreate schema.
Kroger receipt data uses different field names than Meijer — this parser
handles Kroger-specific naming conventions and receipt structure.
"""
import logging
from decimal import Decimal, InvalidOperation
from receiptwitness.scrapers.base import RawReceipt
logger = logging.getLogger(__name__)
def _to_decimal(value, default: str = "0") -> Decimal:
"""Safely convert a value to Decimal."""
if value is None:
return Decimal(default)
try:
return Decimal(str(value))
except (InvalidOperation, ValueError, TypeError):
return Decimal(default)
def _parse_item(item: dict) -> dict:
"""Parse a single line item from a Kroger receipt.
Kroger items typically include fields like:
- description / itemDescription / productName
- upc / krogerProductId
- quantity / qty
- basePrice / unitPrice / price
- totalPrice / extendedAmount / lineTotal
- regularPrice / originalPrice
- salePrice / promoPrice
- couponAmount / couponSavings
- loyaltyDiscount / fuelPointsDiscount / plusCardSavings
- department / category / aisle
"""
description = (
item.get("description")
or item.get("itemDescription")
or item.get("productName")
or item.get("name")
or "UNKNOWN ITEM"
)
quantity = _to_decimal(item.get("quantity", item.get("qty", item.get("quantitySold", 1))), "1")
unit_price = _to_decimal(item.get("basePrice", item.get("unitPrice", item.get("price", 0))))
extended_price = _to_decimal(
item.get("totalPrice", item.get("extendedAmount", item.get("lineTotal")))
)
# Compute extended_price if not provided
if extended_price == Decimal("0") and unit_price != Decimal("0"):
extended_price = unit_price * quantity
regular_price = item.get("regularPrice", item.get("originalPrice"))
sale_price = item.get("salePrice", item.get("promoPrice"))
coupon_discount = item.get(
"couponAmount", item.get("couponSavings", item.get("couponDiscount"))
)
loyalty_discount = item.get(
"plusCardSavings",
item.get("loyaltyDiscount", item.get("fuelPointsDiscount")),
)
# UPC handling — Kroger may use krogerProductId or upc
upc = item.get("upc", item.get("UPC", item.get("krogerProductId")))
if upc:
upc = str(upc).strip().lstrip("0") or None
category = item.get("department", item.get("category", item.get("aisle")))
# Weight info for produce/deli items
weight = item.get("weight", item.get("netWeight"))
extra = {}
if weight is not None:
extra["weight"] = str(weight)
weight_uom = item.get("weightUom", item.get("unitOfMeasure"))
if weight_uom:
extra["weight_uom"] = weight_uom
result = {
"product_name_raw": description.strip(),
"upc": upc,
"quantity": quantity,
"unit_price": unit_price,
"extended_price": extended_price,
"regular_price": (_to_decimal(regular_price) if regular_price is not None else None),
"sale_price": (_to_decimal(sale_price) if sale_price is not None else None),
"coupon_discount": (_to_decimal(coupon_discount) if coupon_discount is not None else None),
"loyalty_discount": (
_to_decimal(loyalty_discount) if loyalty_discount is not None else None
),
"category_raw": category.strip() if category else None,
}
return result
def parse_kroger_receipt(raw: RawReceipt) -> dict:
"""Parse a RawReceipt from Kroger into a PurchaseCreate-compatible dict."""
data = raw.raw_data
detail = data.get("detail", {})
# Parse items — Kroger uses "items" or "lineItems" or "receiptItems"
raw_items = detail.get("items", detail.get("lineItems", detail.get("receiptItems", [])))
items = []
for raw_item in raw_items:
# Skip voided / returned items
if raw_item.get("voided") or raw_item.get("status") in (
"VOIDED",
"RETURNED",
):
logger.debug("Skipping voided/returned item: %s", raw_item.get("description"))
continue
if raw_item.get("returnFlag") or raw_item.get("isReturn"):
logger.debug("Skipping returned item: %s", raw_item.get("description"))
continue
items.append(_parse_item(raw_item))
# Parse totals — Kroger uses various field names
total = _to_decimal(
detail.get(
"total",
data.get("total", data.get("orderTotal", data.get("grandTotal", 0))),
)
)
subtotal = detail.get("subtotal", data.get("subtotal", data.get("subTotal")))
tax = detail.get("tax", data.get("tax", data.get("salesTax")))
savings = detail.get(
"totalSavings",
data.get("savings", data.get("totalDiscount", data.get("youSaved"))),
)
return {
"receipt_id": raw.receipt_id,
"purchase_date": raw.purchase_date,
"total": total,
"subtotal": _to_decimal(subtotal) if subtotal is not None else None,
"tax": _to_decimal(tax) if tax is not None else None,
"savings_total": _to_decimal(savings) if savings is not None else None,
"source_url": raw.source_url,
"raw_data": data,
"items": items,
}
+138
View File
@@ -0,0 +1,138 @@
"""Parse raw Meijer mPerks receipt data into PurchaseCreate-compatible dicts.
The mPerks receipt JSON structure (reverse-engineered from their SPA)
typically looks like:
Transaction listing:
{
"transactions": [
{
"transactionId": "12345",
"transactionDate": "2026-03-10T14:30:00Z",
"storeNumber": "123",
"total": 87.42,
"savings": 12.50
}
]
}
Receipt detail:
{
"receiptId": "12345",
"items": [
{
"description": "ORGANIC BANANAS",
"upc": "0000000004011",
"quantity": 1,
"price": 0.69,
"extendedPrice": 0.69,
"regularPrice": 0.79,
"salePrice": 0.69,
"couponDiscount": 0.0,
"mperksDiscount": 0.10,
"category": "PRODUCE"
}
],
"subtotal": 74.92,
"tax": 5.24,
"total": 87.42,
"totalSavings": 12.50
}
"""
import logging
from decimal import Decimal, InvalidOperation
from receiptwitness.scrapers.base import RawReceipt
logger = logging.getLogger(__name__)
def _to_decimal(value, default: str = "0") -> Decimal:
"""Safely convert a value to Decimal."""
if value is None:
return Decimal(default)
try:
return Decimal(str(value))
except (InvalidOperation, ValueError, TypeError):
return Decimal(default)
def _parse_item(item: dict) -> dict:
"""Parse a single line item from Meijer receipt detail."""
description = (
item.get("description") or item.get("itemDescription") or item.get("name") or "UNKNOWN ITEM"
)
quantity = _to_decimal(item.get("quantity", item.get("qty", 1)), "1")
unit_price = _to_decimal(item.get("price", item.get("unitPrice", 0)))
extended_price = _to_decimal(item.get("extendedPrice", item.get("totalPrice")))
# If extended_price wasn't provided, compute it
if extended_price == Decimal("0") and unit_price != Decimal("0"):
extended_price = unit_price * quantity
regular_price = item.get("regularPrice")
sale_price = item.get("salePrice")
coupon_discount = item.get("couponDiscount", item.get("couponSavings"))
loyalty_discount = item.get("mperksDiscount", item.get("loyaltyDiscount"))
upc = item.get("upc", item.get("UPC"))
if upc:
upc = str(upc).strip().lstrip("0") or None
category = item.get("category", item.get("departmentDescription"))
return {
"product_name_raw": description.strip(),
"upc": upc,
"quantity": quantity,
"unit_price": unit_price,
"extended_price": extended_price,
"regular_price": _to_decimal(regular_price) if regular_price is not None else None,
"sale_price": _to_decimal(sale_price) if sale_price is not None else None,
"coupon_discount": (_to_decimal(coupon_discount) if coupon_discount is not None else None),
"loyalty_discount": (
_to_decimal(loyalty_discount) if loyalty_discount is not None else None
),
"category_raw": category.strip() if category else None,
}
def parse_meijer_receipt(raw: RawReceipt) -> dict:
"""Parse a RawReceipt from Meijer into a PurchaseCreate-compatible dict.
Returns a dict with keys matching PurchaseCreate schema fields.
The caller is responsible for setting store_id and store_location_id
from the store registry.
"""
data = raw.raw_data
detail = data.get("detail", {})
# Parse items from the detail response
raw_items = detail.get("items", detail.get("lineItems", []))
items = []
for raw_item in raw_items:
# Skip voided items
if raw_item.get("voided") or raw_item.get("status") == "VOIDED":
logger.debug("Skipping voided item: %s", raw_item.get("description"))
continue
items.append(_parse_item(raw_item))
# Parse totals
total = _to_decimal(detail.get("total", data.get("total", data.get("transactionTotal", 0))))
subtotal = detail.get("subtotal", data.get("subtotal"))
tax = detail.get("tax", data.get("tax"))
savings = detail.get("totalSavings", data.get("savings", data.get("totalDiscount")))
return {
"receipt_id": raw.receipt_id,
"purchase_date": raw.purchase_date,
"total": total,
"subtotal": _to_decimal(subtotal) if subtotal is not None else None,
"tax": _to_decimal(tax) if tax is not None else None,
"savings_total": _to_decimal(savings) if savings is not None else None,
"source_url": raw.source_url,
"raw_data": data,
"items": items,
}
+191
View File
@@ -0,0 +1,191 @@
"""Target Circle receipt parser.
Transforms raw Target in-store receipt JSON into the common PurchaseCreate schema.
Target receipt data includes Circle pricing, BOGO deals, and Circle rewards
discounts that need special handling.
Target receipt detail structure (reverse-engineered from target.com SPA):
{
"orderId": "TGT-2026-0315-7890",
"items": [
{
"description": "GOOD & GATHER WHOLE MILK GAL",
"tcin": "14767459",
"upc": "0085239100123",
"quantity": 1,
"unitPrice": 3.89,
"totalPrice": 3.89,
"regularPrice": 4.19,
"circlePrice": 3.89,
"couponDiscount": 0.0,
"circleRewardsDiscount": 0.30,
"promoDescription": "Circle offer: Save 30c",
"department": "GROCERY"
}
],
"subtotal": 78.32,
"tax": 4.89,
"total": 83.21,
"totalSavings": 11.45
}
"""
import logging
from decimal import Decimal, InvalidOperation
from receiptwitness.scrapers.base import RawReceipt
logger = logging.getLogger(__name__)
def _to_decimal(value, default: str = "0") -> Decimal:
"""Safely convert a value to Decimal."""
if value is None:
return Decimal(default)
try:
return Decimal(str(value))
except (InvalidOperation, ValueError, TypeError):
return Decimal(default)
def _parse_item(item: dict) -> dict:
"""Parse a single line item from a Target receipt.
Target items may include fields like:
- description / itemDescription / productName
- tcin (Target internal product ID) / upc / dpci
- quantity / qty
- unitPrice / price
- totalPrice / extendedPrice / lineTotal
- regularPrice / originalPrice
- circlePrice / salePrice / promoPrice
- couponDiscount / couponSavings
- circleRewardsDiscount / circleDiscount / loyaltyDiscount
- promoDescription / offerDescription (e.g. "BOGO 50% off", "Circle offer")
- department / category
"""
description = (
item.get("description")
or item.get("itemDescription")
or item.get("productName")
or item.get("name")
or "UNKNOWN ITEM"
)
quantity = _to_decimal(item.get("quantity", item.get("qty", item.get("quantitySold", 1))), "1")
unit_price = _to_decimal(item.get("unitPrice", item.get("price", item.get("basePrice", 0))))
extended_price = _to_decimal(
item.get("totalPrice", item.get("extendedPrice", item.get("lineTotal")))
)
# Compute extended_price if not provided
if extended_price == Decimal("0") and unit_price != Decimal("0"):
extended_price = unit_price * quantity
regular_price = item.get("regularPrice", item.get("originalPrice"))
# Target Circle pricing — circlePrice takes precedence over generic salePrice
sale_price = item.get("circlePrice", item.get("salePrice", item.get("promoPrice")))
coupon_discount = item.get(
"couponDiscount", item.get("couponSavings", item.get("couponAmount"))
)
# Circle rewards / loyalty discount
loyalty_discount = item.get(
"circleRewardsDiscount",
item.get("circleDiscount", item.get("loyaltyDiscount")),
)
# UPC handling — Target may use tcin, upc, or dpci
upc = item.get("upc", item.get("UPC"))
if upc:
upc = str(upc).strip().lstrip("0") or None
# Target also has TCIN (Target.com Item Number) and DPCI (Department/Class/Item)
tcin = item.get("tcin", item.get("TCIN"))
dpci = item.get("dpci", item.get("DPCI"))
category = item.get("department", item.get("category"))
# Capture promo/deal description for BOGO and Circle offers
promo_description = item.get("promoDescription", item.get("offerDescription"))
# Weight info for produce/deli items
weight = item.get("weight", item.get("netWeight"))
extra: dict = {}
if weight is not None:
extra["weight"] = str(weight)
weight_uom = item.get("weightUom", item.get("unitOfMeasure"))
if weight_uom:
extra["weight_uom"] = weight_uom
if tcin:
extra["tcin"] = str(tcin)
if dpci:
extra["dpci"] = str(dpci)
if promo_description:
extra["promo_description"] = promo_description
result: dict = {
"product_name_raw": description.strip(),
"upc": upc,
"quantity": quantity,
"unit_price": unit_price,
"extended_price": extended_price,
"regular_price": _to_decimal(regular_price) if regular_price is not None else None,
"sale_price": _to_decimal(sale_price) if sale_price is not None else None,
"coupon_discount": (_to_decimal(coupon_discount) if coupon_discount is not None else None),
"loyalty_discount": (
_to_decimal(loyalty_discount) if loyalty_discount is not None else None
),
"category_raw": category.strip() if category else None,
}
return result
def parse_target_receipt(raw: RawReceipt) -> dict:
"""Parse a RawReceipt from Target into a PurchaseCreate-compatible dict."""
data = raw.raw_data
detail = data.get("detail", {})
# Parse items — Target uses "items" or "lineItems"
raw_items = detail.get("items", detail.get("lineItems", []))
items = []
for raw_item in raw_items:
# Skip voided / returned items
if raw_item.get("voided") or raw_item.get("status") in (
"VOIDED",
"RETURNED",
"CANCELLED",
):
logger.debug("Skipping voided/returned item: %s", raw_item.get("description"))
continue
if raw_item.get("returnFlag") or raw_item.get("isReturn"):
logger.debug("Skipping returned item: %s", raw_item.get("description"))
continue
items.append(_parse_item(raw_item))
# Parse totals
total = _to_decimal(
detail.get(
"total",
data.get("total", data.get("orderTotal", data.get("grandTotal", 0))),
)
)
subtotal = detail.get("subtotal", data.get("subtotal", data.get("subTotal")))
tax = detail.get("tax", data.get("tax", data.get("salesTax")))
savings = detail.get(
"totalSavings",
data.get("savings", data.get("totalDiscount", data.get("circleSavings"))),
)
return {
"receipt_id": raw.receipt_id,
"purchase_date": raw.purchase_date,
"total": total,
"subtotal": _to_decimal(subtotal) if subtotal is not None else None,
"tax": _to_decimal(tax) if tax is not None else None,
"savings_total": _to_decimal(savings) if savings is not None else None,
"source_url": raw.source_url,
"raw_data": data,
"items": items,
}
+30
View File
@@ -0,0 +1,30 @@
"""Receipt & product matching pipeline — receipt normalization and product dedup."""
from receiptwitness.pipeline.matching import (
ConfidenceLevel,
ProductMatcher,
match_purchase_item,
)
from receiptwitness.pipeline.normalization import (
MatchMethod,
MatchResult,
clean_name,
extract_size_info,
jaccard_similarity,
normalize_product,
)
from receiptwitness.pipeline.receipt import normalize_receipt, parse_meijer_item
__all__ = [
"ConfidenceLevel",
"MatchMethod",
"MatchResult",
"ProductMatcher",
"clean_name",
"extract_size_info",
"jaccard_similarity",
"match_purchase_item",
"normalize_product",
"normalize_receipt",
"parse_meijer_item",
]
+136
View File
@@ -0,0 +1,136 @@
"""Product matching & dedup — UPC primary, fuzzy name fallback, confidence scoring.
Wraps the Phase 1 normalization module with confidence-level classification
and batch matching for purchase ingestion.
"""
import uuid
from dataclasses import dataclass
from cartsnitch_common.constants import MatchConfidence
from cartsnitch_common.models.product import NormalizedProduct
from cartsnitch_common.schemas.purchase import PurchaseItemCreate
from sqlalchemy.orm import Session
from receiptwitness.pipeline.normalization import (
MatchMethod,
MatchResult,
extract_size_info,
normalize_product,
)
# Re-export for convenience
ConfidenceLevel = MatchConfidence
@dataclass(frozen=True)
class MatchOutcome:
"""Result of matching a single purchase item to a normalized product."""
item_index: int
match: MatchResult | None
confidence_level: MatchConfidence
created_new: bool = False
def classify_confidence(score: float, method: MatchMethod) -> MatchConfidence:
"""Classify a match score into high/medium/low confidence."""
if method == MatchMethod.UPC:
return MatchConfidence.HIGH
# Name-based matching thresholds
if score >= 0.8:
return MatchConfidence.HIGH
if score >= 0.5:
return MatchConfidence.MEDIUM
return MatchConfidence.LOW
def _create_product_from_item(
session: Session,
item: PurchaseItemCreate,
) -> NormalizedProduct:
"""Create a new NormalizedProduct from a purchase item that had no match."""
size_info = extract_size_info(item.product_name_raw)
product = NormalizedProduct(
id=uuid.uuid4(),
canonical_name=item.product_name_raw,
size=size_info[0] if size_info else None,
size_unit=size_info[1] if size_info else None,
upc_variants=[item.upc] if item.upc else [],
)
session.add(product)
session.flush()
return product
class ProductMatcher:
"""Batch product matcher for purchase ingestion.
Usage:
matcher = ProductMatcher(session)
outcomes = matcher.match_items(items)
"""
def __init__(
self,
session: Session,
name_threshold: float = 0.4,
auto_create: bool = True,
):
self.session = session
self.name_threshold = name_threshold
self.auto_create = auto_create
def match_single(
self,
item: PurchaseItemCreate,
) -> tuple[NormalizedProduct | None, MatchResult | None, MatchConfidence]:
"""Match a single purchase item to a normalized product.
Returns (product, match_result, confidence_level).
If auto_create is True and no match found, creates a new product.
"""
result = normalize_product(
self.session,
item.product_name_raw,
upc=item.upc,
name_threshold=self.name_threshold,
)
if result:
confidence = classify_confidence(result.confidence, result.method)
return result.product, result, confidence
if self.auto_create:
product = _create_product_from_item(self.session, item)
return product, None, MatchConfidence.LOW
return None, None, MatchConfidence.LOW
def match_items(self, items: list[PurchaseItemCreate]) -> list[MatchOutcome]:
"""Match a batch of purchase items. Returns outcomes in order."""
outcomes: list[MatchOutcome] = []
for idx, item in enumerate(items):
product, result, confidence = self.match_single(item)
created = result is None and product is not None
outcomes.append(
MatchOutcome(
item_index=idx,
match=result,
confidence_level=confidence,
created_new=created,
)
)
return outcomes
def match_purchase_item(
session: Session,
item: PurchaseItemCreate,
name_threshold: float = 0.4,
auto_create: bool = True,
) -> tuple[NormalizedProduct | None, MatchConfidence]:
"""Convenience function: match a single item, return (product, confidence)."""
matcher = ProductMatcher(session, name_threshold=name_threshold, auto_create=auto_create)
product, _, confidence = matcher.match_single(item)
return product, confidence
@@ -0,0 +1,155 @@
"""Product normalization — Phase 1: UPC matching + fuzzy name matching.
Matches products across retailers by:
1. Exact UPC match (highest confidence)
2. Fuzzy name matching via token-based Jaccard similarity (lower confidence)
"""
import re
from dataclasses import dataclass
from enum import StrEnum
from cartsnitch_common.models.product import NormalizedProduct
from sqlalchemy import select
from sqlalchemy.orm import Session
class MatchMethod(StrEnum):
"""How a product match was determined."""
UPC = "upc"
NAME = "name"
@dataclass(frozen=True)
class MatchResult:
"""Result of a product normalization attempt."""
product: NormalizedProduct
confidence: float
method: MatchMethod
# Noise words stripped during name cleaning
_NOISE_WORDS = frozenset(
{
"the",
"a",
"an",
"and",
"or",
"of",
"with",
"in",
"for",
"to",
"brand",
"original",
"classic",
"new",
"improved",
}
)
# Regex for extracting size info (e.g., "16 oz", "1.5 lb", "12 ct")
_SIZE_PATTERN = re.compile(
r"(\d+(?:\.\d+)?)\s*(oz|fl\s*oz|lb|lbs|g|kg|ml|l|ct|pk|count|pack)\b",
re.IGNORECASE,
)
def clean_name(name: str) -> str:
"""Normalize a product name for comparison.
- Lowercase
- Remove size info (e.g., "16 oz")
- Strip noise words
- Collapse whitespace
"""
cleaned = name.lower()
cleaned = _SIZE_PATTERN.sub("", cleaned)
cleaned = re.sub(r"[^\w\s]", " ", cleaned)
tokens = cleaned.split()
tokens = [t for t in tokens if t not in _NOISE_WORDS]
return " ".join(tokens)
def extract_size_info(name: str) -> tuple[str, str] | None:
"""Extract (size, unit) from a product name, if present."""
match = _SIZE_PATTERN.search(name)
if match:
return match.group(1), match.group(2).lower().replace(" ", "_")
return None
def jaccard_similarity(a: str, b: str) -> float:
"""Token-based Jaccard similarity between two cleaned names."""
tokens_a = set(a.split())
tokens_b = set(b.split())
if not tokens_a or not tokens_b:
return 0.0
intersection = tokens_a & tokens_b
union = tokens_a | tokens_b
return len(intersection) / len(union)
def match_by_upc(session: Session, upc: str) -> MatchResult | None:
"""Find a normalized product by exact UPC match.
Loads products with upc_variants and checks membership in Python
for cross-database compatibility (works on both PostgreSQL and SQLite).
"""
# TODO: Use PostgreSQL JSON containment query (@>) for production.
# Current approach loads all products into memory — acceptable for tests
# and small datasets, but will not scale.
stmt = select(NormalizedProduct).where(NormalizedProduct.upc_variants.is_not(None))
products = session.execute(stmt).scalars().all()
for product in products:
if product.upc_variants and upc in product.upc_variants:
return MatchResult(product=product, confidence=1.0, method=MatchMethod.UPC)
return None
def match_by_name(
session: Session,
name: str,
threshold: float = 0.5,
) -> MatchResult | None:
"""Find the best normalized product by fuzzy name matching.
Loads all normalized products and computes Jaccard similarity.
Returns the best match above the threshold, or None.
"""
# TODO: Use pg_trgm similarity index for production.
# Current approach loads all products into memory — acceptable for tests
# and small datasets, but will not scale.
cleaned = clean_name(name)
stmt = select(NormalizedProduct)
products = session.execute(stmt).scalars().all()
best_match: NormalizedProduct | None = None
best_score = 0.0
for product in products:
score = jaccard_similarity(cleaned, clean_name(product.canonical_name))
if score > best_score and score >= threshold:
best_score = score
best_match = product
if best_match:
return MatchResult(product=best_match, confidence=best_score, method=MatchMethod.NAME)
return None
def normalize_product(
session: Session,
name: str,
upc: str | None = None,
name_threshold: float = 0.5,
) -> MatchResult | None:
"""Full normalization pipeline: UPC first, then fuzzy name fallback."""
if upc:
result = match_by_upc(session, upc)
if result:
return result
return match_by_name(session, name, threshold=name_threshold)
+144
View File
@@ -0,0 +1,144 @@
"""Receipt normalization — parse raw Meijer scraper output into purchase records.
Maps raw receipt fields, cleans product names, extracts quantities/units.
"""
import re
from datetime import date
from decimal import Decimal, InvalidOperation
from cartsnitch_common.schemas.purchase import PurchaseCreate, PurchaseItemCreate
def _clean_product_name(raw: str) -> str:
"""Clean raw product name from scraper output."""
cleaned = raw.strip()
# Remove leading/trailing non-alphanumeric chars
cleaned = re.sub(r"^\W+|\W+$", "", cleaned)
# Collapse internal whitespace
cleaned = re.sub(r"\s+", " ", cleaned)
return cleaned
def _safe_decimal(
value: str | float | int | Decimal | None,
default: Decimal = Decimal("0"),
) -> Decimal:
"""Safely convert a value to Decimal."""
if value is None:
return default
try:
return Decimal(str(value))
except (InvalidOperation, ValueError):
return default
def parse_meijer_item(raw_item: dict) -> PurchaseItemCreate:
"""Parse a single Meijer scraper line item into a PurchaseItemCreate.
Expected raw_item keys (from Meijer scraper):
- description / name: product name
- upc / upcCode: UPC barcode
- quantity / qty: number of units
- unitPrice / price: per-unit price
- extendedPrice / totalPrice: line total
- regularPrice: shelf price before discounts
- salePrice: sale price if applicable
- couponAmount / couponDiscount: coupon savings
- loyaltyAmount / loyaltyDiscount: loyalty savings
- category / department: raw category
"""
name = raw_item.get("description") or raw_item.get("name") or ""
cleaned_name = _clean_product_name(name)
upc = raw_item.get("upc") or raw_item.get("upcCode")
if upc:
upc = str(upc).strip().lstrip("0") or str(upc).strip()
qty = _safe_decimal(
raw_item.get("quantity") or raw_item.get("qty"),
default=Decimal("1"),
)
unit_price = _safe_decimal(raw_item.get("unitPrice") or raw_item.get("price"))
extended = _safe_decimal(raw_item.get("extendedPrice") or raw_item.get("totalPrice"))
if extended == Decimal("0") and unit_price > 0:
extended = unit_price * qty
regular = raw_item.get("regularPrice")
sale = raw_item.get("salePrice")
coupon = raw_item.get("couponAmount") or raw_item.get("couponDiscount")
loyalty = raw_item.get("loyaltyAmount") or raw_item.get("loyaltyDiscount")
category = raw_item.get("category") or raw_item.get("department")
return PurchaseItemCreate(
product_name_raw=cleaned_name,
upc=upc,
quantity=qty,
unit_price=unit_price,
extended_price=extended,
regular_price=_safe_decimal(regular) if regular is not None else None,
sale_price=_safe_decimal(sale) if sale is not None else None,
coupon_discount=_safe_decimal(coupon) if coupon is not None else None,
loyalty_discount=_safe_decimal(loyalty) if loyalty is not None else None,
category_raw=str(category).strip() if category else None,
)
def normalize_receipt(
raw_receipt: dict,
user_id: str,
store_id: str,
) -> PurchaseCreate:
"""Parse a complete Meijer raw receipt into a PurchaseCreate.
Expected raw_receipt keys:
- receiptId / receipt_id / id: unique receipt identifier
- date / purchaseDate / purchase_date: purchase date (YYYY-MM-DD or similar)
- total / totalAmount: receipt total
- subtotal: pre-tax subtotal
- tax / taxAmount: tax amount
- savings / totalSavings: total discount savings
- items: list of raw line item dicts
"""
import uuid
receipt_id = str(
raw_receipt.get("receiptId")
or raw_receipt.get("receipt_id")
or raw_receipt.get("id")
or uuid.uuid4()
)
raw_date = (
raw_receipt.get("date")
or raw_receipt.get("purchaseDate")
or raw_receipt.get("purchase_date")
)
if isinstance(raw_date, str):
purchase_date = date.fromisoformat(raw_date[:10])
elif isinstance(raw_date, date):
purchase_date = raw_date
else:
purchase_date = date.today()
total = _safe_decimal(raw_receipt.get("total") or raw_receipt.get("totalAmount"))
subtotal = raw_receipt.get("subtotal")
tax = raw_receipt.get("tax") or raw_receipt.get("taxAmount")
savings = raw_receipt.get("savings") or raw_receipt.get("totalSavings")
raw_items = raw_receipt.get("items") or []
items = [parse_meijer_item(item) for item in raw_items]
return PurchaseCreate(
user_id=uuid.UUID(user_id) if isinstance(user_id, str) else user_id,
store_id=uuid.UUID(store_id) if isinstance(store_id, str) else store_id,
receipt_id=receipt_id,
purchase_date=purchase_date,
total=total,
subtotal=_safe_decimal(subtotal) if subtotal is not None else None,
tax=_safe_decimal(tax) if tax is not None else None,
savings_total=_safe_decimal(savings) if savings is not None else None,
raw_data=raw_receipt,
items=items,
)
+1
View File
@@ -0,0 +1 @@
"""Retailer scrapers."""
+72
View File
@@ -0,0 +1,72 @@
"""Abstract base scraper interface for all retailer scrapers."""
import asyncio
import random
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from receiptwitness.config import settings
@dataclass
class SessionData:
"""Holds session cookies and metadata for a retailer login."""
cookies: list[dict]
user_agent: str
created_at: datetime
expires_at: datetime | None = None
extra: dict = field(default_factory=dict)
@dataclass
class RawReceipt:
"""Raw receipt data before parsing."""
receipt_id: str
purchase_date: str
store_number: str | None = None
raw_data: dict = field(default_factory=dict)
source_url: str | None = None
class BaseScraper(ABC):
"""All retailer scrapers implement this interface.
Provides common functionality: human-like delays, rate limiting guards,
and the abstract methods each retailer scraper must implement.
"""
@abstractmethod
async def login(self, username: str, password: str) -> SessionData:
"""Authenticate with the retailer portal and return session data."""
...
@abstractmethod
async def check_session(self, session: SessionData) -> bool:
"""Verify if an existing session is still valid."""
...
@abstractmethod
async def scrape_receipts(
self, session: SessionData, since: datetime | None = None
) -> list[RawReceipt]:
"""Scrape receipt data from the retailer portal."""
...
@abstractmethod
def parse_receipt(self, raw: RawReceipt) -> dict:
"""Parse a raw receipt into structured data.
Returns a dict with keys matching PurchaseCreate schema fields,
including an 'items' list matching PurchaseItemCreate fields.
"""
...
async def human_delay(self, min_ms: int | None = None, max_ms: int | None = None) -> None:
"""Sleep for a randomized human-like interval."""
lo = min_ms or settings.min_request_delay_ms
hi = max_ms or settings.max_request_delay_ms
delay = random.randint(lo, hi) / 1000.0
await asyncio.sleep(delay)
+344
View File
@@ -0,0 +1,344 @@
"""Kroger loyalty portal scraper using Playwright.
Kroger uses Akamai Bot Manager for aggressive headless browser detection.
This scraper uses enhanced stealth measures including playwright-stealth,
realistic fingerprinting, and human-like interaction pacing.
"""
import logging
from datetime import UTC, datetime, timedelta
from typing import cast
from playwright.async_api import BrowserContext, Page, Playwright, async_playwright
from receiptwitness.config import settings
from receiptwitness.scrapers.base import BaseScraper, RawReceipt, SessionData
logger = logging.getLogger(__name__)
# Kroger endpoints
KROGER_BASE = "https://www.kroger.com"
KROGER_LOGIN_PAGE = f"{KROGER_BASE}/signin"
KROGER_PURCHASE_HISTORY = f"{KROGER_BASE}/mypurchases"
KROGER_RECEIPT_API = f"{KROGER_BASE}/atlas/v1/purchase-history/api"
KROGER_RECEIPT_DETAIL_API = f"{KROGER_BASE}/atlas/v1/receipt/api"
KROGER_ACCOUNT_PAGE = f"{KROGER_BASE}/account/dashboard"
# Realistic browser fingerprint — Chrome on Windows (matches Kroger's typical audience)
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
DEFAULT_VIEWPORT = {"width": 1920, "height": 1080}
DEFAULT_LOCALE = "en-US"
DEFAULT_TIMEZONE = "America/New_York"
class KrogerScraper(BaseScraper):
"""Scraper for Kroger loyalty purchase history.
Kroger uses Akamai Bot Manager which aggressively detects headless
browsers. This scraper employs enhanced stealth measures:
- Masks webdriver/automation signals
- Sets realistic browser fingerprint
- Uses human-like interaction pacing
- Preserves browser context across sessions
"""
async def _create_stealth_context(
self, playwright_instance: Playwright, cookies: list[dict] | None = None
) -> BrowserContext:
"""Create a browser context with enhanced stealth for Akamai evasion."""
browser = await playwright_instance.chromium.launch(
headless=settings.headless,
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-infobars",
"--window-size=1920,1080",
],
)
context = await browser.new_context(
user_agent=DEFAULT_USER_AGENT,
viewport=DEFAULT_VIEWPORT, # type: ignore[arg-type]
locale=DEFAULT_LOCALE,
timezone_id=DEFAULT_TIMEZONE,
java_script_enabled=True,
bypass_csp=False,
color_scheme="light",
has_touch=False,
)
# Enhanced stealth script targeting Akamai Bot Manager detection vectors
await context.add_init_script(
"""
// Mask webdriver flag
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// Chrome runtime object
window.chrome = {
runtime: {},
loadTimes: function() {},
csi: function() {},
app: { isInstalled: false }
};
// Realistic plugin array
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
// Languages
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en']
});
// Platform
Object.defineProperty(navigator, 'platform', {
get: () => 'Win32'
});
// Hardware concurrency
Object.defineProperty(navigator, 'hardwareConcurrency', {
get: () => 8
});
// Device memory
Object.defineProperty(navigator, 'deviceMemory', {
get: () => 8
});
// Permissions query override (Akamai checks this)
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) =>
parameters.name === 'notifications'
? Promise.resolve({ state: Notification.permission })
: originalQuery(parameters);
// WebGL vendor/renderer (avoid "Google Inc." / "ANGLE" tells)
const getParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = function(parameter) {
if (parameter === 37445) return 'Intel Inc.';
if (parameter === 37446) return 'Intel Iris OpenGL Engine';
return getParameter.call(this, parameter);
};
"""
)
if cookies:
await context.add_cookies(cookies) # type: ignore[arg-type]
return cast(BrowserContext, context)
async def login(self, username: str, password: str) -> SessionData:
"""Log in to Kroger and capture session cookies."""
async with async_playwright() as p:
context = await self._create_stealth_context(p)
page = await context.new_page()
try:
return await self._perform_login(page, context, username, password)
finally:
if context.browser:
await context.browser.close()
async def _perform_login(
self, page: Page, context: BrowserContext, username: str, password: str
) -> SessionData:
"""Execute the Kroger login flow."""
logger.info("Navigating to Kroger sign-in page")
await page.goto(KROGER_LOGIN_PAGE, wait_until="networkidle")
await self.human_delay(2000, 4000)
# Kroger login form — email/username field
email_input = page.locator(
'input[id="SignIn-emailInput"], '
'input[name="email"], '
'input[type="email"], '
'input[data-testid="SignIn-emailInput"]'
)
await email_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await email_input.click()
await self.human_delay(300, 700)
await email_input.fill(username)
await self.human_delay(800, 1500)
# Password field
password_input = page.locator(
'input[id="SignIn-passwordInput"], '
'input[name="password"], '
'input[type="password"], '
'input[data-testid="SignIn-passwordInput"]'
)
await password_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await password_input.click()
await self.human_delay(300, 700)
await password_input.fill(password)
await self.human_delay(1000, 2000)
# Sign-in button
sign_in_btn = page.locator(
'button[id="SignIn-submitButton"], '
'button[data-testid="SignIn-submitButton"], '
'button[type="submit"]:has-text("Sign In")'
)
await sign_in_btn.click()
# Wait for redirect away from sign-in page
await page.wait_for_url(
lambda url: "signin" not in url.lower(),
timeout=settings.browser_timeout_ms,
)
await self.human_delay(1500, 3000)
# Capture cookies
raw_cookies = await context.cookies()
cookies = [dict(c) for c in raw_cookies]
now = datetime.now(UTC)
logger.info("Kroger login successful, captured %d cookies", len(cookies))
return SessionData(
cookies=cookies,
user_agent=DEFAULT_USER_AGENT,
created_at=now,
expires_at=now + timedelta(hours=2),
extra={"retailer": "kroger"},
)
async def check_session(self, session: SessionData) -> bool:
"""Check if the Kroger session is still valid."""
if session.expires_at and datetime.now(UTC) > session.expires_at:
logger.info("Kroger session expired based on timestamp")
return False
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
response = await page.goto(KROGER_ACCOUNT_PAGE, wait_until="networkidle")
current_url = page.url.lower()
is_valid = "signin" not in current_url and response is not None and response.ok
logger.info("Kroger session check: valid=%s (url=%s)", is_valid, page.url)
return is_valid
except Exception:
logger.exception("Kroger session check failed")
return False
finally:
if context.browser:
await context.browser.close()
async def scrape_receipts(
self, session: SessionData, since: datetime | None = None
) -> list[RawReceipt]:
"""Scrape purchase history from Kroger."""
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
return await self._fetch_receipts(page, since)
finally:
if context.browser:
await context.browser.close()
async def _fetch_receipts(self, page: Page, since: datetime | None) -> list[RawReceipt]:
"""Fetch receipt list and details from Kroger purchase history."""
# Navigate to purchase history to establish context
await page.goto(KROGER_PURCHASE_HISTORY, wait_until="networkidle")
await self.human_delay(1500, 3000)
receipts: list[RawReceipt] = []
# Kroger purchase history API endpoint
api_response = await page.request.get(KROGER_RECEIPT_API)
if not api_response.ok:
logger.warning(
"Kroger purchase history request failed: %d %s",
api_response.status,
api_response.status_text,
)
return []
response = await api_response.json()
if not isinstance(response, dict):
logger.warning("Unexpected purchase history response type: %s", type(response))
return []
# Handle Kroger's response structure
orders = response.get("orders", response.get("purchases", []))
if not isinstance(orders, list):
logger.warning("No orders found in Kroger purchase history response")
return []
logger.info("Found %d orders in Kroger purchase history", len(orders))
for order in orders:
raw_id = order.get("orderId") or order.get("receiptId") or order.get("id") or ""
order_id = str(raw_id)
purchase_date = order.get(
"purchaseDate", order.get("transactionDate", order.get("date", ""))
)
# Filter by date if 'since' is provided
if since and purchase_date:
try:
txn_dt = datetime.fromisoformat(purchase_date.replace("Z", "+00:00"))
if txn_dt < since:
continue
except (ValueError, TypeError):
pass
if not order_id:
continue
await self.human_delay(1000, 2500)
# Fetch receipt detail
detail = await self._fetch_receipt_detail(page, order_id)
raw_store = (
order.get("storeNumber")
or order.get("divisionNumber")
or order.get("storeId")
or ""
)
store_number = str(raw_store)
receipts.append(
RawReceipt(
receipt_id=order_id,
purchase_date=purchase_date,
store_number=store_number,
raw_data={**order, "detail": detail},
source_url=f"{KROGER_RECEIPT_DETAIL_API}?orderId={order_id}",
)
)
logger.info("Scraped %d receipts from Kroger", len(receipts))
return receipts
async def _fetch_receipt_detail(self, page: Page, order_id: str) -> dict:
"""Fetch detailed receipt data for a single Kroger order."""
try:
url = f"{KROGER_RECEIPT_DETAIL_API}?orderId={order_id}"
api_response = await page.request.get(url)
if not api_response.ok:
logger.warning(
"Kroger receipt detail request failed for %s: %d",
order_id,
api_response.status,
)
return {}
detail = await api_response.json()
return detail if isinstance(detail, dict) else {}
except Exception:
logger.exception("Failed to fetch Kroger receipt detail for %s", order_id)
return {}
def parse_receipt(self, raw: RawReceipt) -> dict:
"""Parse raw Kroger receipt into structured purchase data."""
from receiptwitness.parsers.kroger import parse_kroger_receipt
return parse_kroger_receipt(raw)
+301
View File
@@ -0,0 +1,301 @@
"""Meijer mPerks scraper using Playwright.
Meijer has no public API. We reverse-engineer the XHR endpoints the mPerks
web app uses to pull purchase history and receipt data. The flow:
1. Launch stealth Playwright browser
2. Navigate to mPerks login page and authenticate
3. Capture session cookies after successful login
4. Use those cookies to hit the mPerks receipt API endpoints directly
5. Parse receipt JSON into structured PurchaseCreate records
Key endpoints (reverse-engineered from mPerks SPA):
- Login: POST https://www.meijer.com/bin/meijer/account/login
- Receipts: GET https://www.meijer.com/bin/meijer/profile/purchasehistory
- Receipt detail: GET https://www.meijer.com/bin/meijer/profile/receipt?receiptId=...
"""
import logging
from datetime import UTC, datetime, timedelta
from typing import cast
from playwright.async_api import BrowserContext, Page, Playwright, async_playwright
from receiptwitness.config import settings
from receiptwitness.scrapers.base import BaseScraper, RawReceipt, SessionData
logger = logging.getLogger(__name__)
# Meijer mPerks URLs
MEIJER_BASE = "https://www.meijer.com"
MEIJER_LOGIN_PAGE = f"{MEIJER_BASE}/shopping/login.html"
MEIJER_LOGIN_API = f"{MEIJER_BASE}/bin/meijer/account/login"
MEIJER_PURCHASE_HISTORY = f"{MEIJER_BASE}/bin/meijer/profile/purchasehistory"
MEIJER_RECEIPT_DETAIL = f"{MEIJER_BASE}/bin/meijer/profile/receipt"
MEIJER_MPERKS_HOME = f"{MEIJER_BASE}/mperks.html"
# Realistic browser fingerprint
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
DEFAULT_VIEWPORT = {"width": 1920, "height": 1080}
DEFAULT_LOCALE = "en-US"
DEFAULT_TIMEZONE = "America/Detroit" # Meijer HQ is in Grand Rapids, MI
class MeijerScraper(BaseScraper):
"""Scraper for Meijer mPerks purchase history."""
async def _create_stealth_context(
self, playwright_instance: Playwright, cookies: list[dict] | None = None
) -> BrowserContext:
"""Create a browser context with stealth settings."""
browser = await playwright_instance.chromium.launch(
headless=settings.headless,
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
],
)
context = await browser.new_context(
user_agent=DEFAULT_USER_AGENT,
viewport=DEFAULT_VIEWPORT, # type: ignore[arg-type]
locale=DEFAULT_LOCALE,
timezone_id=DEFAULT_TIMEZONE,
java_script_enabled=True,
bypass_csp=False,
)
# Mask webdriver flag
await context.add_init_script(
"""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// Mask chrome automation indicators
window.chrome = { runtime: {} };
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en']
});
"""
)
if cookies:
await context.add_cookies(cookies) # type: ignore[arg-type]
return cast(BrowserContext, context)
async def login(self, username: str, password: str) -> SessionData:
"""Log in to Meijer mPerks and capture session cookies.
The mPerks login flow:
1. Navigate to login page
2. Fill email and password fields
3. Click sign-in button
4. Wait for redirect to mPerks dashboard
5. Extract session cookies
"""
async with async_playwright() as p:
context = await self._create_stealth_context(p)
page = await context.new_page()
try:
return await self._perform_login(page, context, username, password)
finally:
if context.browser:
await context.browser.close()
async def _perform_login(
self, page: Page, context: BrowserContext, username: str, password: str
) -> SessionData:
"""Execute the login flow on the mPerks portal."""
logger.info("Navigating to Meijer login page")
await page.goto(MEIJER_LOGIN_PAGE, wait_until="networkidle")
await self.human_delay(1500, 3000)
# Fill email field
email_input = page.locator('input[type="email"], input[name="email"], #email')
await email_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await email_input.click()
await self.human_delay(200, 500)
await email_input.fill(username)
await self.human_delay(500, 1000)
# Fill password field
password_input = page.locator('input[type="password"], input[name="password"], #password')
await password_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await password_input.click()
await self.human_delay(200, 500)
await password_input.fill(password)
await self.human_delay(500, 1500)
# Click sign-in button
sign_in_btn = page.locator(
'button[type="submit"], button:has-text("Sign In"), button:has-text("Log In")'
)
await sign_in_btn.click()
# Wait for navigation after login
await page.wait_for_url(
lambda url: "login" not in url.lower(),
timeout=settings.browser_timeout_ms,
)
await self.human_delay(1000, 2000)
# Capture cookies
raw_cookies = await context.cookies()
cookies = [dict(c) for c in raw_cookies]
now = datetime.now(UTC)
logger.info("Meijer login successful, captured %d cookies", len(cookies))
return SessionData(
cookies=cookies,
user_agent=DEFAULT_USER_AGENT,
created_at=now,
expires_at=now + timedelta(hours=4),
)
async def check_session(self, session: SessionData) -> bool:
"""Check if the mPerks session is still valid.
Makes a lightweight request to the mPerks home page and checks
if we get redirected to login (session expired) or not.
"""
if session.expires_at and datetime.now(UTC) > session.expires_at:
logger.info("Meijer session expired based on timestamp")
return False
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
response = await page.goto(MEIJER_MPERKS_HOME, wait_until="networkidle")
current_url = page.url.lower()
is_valid = "login" not in current_url and response is not None and response.ok
logger.info("Meijer session check: valid=%s (url=%s)", is_valid, page.url)
return is_valid
except Exception:
logger.exception("Meijer session check failed")
return False
finally:
if context.browser:
await context.browser.close()
async def scrape_receipts(
self, session: SessionData, since: datetime | None = None
) -> list[RawReceipt]:
"""Scrape purchase history from Meijer mPerks.
Uses the XHR endpoints the mPerks SPA calls to fetch receipt data.
The purchase history endpoint returns a list of recent transactions,
and we can fetch individual receipt details for line items.
"""
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
return await self._fetch_receipts(page, since)
finally:
if context.browser:
await context.browser.close()
async def _fetch_receipts(self, page: Page, since: datetime | None) -> list[RawReceipt]:
"""Fetch receipt list and detail via mPerks XHR endpoints.
Uses Playwright's page.request API (APIRequestContext) instead of
page.evaluate(fetch(...)) for better observability — requests show up
in Playwright traces and can be intercepted by route handlers.
"""
# Navigate to mPerks to establish context (cookies need domain context)
await page.goto(MEIJER_MPERKS_HOME, wait_until="networkidle")
await self.human_delay(1000, 2000)
receipts: list[RawReceipt] = []
# Fetch purchase history listing via page.request (APIRequestContext)
api_response = await page.request.get(MEIJER_PURCHASE_HISTORY)
if not api_response.ok:
logger.warning(
"Purchase history request failed: %d %s",
api_response.status,
api_response.status_text,
)
return []
response = await api_response.json()
if not isinstance(response, dict):
logger.warning("Unexpected purchase history response type: %s", type(response))
return []
transactions = response.get("transactions", response.get("purchaseHistory", []))
if not isinstance(transactions, list):
logger.warning("No transactions found in purchase history response")
return []
logger.info("Found %d transactions in Meijer purchase history", len(transactions))
for txn in transactions:
receipt_id = str(txn.get("transactionId", txn.get("receiptId", "")))
purchase_date = txn.get("transactionDate", txn.get("purchaseDate", ""))
# Filter by date if 'since' is provided
if since and purchase_date:
try:
txn_dt = datetime.fromisoformat(purchase_date.replace("Z", "+00:00"))
if txn_dt < since:
continue
except (ValueError, TypeError):
pass
if not receipt_id:
continue
await self.human_delay(800, 2000)
# Fetch receipt detail
detail = await self._fetch_receipt_detail(page, receipt_id)
receipts.append(
RawReceipt(
receipt_id=receipt_id,
purchase_date=purchase_date,
store_number=str(txn.get("storeNumber", txn.get("storeId", ""))),
raw_data={**txn, "detail": detail},
source_url=f"{MEIJER_RECEIPT_DETAIL}?receiptId={receipt_id}",
)
)
logger.info("Scraped %d receipts from Meijer", len(receipts))
return receipts
async def _fetch_receipt_detail(self, page: Page, receipt_id: str) -> dict:
"""Fetch detailed receipt data for a single transaction.
Uses Playwright's page.request API for traceability.
"""
try:
url = f"{MEIJER_RECEIPT_DETAIL}?receiptId={receipt_id}"
api_response = await page.request.get(url)
if not api_response.ok:
logger.warning(
"Receipt detail request failed for %s: %d",
receipt_id,
api_response.status,
)
return {}
detail = await api_response.json()
return detail if isinstance(detail, dict) else {}
except Exception:
logger.exception("Failed to fetch receipt detail for %s", receipt_id)
return {}
def parse_receipt(self, raw: RawReceipt) -> dict:
"""Parse raw Meijer receipt into structured purchase data.
Delegates to the dedicated parser module.
"""
from receiptwitness.parsers.meijer import parse_meijer_receipt
return parse_meijer_receipt(raw)
+326
View File
@@ -0,0 +1,326 @@
"""Target Circle scraper using Playwright.
Target stores ~1 year of in-store purchase history tied to Circle accounts.
Purchases appear when the user pays with a linked card, uses the Target app
wallet, or enters their Circle phone number at checkout.
Key endpoints (reverse-engineered from target.com SPA):
- Login: POST https://gsp.target.com/gsp/authentications/v1/auth_codes
- Order history: GET https://api.target.com/order_history/v1/orders (in-store tab)
- Receipt detail: GET https://api.target.com/order_history/v1/orders/{orderId}
"""
import logging
from datetime import UTC, datetime, timedelta
from typing import cast
from playwright.async_api import BrowserContext, Page, Playwright, async_playwright
from receiptwitness.config import settings
from receiptwitness.scrapers.base import BaseScraper, RawReceipt, SessionData
logger = logging.getLogger(__name__)
# Target endpoints
TARGET_BASE = "https://www.target.com"
TARGET_LOGIN_PAGE = f"{TARGET_BASE}/login"
TARGET_ACCOUNT_PAGE = f"{TARGET_BASE}/account"
TARGET_ORDER_HISTORY = f"{TARGET_BASE}/account/orders"
TARGET_ORDER_API = "https://api.target.com/order_history/v1/orders"
TARGET_RECEIPT_API = "https://api.target.com/order_history/v1/orders"
# Realistic browser fingerprint — Chrome on Windows
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
DEFAULT_VIEWPORT = {"width": 1920, "height": 1080}
DEFAULT_LOCALE = "en-US"
DEFAULT_TIMEZONE = "America/Detroit" # SE Michigan coverage
class TargetScraper(BaseScraper):
"""Scraper for Target Circle in-store purchase history.
Target's order history SPA loads purchase data from internal API
endpoints. This scraper authenticates via the web login flow,
captures session cookies, and uses those to hit the order history
API for in-store receipt data.
"""
async def _create_stealth_context(
self, playwright_instance: Playwright, cookies: list[dict] | None = None
) -> BrowserContext:
"""Create a browser context with stealth settings for Target."""
browser = await playwright_instance.chromium.launch(
headless=settings.headless,
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
"--disable-dev-shm-usage",
],
)
context = await browser.new_context(
user_agent=DEFAULT_USER_AGENT,
viewport=DEFAULT_VIEWPORT, # type: ignore[arg-type]
locale=DEFAULT_LOCALE,
timezone_id=DEFAULT_TIMEZONE,
java_script_enabled=True,
bypass_csp=False,
color_scheme="light",
has_touch=False,
)
# Mask webdriver and automation signals
await context.add_init_script(
"""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
window.chrome = {
runtime: {},
loadTimes: function() {},
csi: function() {},
app: { isInstalled: false }
};
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en']
});
Object.defineProperty(navigator, 'platform', {
get: () => 'Win32'
});
Object.defineProperty(navigator, 'hardwareConcurrency', {
get: () => 8
});
Object.defineProperty(navigator, 'deviceMemory', {
get: () => 8
});
"""
)
if cookies:
await context.add_cookies(cookies) # type: ignore[arg-type]
return cast(BrowserContext, context)
async def login(self, username: str, password: str) -> SessionData:
"""Log in to Target and capture session cookies."""
async with async_playwright() as p:
context = await self._create_stealth_context(p)
page = await context.new_page()
try:
return await self._perform_login(page, context, username, password)
finally:
if context.browser:
await context.browser.close()
async def _perform_login(
self, page: Page, context: BrowserContext, username: str, password: str
) -> SessionData:
"""Execute the Target login flow."""
logger.info("Navigating to Target sign-in page")
await page.goto(TARGET_LOGIN_PAGE, wait_until="networkidle")
await self.human_delay(2000, 4000)
# Target login form — email/username field
email_input = page.locator(
'input[id="username"], '
'input[name="username"], '
'input[type="email"], '
'input[data-test="username"]'
)
await email_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await email_input.click()
await self.human_delay(300, 700)
await email_input.fill(username)
await self.human_delay(800, 1500)
# Password field
password_input = page.locator(
'input[id="password"], '
'input[name="password"], '
'input[type="password"], '
'input[data-test="password"]'
)
await password_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await password_input.click()
await self.human_delay(300, 700)
await password_input.fill(password)
await self.human_delay(1000, 2000)
# Sign-in button
sign_in_btn = page.locator(
'button[id="login"], '
'button[data-test="login-button"], '
'button[type="submit"]:has-text("Sign in")'
)
await sign_in_btn.click()
# Wait for redirect away from login page
await page.wait_for_url(
lambda url: "login" not in url.lower(),
timeout=settings.browser_timeout_ms,
)
await self.human_delay(1500, 3000)
# Capture cookies
raw_cookies = await context.cookies()
cookies = [dict(c) for c in raw_cookies]
now = datetime.now(UTC)
logger.info("Target login successful, captured %d cookies", len(cookies))
return SessionData(
cookies=cookies,
user_agent=DEFAULT_USER_AGENT,
created_at=now,
expires_at=now + timedelta(hours=2),
extra={"retailer": "target"},
)
async def check_session(self, session: SessionData) -> bool:
"""Check if the Target session is still valid."""
if session.expires_at and datetime.now(UTC) > session.expires_at:
logger.info("Target session expired based on timestamp")
return False
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
response = await page.goto(TARGET_ACCOUNT_PAGE, wait_until="networkidle")
current_url = page.url.lower()
is_valid = "login" not in current_url and response is not None and response.ok
logger.info("Target session check: valid=%s (url=%s)", is_valid, page.url)
return is_valid
except Exception:
logger.exception("Target session check failed")
return False
finally:
if context.browser:
await context.browser.close()
async def scrape_receipts(
self, session: SessionData, since: datetime | None = None
) -> list[RawReceipt]:
"""Scrape in-store purchase history from Target Circle."""
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
return await self._fetch_receipts(page, since)
finally:
if context.browser:
await context.browser.close()
async def _fetch_receipts(self, page: Page, since: datetime | None) -> list[RawReceipt]:
"""Fetch receipt list and details from Target order history.
Target's order history page has separate tabs for online and in-store
purchases. We target the in-store tab which shows Circle-linked
transactions.
"""
# Navigate to order history to establish context
await page.goto(TARGET_ORDER_HISTORY, wait_until="networkidle")
await self.human_delay(1500, 3000)
receipts: list[RawReceipt] = []
# Target order history API — filter for in-store purchases
api_response = await page.request.get(
TARGET_ORDER_API,
params={"channel": "in_store", "limit": "50"},
)
if not api_response.ok:
logger.warning(
"Target order history request failed: %d %s",
api_response.status,
api_response.status_text,
)
return []
response = await api_response.json()
if not isinstance(response, dict):
logger.warning("Unexpected order history response type: %s", type(response))
return []
# Target uses "orders" key for in-store purchase list
orders = response.get("orders", response.get("transactions", []))
if not isinstance(orders, list):
logger.warning("No orders found in Target order history response")
return []
logger.info("Found %d in-store orders in Target history", len(orders))
for order in orders:
raw_id = order.get("orderId") or order.get("transactionId") or order.get("id") or ""
order_id = str(raw_id)
purchase_date = order.get(
"purchaseDate",
order.get("transactionDate", order.get("date", "")),
)
# Filter by date if 'since' is provided
if since and purchase_date:
try:
txn_dt = datetime.fromisoformat(purchase_date.replace("Z", "+00:00"))
if txn_dt < since:
continue
except (ValueError, TypeError):
pass
if not order_id:
continue
await self.human_delay(1000, 2500)
# Fetch receipt detail
detail = await self._fetch_receipt_detail(page, order_id)
raw_store = (
order.get("storeNumber") or order.get("storeId") or order.get("locationId") or ""
)
store_number = str(raw_store)
receipts.append(
RawReceipt(
receipt_id=order_id,
purchase_date=purchase_date,
store_number=store_number,
raw_data={**order, "detail": detail},
source_url=f"{TARGET_RECEIPT_API}/{order_id}",
)
)
logger.info("Scraped %d receipts from Target", len(receipts))
return receipts
async def _fetch_receipt_detail(self, page: Page, order_id: str) -> dict:
"""Fetch detailed receipt data for a single Target order."""
try:
url = f"{TARGET_RECEIPT_API}/{order_id}"
api_response = await page.request.get(url)
if not api_response.ok:
logger.warning(
"Target receipt detail request failed for %s: %d",
order_id,
api_response.status,
)
return {}
detail = await api_response.json()
return detail if isinstance(detail, dict) else {}
except Exception:
logger.exception("Failed to fetch Target receipt detail for %s", order_id)
return {}
def parse_receipt(self, raw: RawReceipt) -> dict:
"""Parse raw Target receipt into structured purchase data."""
from receiptwitness.parsers.target import parse_target_receipt
return parse_target_receipt(raw)
+1
View File
@@ -0,0 +1 @@
"""Session management — encrypted cookie storage and refresh logic."""
+52
View File
@@ -0,0 +1,52 @@
"""Fernet-based encryption for session cookies at rest.
Session data (cookies, tokens) is encrypted before writing to the database
and decrypted only when needed for a scrape. The encryption key is provided
via the RW_SESSION_ENCRYPTION_KEY environment variable — it is never stored
in the database or logged.
"""
import json
import logging
from cryptography.fernet import Fernet, InvalidToken
from receiptwitness.config import settings
logger = logging.getLogger(__name__)
def _get_fernet() -> Fernet:
"""Get a Fernet instance using the configured encryption key."""
key = settings.session_encryption_key
if not key:
raise ValueError(
"RW_SESSION_ENCRYPTION_KEY is not set. "
"Generate one with: "
"python -c 'from cryptography.fernet import Fernet; "
"print(Fernet.generate_key().decode())'"
)
return Fernet(key.encode() if isinstance(key, str) else key)
def encrypt_session_data(data: dict) -> str:
"""Encrypt session data dict to a Fernet token string.
The data is JSON-serialized, then encrypted. The result is a
URL-safe base64-encoded string suitable for storing in JSONB.
"""
f = _get_fernet()
plaintext = json.dumps(data, default=str).encode("utf-8")
return f.encrypt(plaintext).decode("utf-8")
def decrypt_session_data(encrypted: str) -> dict:
"""Decrypt a Fernet token string back to a session data dict."""
f = _get_fernet()
try:
plaintext = f.decrypt(encrypted.encode("utf-8"))
result: dict = json.loads(plaintext)
return result
except InvalidToken:
logger.error("Failed to decrypt session data — invalid token or wrong key")
raise
+81
View File
@@ -0,0 +1,81 @@
"""Session storage, retrieval, and refresh logic.
Manages the lifecycle of retailer session data:
- Load encrypted session from DB
- Check validity via scraper
- Re-authenticate if expired
- Save new session back (encrypted)
"""
import logging
from dataclasses import asdict
from datetime import UTC, datetime
from receiptwitness.scrapers.base import BaseScraper, SessionData
from receiptwitness.session.encryption import decrypt_session_data, encrypt_session_data
logger = logging.getLogger(__name__)
def session_from_db_record(session_data_encrypted: str | None) -> SessionData | None:
"""Deserialize and decrypt a session from the database.
The session_data column in user_store_accounts stores the Fernet-encrypted
JSON of the SessionData fields.
"""
if not session_data_encrypted:
return None
try:
data = decrypt_session_data(session_data_encrypted)
return SessionData(
cookies=data["cookies"],
user_agent=data["user_agent"],
created_at=datetime.fromisoformat(data["created_at"]),
expires_at=(
datetime.fromisoformat(data["expires_at"]) if data.get("expires_at") else None
),
extra=data.get("extra", {}),
)
except Exception:
logger.exception("Failed to load session from DB record")
return None
def session_to_db_value(session: SessionData) -> str:
"""Serialize and encrypt a session for database storage."""
data = asdict(session)
# Convert datetime objects to ISO strings for JSON serialization
data["created_at"] = session.created_at.isoformat()
if session.expires_at:
data["expires_at"] = session.expires_at.isoformat()
return encrypt_session_data(data)
async def get_valid_session(
scraper: BaseScraper,
session_data_encrypted: str | None,
username: str,
password: str,
) -> tuple[SessionData, bool]:
"""Get a valid session, re-authenticating if needed.
Returns:
A tuple of (session, was_refreshed). If was_refreshed is True,
the caller should persist the new session to the database.
"""
# Try existing session first
existing = session_from_db_record(session_data_encrypted)
if existing:
if existing.expires_at and datetime.now(UTC) > existing.expires_at:
logger.info("Session expired by timestamp, re-authenticating")
elif await scraper.check_session(existing):
logger.info("Existing session is valid")
return existing, False
else:
logger.info("Session check failed, re-authenticating")
# Need to re-authenticate
logger.info("Performing fresh login")
new_session = await scraper.login(username, password)
return new_session, True