Squashed 'receiptwitness/' content from commit e8d374a

git-subtree-dir: receiptwitness
git-subtree-split: e8d374a89ed8978f429598e02d31b1c5963efe22
This commit is contained in:
Coupon Carl
2026-03-28 02:24:22 +00:00
commit 253fd8464f
27 changed files with 2559 additions and 0 deletions
+12
View File
@@ -0,0 +1,12 @@
__pycache__/
*.pyc
.pytest_cache/
*.egg-info/
dist/
.venv/
.env
.git/
.github/
tests/
*.md
renovate.json
+7
View File
@@ -0,0 +1,7 @@
__pycache__/
*.pyc
.pytest_cache/
*.egg-info/
dist/
.venv/
.env
+227
View File
@@ -0,0 +1,227 @@
# ReceiptWitness — CartSnitch Receipt Ingestion Service
## Project Context
CartSnitch is a self-hosted grocery price intelligence platform built as a polyrepo microservices architecture. This repo (`cartsnitch/receiptwitness`) is the receipt/purchase history ingestion service.
**GitHub org:** github.com/cartsnitch
**Domain:** cartsnitch.com
### CartSnitch Services
| Repo | Service | Purpose |
|------|---------|---------|
| `cartsnitch/common` | — | Shared models, schemas, utilities |
| `cartsnitch/receiptwitness` | ReceiptWitness | Purchase data ingestion via retailer scrapers (this repo) |
| `cartsnitch/api` | API Gateway | Frontend-facing REST API |
| `cartsnitch/cartsnitch` | Frontend | React PWA (mobile-first) |
| `cartsnitch/stickershock` | StickerShock | Price increase detection & CPI comparison |
| `cartsnitch/shrinkray` | ShrinkRay | Shrinkflation monitoring |
| `cartsnitch/clipartist` | ClipArtist | Coupon/deal watching & shopping optimization |
| `cartsnitch/infra` | — | K8s manifests, Flux kustomizations |
### Architecture Decisions
- **Polyrepo:** Each service has its own repo, Dockerfile, CI/CD pipeline.
- **Shared DB:** One PostgreSQL cluster. This service writes to `purchases`, `purchase_items`, `price_history` tables. Models come from `cartsnitch-common`.
- **Inter-service comms:** REST (synchronous) + Redis pub/sub (async events).
- **Target scale:** 5001,000 users. Each user has their own authenticated sessions to up to 3 retailers.
## What This Service Does
ReceiptWitness authenticates with grocery retailer web portals using per-user sessions, scrapes purchase history / receipt data, parses it into structured records, and writes it to the shared database. After ingestion, it publishes a `cartsnitch.receipts.ingested` event so downstream services (StickerShock, ClipArtist) can react.
### Target Retailers (MVP)
#### Meijer (mPerks)
- **Auth:** No public API. Session cookie-based auth on mperks.meijer.com.
- **Receipt location:** meijer.com/mperks/receipts-savings.html (or underlying XHR endpoints)
- **Approach:** Playwright login → capture session → hit receipt XHR endpoints directly. Map the API calls the frontend makes via browser dev tools network tab.
- **Prior art:** `dapperfu/python_Meijer` (requires MITM proxy for auth — avoid this pattern, prefer direct browser automation).
- **Data available:** Digital receipts appear ~15 minutes after purchase if mPerks ID was used at checkout. Includes item names, prices, discounts, savings.
#### Kroger
- **Auth:** No public API for purchase history (that's behind Partner API). Session cookie-based auth on kroger.com.
- **Receipt location:** kroger.com/mypurchases
- **Approach:** Playwright login → scrape purchase history pages or intercept XHR endpoints.
- **Anti-bot:** Kroger uses Akamai Bot Manager. Aggressive headless browser detection. Need Playwright stealth, realistic fingerprinting, human-like interaction pacing.
- **Prior art:** `phyllis-vance/KrogerScrape` (.NET, old), `callaginn/kroger-sweeper` (Puppeteer/Node), `ThermoMan/Get-Kroger-Grocery-List` (Greasemonkey userscript).
- **Kroger public API:** Free developer account at developer.kroger.com provides product catalog data (`product.compact` scope) — useful for enriching scraped receipt data with UPCs, categories, product images. NOT useful for purchase history.
- **Data available:** Purchase history tied to Kroger Plus loyalty card. Shows items, prices, quantities.
#### Target (Circle)
- **Auth:** Session-based auth on target.com.
- **Receipt location:** target.com account → Orders → In-store tab, or target.com/account/orders
- **Approach:** Playwright login → scrape in-store purchase history.
- **Data available:** ~1 year of history if user paid with a linked card, used the Target app wallet, or entered their Target Circle phone number at checkout. Includes item names, prices.
## Tech Stack
- Python 3.12+
- Playwright (Python async API) for headless browser automation
- FastAPI (lightweight internal API for triggering scrapes, health checks, status)
- SQLAlchemy 2.0 (via `cartsnitch-common`)
- Redis (pub/sub event publishing)
- APScheduler or Celery (for scheduled scraping jobs)
- cryptography / Fernet (encrypting stored session data)
## Repo Structure
```
receiptwitness/
├── CLAUDE.md
├── README.md
├── pyproject.toml
├── Dockerfile # Playwright + Chromium headless
├── docker-compose.yml # Local dev (Postgres, Redis, this service)
├── src/
│ └── receiptwitness/
│ ├── __init__.py
│ ├── config.py # Service-specific settings
│ ├── main.py # FastAPI app + scheduler bootstrap
│ ├── scrapers/
│ │ ├── __init__.py
│ │ ├── base.py # Abstract BaseScraper class
│ │ ├── meijer.py # Meijer/mPerks scraper
│ │ ├── kroger.py # Kroger scraper
│ │ └── target.py # Target/Circle scraper
│ ├── parsers/
│ │ ├── __init__.py
│ │ ├── meijer.py # Parse raw Meijer receipt data → PurchaseItem records
│ │ ├── kroger.py
│ │ └── target.py
│ ├── session/
│ │ ├── __init__.py
│ │ ├── manager.py # Session storage, retrieval, refresh logic
│ │ └── encryption.py # Encrypt/decrypt session cookies at rest
│ ├── scheduler.py # Scrape scheduling (per-user cron jobs)
│ ├── events.py # Publish receipt.ingested events to Redis
│ ├── api/
│ │ ├── __init__.py
│ │ ├── routes.py # Internal API: trigger scrape, check status, health
│ │ └── auth.py # Internal service auth (API key or JWT)
│ └── enrichment.py # Optional: enrich receipt data via Kroger public API
└── tests/
├── conftest.py
├── fixtures/ # Sample receipt HTML/JSON for testing parsers
│ ├── meijer_receipt.json
│ ├── kroger_receipt.html
│ └── target_receipt.html
├── test_scrapers/
├── test_parsers/
└── test_session/
```
## Scraper Architecture
### Base Scraper Pattern
```python
class BaseScraper(ABC):
"""All retailer scrapers implement this interface."""
@abstractmethod
async def login(self, credentials: UserStoreAccount) -> SessionData: ...
@abstractmethod
async def check_session(self, session: SessionData) -> bool: ...
@abstractmethod
async def scrape_receipts(self, session: SessionData, since: datetime | None) -> list[RawReceipt]: ...
@abstractmethod
def parse_receipt(self, raw: RawReceipt) -> tuple[Purchase, list[PurchaseItem]]: ...
```
### Scraping Flow
1. **Scheduler fires** for a user+store combination
2. **Load session** from `user_store_accounts` table (encrypted)
3. **Check session validity** — quick lightweight request to verify auth
4. **If expired:** launch Playwright, re-authenticate, save new session
5. **Scrape receipts** since `last_sync_at` timestamp
6. **Parse** raw data into `Purchase` and `PurchaseItem` records
7. **Deduplicate** — skip receipts already in DB (match on `receipt_id` per store)
8. **Write to DB** — insert new purchases and items
9. **Derive price_history** entries from purchase_items
10. **Publish event**`cartsnitch.receipts.ingested` to Redis
11. **Update** `user_store_accounts.last_sync_at`
### Session Management
- Sessions (cookies, tokens) are encrypted at rest using Fernet symmetric encryption.
- The encryption key is provided via environment variable, not stored in the DB.
- Sessions are stored in the `user_store_accounts` table as encrypted JSONB.
- Each scrape attempt first checks if the existing session is valid before launching a full Playwright browser instance.
- When a session expires, the service needs the user's stored credentials OR a manual re-auth flow (the user logs in via the frontend, and we capture the session).
### Anti-Bot Considerations
- Use `playwright-stealth` or equivalent to mask automation signals.
- Set realistic viewport sizes, user agents, and locale settings.
- Add human-like delays between page navigations (randomized 1-5 seconds).
- For Kroger specifically (Akamai Bot Manager): may need to use non-headless mode on initial auth, or route through a persistent browser profile that has established trust.
- Rate limit scraping: no more than 1 scrape per user per store per hour. Default cadence: once daily.
- Store and reuse browser profiles/cookies to minimize fresh logins.
### Dockerfile
The Dockerfile must include Playwright and Chromium. Base image pattern:
```dockerfile
FROM mcr.microsoft.com/playwright/python:v1.49.0-noble
# Install deps, copy code, etc.
```
This is a large image (~2GB) due to Chromium. Consider multi-stage builds if the final image can be slimmed down.
## Internal API Endpoints
This service exposes a lightweight internal API (not public-facing):
- `GET /health` — health check
- `GET /status/{user_id}` — sync status per store for a user
- `POST /scrape/{user_id}/{store_slug}` — trigger an immediate scrape for a user+store
- `POST /scrape/{user_id}/all` — trigger scrape across all configured stores
- `GET /sessions/{user_id}` — list configured store sessions and their status
The public-facing API gateway (`cartsnitch/api`) proxies user-facing requests to this service's internal API.
## Events Published
### `cartsnitch.receipts.ingested`
Published after new receipt data is successfully written to the DB.
```json
{
"event_type": "cartsnitch.receipts.ingested",
"timestamp": "2026-03-15T12:00:00Z",
"service": "receiptwitness",
"payload": {
"user_id": "uuid",
"store_slug": "meijer",
"purchase_id": "uuid",
"purchase_date": "2026-03-14",
"item_count": 23,
"total": 87.42
}
}
```
## Development Workflow
- **Never push directly to main.** Always create feature branches and open PRs.
- Branch naming: `feature/<store>/<description>` or `fix/<description>`
- Use conventional commits: `feat:`, `fix:`, `refactor:`, `docs:`, `chore:`
- Test parsers with fixture data (sample receipts in `tests/fixtures/`). Scraper integration tests require real credentials and should be tagged/skipped in CI.
- Local dev: `docker-compose up` starts Postgres, Redis, and the service. Playwright runs inside the container.
## Important Notes
- The Playwright container image is large. On K8s, consider using a dedicated node or tolerating scheduling delays.
- Each user needs their own authenticated sessions. At 1,000 users × 3 stores = 3,000 sessions to manage. Sessions expire at different rates per retailer.
- Scraping must be respectful: randomized intervals, rate limiting, no parallel scraping of the same store for the same user.
- Receipt data structure varies significantly between retailers. The parsers must be robust and handle edge cases (returns, voided items, weighted produce, BOGO items, coupon stacking).
- Kroger's public API (`product.compact` scope) can be used to enrich scraped data with UPCs and product metadata after receipt parsing. This is optional but improves product normalization downstream.
- Store credentials for users should ideally NOT be stored by CartSnitch. Prefer a flow where the user authenticates in a controlled browser session, and we capture/store only the resulting session cookies. If credential storage is necessary, use strong encryption and make the tradeoffs clear to users.
+67
View File
@@ -0,0 +1,67 @@
# Stage 1: Build dependencies
FROM python:3.12-slim AS build
WORKDIR /app
# git is required to install cartsnitch-common from GitHub; build-essential and
# libpq-dev are needed to compile any C-extension wheels (e.g. psycopg2 fallback)
RUN apt-get update && apt-get install -y --no-install-recommends \
git \
libpq-dev \
build-essential \
&& rm -rf /var/lib/apt/lists/*
COPY pyproject.toml ./
COPY src/ ./src/
# cartsnitch-common is not on PyPI — install it directly from GitHub, then
# install the rest of the package dependencies in a single resolver pass so
# pip can satisfy the cartsnitch-common>=0.1.0 constraint declared in
# pyproject.toml without hitting PyPI for it.
RUN pip install --no-cache-dir --prefix=/install \
"cartsnitch-common @ git+https://github.com/cartsnitch/common.git@76685ed0384103228cd670b477b967e7752ebe6b" \
.
# Stage 2: Production image with Playwright + Chromium
FROM python:3.12-slim AS prod
WORKDIR /app
# Install Playwright system dependencies for Chromium
RUN apt-get update && apt-get install -y --no-install-recommends \
libnss3 \
libatk1.0-0 \
libatk-bridge2.0-0 \
libcups2 \
libdrm2 \
libxkbcommon0 \
libxcomposite1 \
libxdamage1 \
libxrandr2 \
libgbm1 \
libpango-1.0-0 \
libcairo2 \
libasound2 \
libxshmfence1 \
libx11-xcb1 \
libxcb-dri3-0 \
fonts-liberation \
&& rm -rf /var/lib/apt/lists/*
RUN adduser --system --group --uid 1000 app
COPY --from=build /install /usr/local
COPY src/ ./src/
# Install Playwright Chromium browser (runs as root; /opt/playwright is world-readable)
RUN PLAYWRIGHT_BROWSERS_PATH=/opt/playwright playwright install chromium
ENV PLAYWRIGHT_BROWSERS_PATH=/opt/playwright
USER 1000
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=3s \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
CMD ["uvicorn", "receiptwitness.main:app", "--host", "0.0.0.0", "--port", "8000"]
+4
View File
@@ -0,0 +1,4 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": ["local>cartsnitch/.github:renovate-config"]
}
+1
View File
@@ -0,0 +1 @@
"""ReceiptWitness — CartSnitch receipt ingestion service."""
+1
View File
@@ -0,0 +1 @@
"""Internal API for ReceiptWitness service."""
+10
View File
@@ -0,0 +1,10 @@
"""Internal API routes for triggering scrapes and checking status."""
from fastapi import APIRouter
router = APIRouter()
@router.get("/health")
async def health():
return {"status": "ok", "service": "receiptwitness"}
+26
View File
@@ -0,0 +1,26 @@
"""Service-specific configuration for ReceiptWitness."""
from pydantic_settings import BaseSettings
class ReceiptWitnessSettings(BaseSettings):
model_config = {"env_prefix": "RW_"}
# Inherited from cartsnitch-common
database_url: str = "postgresql+asyncpg://cartsnitch:cartsnitch@localhost:5432/cartsnitch"
redis_url: str = "redis://localhost:6379/0"
# Session encryption
session_encryption_key: str = ""
# Scraping defaults
scrape_interval_seconds: int = 86400 # 24 hours
min_request_delay_ms: int = 1000
max_request_delay_ms: int = 5000
# Playwright
headless: bool = True
browser_timeout_ms: int = 60000
settings = ReceiptWitnessSettings()
+75
View File
@@ -0,0 +1,75 @@
"""Publish receipt ingestion events to Redis/DragonflyDB pub/sub."""
import json
import logging
from datetime import UTC, datetime
from decimal import Decimal
import redis.asyncio as aioredis
from receiptwitness.config import settings
logger = logging.getLogger(__name__)
CHANNEL_RECEIPTS_INGESTED = "cartsnitch.receipts.ingested"
# Module-level connection pool — shared across all publish calls
_pool: aioredis.ConnectionPool | None = None
class _DecimalEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, Decimal):
return float(o)
return super().default(o)
def _get_pool() -> aioredis.ConnectionPool:
"""Get or create the shared Redis connection pool."""
global _pool
if _pool is None:
_pool = aioredis.ConnectionPool.from_url(
settings.redis_url, decode_responses=True, max_connections=10
)
return _pool
async def get_redis_client() -> aioredis.Redis:
"""Create an async Redis/DragonflyDB client with connection pooling."""
return aioredis.Redis(connection_pool=_get_pool())
async def publish_receipt_ingested(
user_id: str,
store_slug: str,
purchase_id: str,
purchase_date: str,
item_count: int,
total: Decimal | float,
) -> None:
"""Publish a cartsnitch.receipts.ingested event after successful ingestion."""
event = {
"event_type": CHANNEL_RECEIPTS_INGESTED,
"timestamp": datetime.now(UTC).isoformat(),
"service": "receiptwitness",
"payload": {
"user_id": user_id,
"store_slug": store_slug,
"purchase_id": purchase_id,
"purchase_date": purchase_date,
"item_count": item_count,
"total": float(total) if isinstance(total, Decimal) else total,
},
}
try:
client = await get_redis_client()
await client.publish(CHANNEL_RECEIPTS_INGESTED, json.dumps(event, cls=_DecimalEncoder))
logger.info(
"Published %s event for purchase %s",
CHANNEL_RECEIPTS_INGESTED,
purchase_id,
)
except aioredis.ConnectionError:
logger.error("Failed to publish event — Redis/DragonflyDB connection error")
raise
+8
View File
@@ -0,0 +1,8 @@
"""FastAPI app entrypoint for ReceiptWitness."""
from fastapi import FastAPI
from receiptwitness.api.routes import router
app = FastAPI(title="ReceiptWitness", version="0.1.0")
app.include_router(router)
+1
View File
@@ -0,0 +1 @@
"""Receipt parsers for each retailer."""
+148
View File
@@ -0,0 +1,148 @@
"""Kroger receipt parser.
Transforms raw Kroger receipt JSON into the common PurchaseCreate schema.
Kroger receipt data uses different field names than Meijer — this parser
handles Kroger-specific naming conventions and receipt structure.
"""
import logging
from decimal import Decimal, InvalidOperation
from receiptwitness.scrapers.base import RawReceipt
logger = logging.getLogger(__name__)
def _to_decimal(value, default: str = "0") -> Decimal:
"""Safely convert a value to Decimal."""
if value is None:
return Decimal(default)
try:
return Decimal(str(value))
except (InvalidOperation, ValueError, TypeError):
return Decimal(default)
def _parse_item(item: dict) -> dict:
"""Parse a single line item from a Kroger receipt.
Kroger items typically include fields like:
- description / itemDescription / productName
- upc / krogerProductId
- quantity / qty
- basePrice / unitPrice / price
- totalPrice / extendedAmount / lineTotal
- regularPrice / originalPrice
- salePrice / promoPrice
- couponAmount / couponSavings
- loyaltyDiscount / fuelPointsDiscount / plusCardSavings
- department / category / aisle
"""
description = (
item.get("description")
or item.get("itemDescription")
or item.get("productName")
or item.get("name")
or "UNKNOWN ITEM"
)
quantity = _to_decimal(item.get("quantity", item.get("qty", item.get("quantitySold", 1))), "1")
unit_price = _to_decimal(item.get("basePrice", item.get("unitPrice", item.get("price", 0))))
extended_price = _to_decimal(
item.get("totalPrice", item.get("extendedAmount", item.get("lineTotal")))
)
# Compute extended_price if not provided
if extended_price == Decimal("0") and unit_price != Decimal("0"):
extended_price = unit_price * quantity
regular_price = item.get("regularPrice", item.get("originalPrice"))
sale_price = item.get("salePrice", item.get("promoPrice"))
coupon_discount = item.get(
"couponAmount", item.get("couponSavings", item.get("couponDiscount"))
)
loyalty_discount = item.get(
"plusCardSavings",
item.get("loyaltyDiscount", item.get("fuelPointsDiscount")),
)
# UPC handling — Kroger may use krogerProductId or upc
upc = item.get("upc", item.get("UPC", item.get("krogerProductId")))
if upc:
upc = str(upc).strip().lstrip("0") or None
category = item.get("department", item.get("category", item.get("aisle")))
# Weight info for produce/deli items
weight = item.get("weight", item.get("netWeight"))
extra = {}
if weight is not None:
extra["weight"] = str(weight)
weight_uom = item.get("weightUom", item.get("unitOfMeasure"))
if weight_uom:
extra["weight_uom"] = weight_uom
result = {
"product_name_raw": description.strip(),
"upc": upc,
"quantity": quantity,
"unit_price": unit_price,
"extended_price": extended_price,
"regular_price": (_to_decimal(regular_price) if regular_price is not None else None),
"sale_price": (_to_decimal(sale_price) if sale_price is not None else None),
"coupon_discount": (_to_decimal(coupon_discount) if coupon_discount is not None else None),
"loyalty_discount": (
_to_decimal(loyalty_discount) if loyalty_discount is not None else None
),
"category_raw": category.strip() if category else None,
}
return result
def parse_kroger_receipt(raw: RawReceipt) -> dict:
"""Parse a RawReceipt from Kroger into a PurchaseCreate-compatible dict."""
data = raw.raw_data
detail = data.get("detail", {})
# Parse items — Kroger uses "items" or "lineItems" or "receiptItems"
raw_items = detail.get("items", detail.get("lineItems", detail.get("receiptItems", [])))
items = []
for raw_item in raw_items:
# Skip voided / returned items
if raw_item.get("voided") or raw_item.get("status") in (
"VOIDED",
"RETURNED",
):
logger.debug("Skipping voided/returned item: %s", raw_item.get("description"))
continue
if raw_item.get("returnFlag") or raw_item.get("isReturn"):
logger.debug("Skipping returned item: %s", raw_item.get("description"))
continue
items.append(_parse_item(raw_item))
# Parse totals — Kroger uses various field names
total = _to_decimal(
detail.get(
"total",
data.get("total", data.get("orderTotal", data.get("grandTotal", 0))),
)
)
subtotal = detail.get("subtotal", data.get("subtotal", data.get("subTotal")))
tax = detail.get("tax", data.get("tax", data.get("salesTax")))
savings = detail.get(
"totalSavings",
data.get("savings", data.get("totalDiscount", data.get("youSaved"))),
)
return {
"receipt_id": raw.receipt_id,
"purchase_date": raw.purchase_date,
"total": total,
"subtotal": _to_decimal(subtotal) if subtotal is not None else None,
"tax": _to_decimal(tax) if tax is not None else None,
"savings_total": _to_decimal(savings) if savings is not None else None,
"source_url": raw.source_url,
"raw_data": data,
"items": items,
}
+138
View File
@@ -0,0 +1,138 @@
"""Parse raw Meijer mPerks receipt data into PurchaseCreate-compatible dicts.
The mPerks receipt JSON structure (reverse-engineered from their SPA)
typically looks like:
Transaction listing:
{
"transactions": [
{
"transactionId": "12345",
"transactionDate": "2026-03-10T14:30:00Z",
"storeNumber": "123",
"total": 87.42,
"savings": 12.50
}
]
}
Receipt detail:
{
"receiptId": "12345",
"items": [
{
"description": "ORGANIC BANANAS",
"upc": "0000000004011",
"quantity": 1,
"price": 0.69,
"extendedPrice": 0.69,
"regularPrice": 0.79,
"salePrice": 0.69,
"couponDiscount": 0.0,
"mperksDiscount": 0.10,
"category": "PRODUCE"
}
],
"subtotal": 74.92,
"tax": 5.24,
"total": 87.42,
"totalSavings": 12.50
}
"""
import logging
from decimal import Decimal, InvalidOperation
from receiptwitness.scrapers.base import RawReceipt
logger = logging.getLogger(__name__)
def _to_decimal(value, default: str = "0") -> Decimal:
"""Safely convert a value to Decimal."""
if value is None:
return Decimal(default)
try:
return Decimal(str(value))
except (InvalidOperation, ValueError, TypeError):
return Decimal(default)
def _parse_item(item: dict) -> dict:
"""Parse a single line item from Meijer receipt detail."""
description = (
item.get("description") or item.get("itemDescription") or item.get("name") or "UNKNOWN ITEM"
)
quantity = _to_decimal(item.get("quantity", item.get("qty", 1)), "1")
unit_price = _to_decimal(item.get("price", item.get("unitPrice", 0)))
extended_price = _to_decimal(item.get("extendedPrice", item.get("totalPrice")))
# If extended_price wasn't provided, compute it
if extended_price == Decimal("0") and unit_price != Decimal("0"):
extended_price = unit_price * quantity
regular_price = item.get("regularPrice")
sale_price = item.get("salePrice")
coupon_discount = item.get("couponDiscount", item.get("couponSavings"))
loyalty_discount = item.get("mperksDiscount", item.get("loyaltyDiscount"))
upc = item.get("upc", item.get("UPC"))
if upc:
upc = str(upc).strip().lstrip("0") or None
category = item.get("category", item.get("departmentDescription"))
return {
"product_name_raw": description.strip(),
"upc": upc,
"quantity": quantity,
"unit_price": unit_price,
"extended_price": extended_price,
"regular_price": _to_decimal(regular_price) if regular_price is not None else None,
"sale_price": _to_decimal(sale_price) if sale_price is not None else None,
"coupon_discount": (_to_decimal(coupon_discount) if coupon_discount is not None else None),
"loyalty_discount": (
_to_decimal(loyalty_discount) if loyalty_discount is not None else None
),
"category_raw": category.strip() if category else None,
}
def parse_meijer_receipt(raw: RawReceipt) -> dict:
"""Parse a RawReceipt from Meijer into a PurchaseCreate-compatible dict.
Returns a dict with keys matching PurchaseCreate schema fields.
The caller is responsible for setting store_id and store_location_id
from the store registry.
"""
data = raw.raw_data
detail = data.get("detail", {})
# Parse items from the detail response
raw_items = detail.get("items", detail.get("lineItems", []))
items = []
for raw_item in raw_items:
# Skip voided items
if raw_item.get("voided") or raw_item.get("status") == "VOIDED":
logger.debug("Skipping voided item: %s", raw_item.get("description"))
continue
items.append(_parse_item(raw_item))
# Parse totals
total = _to_decimal(detail.get("total", data.get("total", data.get("transactionTotal", 0))))
subtotal = detail.get("subtotal", data.get("subtotal"))
tax = detail.get("tax", data.get("tax"))
savings = detail.get("totalSavings", data.get("savings", data.get("totalDiscount")))
return {
"receipt_id": raw.receipt_id,
"purchase_date": raw.purchase_date,
"total": total,
"subtotal": _to_decimal(subtotal) if subtotal is not None else None,
"tax": _to_decimal(tax) if tax is not None else None,
"savings_total": _to_decimal(savings) if savings is not None else None,
"source_url": raw.source_url,
"raw_data": data,
"items": items,
}
+191
View File
@@ -0,0 +1,191 @@
"""Target Circle receipt parser.
Transforms raw Target in-store receipt JSON into the common PurchaseCreate schema.
Target receipt data includes Circle pricing, BOGO deals, and Circle rewards
discounts that need special handling.
Target receipt detail structure (reverse-engineered from target.com SPA):
{
"orderId": "TGT-2026-0315-7890",
"items": [
{
"description": "GOOD & GATHER WHOLE MILK GAL",
"tcin": "14767459",
"upc": "0085239100123",
"quantity": 1,
"unitPrice": 3.89,
"totalPrice": 3.89,
"regularPrice": 4.19,
"circlePrice": 3.89,
"couponDiscount": 0.0,
"circleRewardsDiscount": 0.30,
"promoDescription": "Circle offer: Save 30c",
"department": "GROCERY"
}
],
"subtotal": 78.32,
"tax": 4.89,
"total": 83.21,
"totalSavings": 11.45
}
"""
import logging
from decimal import Decimal, InvalidOperation
from receiptwitness.scrapers.base import RawReceipt
logger = logging.getLogger(__name__)
def _to_decimal(value, default: str = "0") -> Decimal:
"""Safely convert a value to Decimal."""
if value is None:
return Decimal(default)
try:
return Decimal(str(value))
except (InvalidOperation, ValueError, TypeError):
return Decimal(default)
def _parse_item(item: dict) -> dict:
"""Parse a single line item from a Target receipt.
Target items may include fields like:
- description / itemDescription / productName
- tcin (Target internal product ID) / upc / dpci
- quantity / qty
- unitPrice / price
- totalPrice / extendedPrice / lineTotal
- regularPrice / originalPrice
- circlePrice / salePrice / promoPrice
- couponDiscount / couponSavings
- circleRewardsDiscount / circleDiscount / loyaltyDiscount
- promoDescription / offerDescription (e.g. "BOGO 50% off", "Circle offer")
- department / category
"""
description = (
item.get("description")
or item.get("itemDescription")
or item.get("productName")
or item.get("name")
or "UNKNOWN ITEM"
)
quantity = _to_decimal(item.get("quantity", item.get("qty", item.get("quantitySold", 1))), "1")
unit_price = _to_decimal(item.get("unitPrice", item.get("price", item.get("basePrice", 0))))
extended_price = _to_decimal(
item.get("totalPrice", item.get("extendedPrice", item.get("lineTotal")))
)
# Compute extended_price if not provided
if extended_price == Decimal("0") and unit_price != Decimal("0"):
extended_price = unit_price * quantity
regular_price = item.get("regularPrice", item.get("originalPrice"))
# Target Circle pricing — circlePrice takes precedence over generic salePrice
sale_price = item.get("circlePrice", item.get("salePrice", item.get("promoPrice")))
coupon_discount = item.get(
"couponDiscount", item.get("couponSavings", item.get("couponAmount"))
)
# Circle rewards / loyalty discount
loyalty_discount = item.get(
"circleRewardsDiscount",
item.get("circleDiscount", item.get("loyaltyDiscount")),
)
# UPC handling — Target may use tcin, upc, or dpci
upc = item.get("upc", item.get("UPC"))
if upc:
upc = str(upc).strip().lstrip("0") or None
# Target also has TCIN (Target.com Item Number) and DPCI (Department/Class/Item)
tcin = item.get("tcin", item.get("TCIN"))
dpci = item.get("dpci", item.get("DPCI"))
category = item.get("department", item.get("category"))
# Capture promo/deal description for BOGO and Circle offers
promo_description = item.get("promoDescription", item.get("offerDescription"))
# Weight info for produce/deli items
weight = item.get("weight", item.get("netWeight"))
extra: dict = {}
if weight is not None:
extra["weight"] = str(weight)
weight_uom = item.get("weightUom", item.get("unitOfMeasure"))
if weight_uom:
extra["weight_uom"] = weight_uom
if tcin:
extra["tcin"] = str(tcin)
if dpci:
extra["dpci"] = str(dpci)
if promo_description:
extra["promo_description"] = promo_description
result: dict = {
"product_name_raw": description.strip(),
"upc": upc,
"quantity": quantity,
"unit_price": unit_price,
"extended_price": extended_price,
"regular_price": _to_decimal(regular_price) if regular_price is not None else None,
"sale_price": _to_decimal(sale_price) if sale_price is not None else None,
"coupon_discount": (_to_decimal(coupon_discount) if coupon_discount is not None else None),
"loyalty_discount": (
_to_decimal(loyalty_discount) if loyalty_discount is not None else None
),
"category_raw": category.strip() if category else None,
}
return result
def parse_target_receipt(raw: RawReceipt) -> dict:
"""Parse a RawReceipt from Target into a PurchaseCreate-compatible dict."""
data = raw.raw_data
detail = data.get("detail", {})
# Parse items — Target uses "items" or "lineItems"
raw_items = detail.get("items", detail.get("lineItems", []))
items = []
for raw_item in raw_items:
# Skip voided / returned items
if raw_item.get("voided") or raw_item.get("status") in (
"VOIDED",
"RETURNED",
"CANCELLED",
):
logger.debug("Skipping voided/returned item: %s", raw_item.get("description"))
continue
if raw_item.get("returnFlag") or raw_item.get("isReturn"):
logger.debug("Skipping returned item: %s", raw_item.get("description"))
continue
items.append(_parse_item(raw_item))
# Parse totals
total = _to_decimal(
detail.get(
"total",
data.get("total", data.get("orderTotal", data.get("grandTotal", 0))),
)
)
subtotal = detail.get("subtotal", data.get("subtotal", data.get("subTotal")))
tax = detail.get("tax", data.get("tax", data.get("salesTax")))
savings = detail.get(
"totalSavings",
data.get("savings", data.get("totalDiscount", data.get("circleSavings"))),
)
return {
"receipt_id": raw.receipt_id,
"purchase_date": raw.purchase_date,
"total": total,
"subtotal": _to_decimal(subtotal) if subtotal is not None else None,
"tax": _to_decimal(tax) if tax is not None else None,
"savings_total": _to_decimal(savings) if savings is not None else None,
"source_url": raw.source_url,
"raw_data": data,
"items": items,
}
+30
View File
@@ -0,0 +1,30 @@
"""Receipt & product matching pipeline — receipt normalization and product dedup."""
from receiptwitness.pipeline.matching import (
ConfidenceLevel,
ProductMatcher,
match_purchase_item,
)
from receiptwitness.pipeline.normalization import (
MatchMethod,
MatchResult,
clean_name,
extract_size_info,
jaccard_similarity,
normalize_product,
)
from receiptwitness.pipeline.receipt import normalize_receipt, parse_meijer_item
__all__ = [
"ConfidenceLevel",
"MatchMethod",
"MatchResult",
"ProductMatcher",
"clean_name",
"extract_size_info",
"jaccard_similarity",
"match_purchase_item",
"normalize_product",
"normalize_receipt",
"parse_meijer_item",
]
+136
View File
@@ -0,0 +1,136 @@
"""Product matching & dedup — UPC primary, fuzzy name fallback, confidence scoring.
Wraps the Phase 1 normalization module with confidence-level classification
and batch matching for purchase ingestion.
"""
import uuid
from dataclasses import dataclass
from cartsnitch_common.constants import MatchConfidence
from cartsnitch_common.models.product import NormalizedProduct
from cartsnitch_common.schemas.purchase import PurchaseItemCreate
from sqlalchemy.orm import Session
from receiptwitness.pipeline.normalization import (
MatchMethod,
MatchResult,
extract_size_info,
normalize_product,
)
# Re-export for convenience
ConfidenceLevel = MatchConfidence
@dataclass(frozen=True)
class MatchOutcome:
"""Result of matching a single purchase item to a normalized product."""
item_index: int
match: MatchResult | None
confidence_level: MatchConfidence
created_new: bool = False
def classify_confidence(score: float, method: MatchMethod) -> MatchConfidence:
"""Classify a match score into high/medium/low confidence."""
if method == MatchMethod.UPC:
return MatchConfidence.HIGH
# Name-based matching thresholds
if score >= 0.8:
return MatchConfidence.HIGH
if score >= 0.5:
return MatchConfidence.MEDIUM
return MatchConfidence.LOW
def _create_product_from_item(
session: Session,
item: PurchaseItemCreate,
) -> NormalizedProduct:
"""Create a new NormalizedProduct from a purchase item that had no match."""
size_info = extract_size_info(item.product_name_raw)
product = NormalizedProduct(
id=uuid.uuid4(),
canonical_name=item.product_name_raw,
size=size_info[0] if size_info else None,
size_unit=size_info[1] if size_info else None,
upc_variants=[item.upc] if item.upc else [],
)
session.add(product)
session.flush()
return product
class ProductMatcher:
"""Batch product matcher for purchase ingestion.
Usage:
matcher = ProductMatcher(session)
outcomes = matcher.match_items(items)
"""
def __init__(
self,
session: Session,
name_threshold: float = 0.4,
auto_create: bool = True,
):
self.session = session
self.name_threshold = name_threshold
self.auto_create = auto_create
def match_single(
self,
item: PurchaseItemCreate,
) -> tuple[NormalizedProduct | None, MatchResult | None, MatchConfidence]:
"""Match a single purchase item to a normalized product.
Returns (product, match_result, confidence_level).
If auto_create is True and no match found, creates a new product.
"""
result = normalize_product(
self.session,
item.product_name_raw,
upc=item.upc,
name_threshold=self.name_threshold,
)
if result:
confidence = classify_confidence(result.confidence, result.method)
return result.product, result, confidence
if self.auto_create:
product = _create_product_from_item(self.session, item)
return product, None, MatchConfidence.LOW
return None, None, MatchConfidence.LOW
def match_items(self, items: list[PurchaseItemCreate]) -> list[MatchOutcome]:
"""Match a batch of purchase items. Returns outcomes in order."""
outcomes: list[MatchOutcome] = []
for idx, item in enumerate(items):
product, result, confidence = self.match_single(item)
created = result is None and product is not None
outcomes.append(
MatchOutcome(
item_index=idx,
match=result,
confidence_level=confidence,
created_new=created,
)
)
return outcomes
def match_purchase_item(
session: Session,
item: PurchaseItemCreate,
name_threshold: float = 0.4,
auto_create: bool = True,
) -> tuple[NormalizedProduct | None, MatchConfidence]:
"""Convenience function: match a single item, return (product, confidence)."""
matcher = ProductMatcher(session, name_threshold=name_threshold, auto_create=auto_create)
product, _, confidence = matcher.match_single(item)
return product, confidence
@@ -0,0 +1,155 @@
"""Product normalization — Phase 1: UPC matching + fuzzy name matching.
Matches products across retailers by:
1. Exact UPC match (highest confidence)
2. Fuzzy name matching via token-based Jaccard similarity (lower confidence)
"""
import re
from dataclasses import dataclass
from enum import StrEnum
from cartsnitch_common.models.product import NormalizedProduct
from sqlalchemy import select
from sqlalchemy.orm import Session
class MatchMethod(StrEnum):
"""How a product match was determined."""
UPC = "upc"
NAME = "name"
@dataclass(frozen=True)
class MatchResult:
"""Result of a product normalization attempt."""
product: NormalizedProduct
confidence: float
method: MatchMethod
# Noise words stripped during name cleaning
_NOISE_WORDS = frozenset(
{
"the",
"a",
"an",
"and",
"or",
"of",
"with",
"in",
"for",
"to",
"brand",
"original",
"classic",
"new",
"improved",
}
)
# Regex for extracting size info (e.g., "16 oz", "1.5 lb", "12 ct")
_SIZE_PATTERN = re.compile(
r"(\d+(?:\.\d+)?)\s*(oz|fl\s*oz|lb|lbs|g|kg|ml|l|ct|pk|count|pack)\b",
re.IGNORECASE,
)
def clean_name(name: str) -> str:
"""Normalize a product name for comparison.
- Lowercase
- Remove size info (e.g., "16 oz")
- Strip noise words
- Collapse whitespace
"""
cleaned = name.lower()
cleaned = _SIZE_PATTERN.sub("", cleaned)
cleaned = re.sub(r"[^\w\s]", " ", cleaned)
tokens = cleaned.split()
tokens = [t for t in tokens if t not in _NOISE_WORDS]
return " ".join(tokens)
def extract_size_info(name: str) -> tuple[str, str] | None:
"""Extract (size, unit) from a product name, if present."""
match = _SIZE_PATTERN.search(name)
if match:
return match.group(1), match.group(2).lower().replace(" ", "_")
return None
def jaccard_similarity(a: str, b: str) -> float:
"""Token-based Jaccard similarity between two cleaned names."""
tokens_a = set(a.split())
tokens_b = set(b.split())
if not tokens_a or not tokens_b:
return 0.0
intersection = tokens_a & tokens_b
union = tokens_a | tokens_b
return len(intersection) / len(union)
def match_by_upc(session: Session, upc: str) -> MatchResult | None:
"""Find a normalized product by exact UPC match.
Loads products with upc_variants and checks membership in Python
for cross-database compatibility (works on both PostgreSQL and SQLite).
"""
# TODO: Use PostgreSQL JSON containment query (@>) for production.
# Current approach loads all products into memory — acceptable for tests
# and small datasets, but will not scale.
stmt = select(NormalizedProduct).where(NormalizedProduct.upc_variants.is_not(None))
products = session.execute(stmt).scalars().all()
for product in products:
if product.upc_variants and upc in product.upc_variants:
return MatchResult(product=product, confidence=1.0, method=MatchMethod.UPC)
return None
def match_by_name(
session: Session,
name: str,
threshold: float = 0.5,
) -> MatchResult | None:
"""Find the best normalized product by fuzzy name matching.
Loads all normalized products and computes Jaccard similarity.
Returns the best match above the threshold, or None.
"""
# TODO: Use pg_trgm similarity index for production.
# Current approach loads all products into memory — acceptable for tests
# and small datasets, but will not scale.
cleaned = clean_name(name)
stmt = select(NormalizedProduct)
products = session.execute(stmt).scalars().all()
best_match: NormalizedProduct | None = None
best_score = 0.0
for product in products:
score = jaccard_similarity(cleaned, clean_name(product.canonical_name))
if score > best_score and score >= threshold:
best_score = score
best_match = product
if best_match:
return MatchResult(product=best_match, confidence=best_score, method=MatchMethod.NAME)
return None
def normalize_product(
session: Session,
name: str,
upc: str | None = None,
name_threshold: float = 0.5,
) -> MatchResult | None:
"""Full normalization pipeline: UPC first, then fuzzy name fallback."""
if upc:
result = match_by_upc(session, upc)
if result:
return result
return match_by_name(session, name, threshold=name_threshold)
+144
View File
@@ -0,0 +1,144 @@
"""Receipt normalization — parse raw Meijer scraper output into purchase records.
Maps raw receipt fields, cleans product names, extracts quantities/units.
"""
import re
from datetime import date
from decimal import Decimal, InvalidOperation
from cartsnitch_common.schemas.purchase import PurchaseCreate, PurchaseItemCreate
def _clean_product_name(raw: str) -> str:
"""Clean raw product name from scraper output."""
cleaned = raw.strip()
# Remove leading/trailing non-alphanumeric chars
cleaned = re.sub(r"^\W+|\W+$", "", cleaned)
# Collapse internal whitespace
cleaned = re.sub(r"\s+", " ", cleaned)
return cleaned
def _safe_decimal(
value: str | float | int | Decimal | None,
default: Decimal = Decimal("0"),
) -> Decimal:
"""Safely convert a value to Decimal."""
if value is None:
return default
try:
return Decimal(str(value))
except (InvalidOperation, ValueError):
return default
def parse_meijer_item(raw_item: dict) -> PurchaseItemCreate:
"""Parse a single Meijer scraper line item into a PurchaseItemCreate.
Expected raw_item keys (from Meijer scraper):
- description / name: product name
- upc / upcCode: UPC barcode
- quantity / qty: number of units
- unitPrice / price: per-unit price
- extendedPrice / totalPrice: line total
- regularPrice: shelf price before discounts
- salePrice: sale price if applicable
- couponAmount / couponDiscount: coupon savings
- loyaltyAmount / loyaltyDiscount: loyalty savings
- category / department: raw category
"""
name = raw_item.get("description") or raw_item.get("name") or ""
cleaned_name = _clean_product_name(name)
upc = raw_item.get("upc") or raw_item.get("upcCode")
if upc:
upc = str(upc).strip().lstrip("0") or str(upc).strip()
qty = _safe_decimal(
raw_item.get("quantity") or raw_item.get("qty"),
default=Decimal("1"),
)
unit_price = _safe_decimal(raw_item.get("unitPrice") or raw_item.get("price"))
extended = _safe_decimal(raw_item.get("extendedPrice") or raw_item.get("totalPrice"))
if extended == Decimal("0") and unit_price > 0:
extended = unit_price * qty
regular = raw_item.get("regularPrice")
sale = raw_item.get("salePrice")
coupon = raw_item.get("couponAmount") or raw_item.get("couponDiscount")
loyalty = raw_item.get("loyaltyAmount") or raw_item.get("loyaltyDiscount")
category = raw_item.get("category") or raw_item.get("department")
return PurchaseItemCreate(
product_name_raw=cleaned_name,
upc=upc,
quantity=qty,
unit_price=unit_price,
extended_price=extended,
regular_price=_safe_decimal(regular) if regular is not None else None,
sale_price=_safe_decimal(sale) if sale is not None else None,
coupon_discount=_safe_decimal(coupon) if coupon is not None else None,
loyalty_discount=_safe_decimal(loyalty) if loyalty is not None else None,
category_raw=str(category).strip() if category else None,
)
def normalize_receipt(
raw_receipt: dict,
user_id: str,
store_id: str,
) -> PurchaseCreate:
"""Parse a complete Meijer raw receipt into a PurchaseCreate.
Expected raw_receipt keys:
- receiptId / receipt_id / id: unique receipt identifier
- date / purchaseDate / purchase_date: purchase date (YYYY-MM-DD or similar)
- total / totalAmount: receipt total
- subtotal: pre-tax subtotal
- tax / taxAmount: tax amount
- savings / totalSavings: total discount savings
- items: list of raw line item dicts
"""
import uuid
receipt_id = str(
raw_receipt.get("receiptId")
or raw_receipt.get("receipt_id")
or raw_receipt.get("id")
or uuid.uuid4()
)
raw_date = (
raw_receipt.get("date")
or raw_receipt.get("purchaseDate")
or raw_receipt.get("purchase_date")
)
if isinstance(raw_date, str):
purchase_date = date.fromisoformat(raw_date[:10])
elif isinstance(raw_date, date):
purchase_date = raw_date
else:
purchase_date = date.today()
total = _safe_decimal(raw_receipt.get("total") or raw_receipt.get("totalAmount"))
subtotal = raw_receipt.get("subtotal")
tax = raw_receipt.get("tax") or raw_receipt.get("taxAmount")
savings = raw_receipt.get("savings") or raw_receipt.get("totalSavings")
raw_items = raw_receipt.get("items") or []
items = [parse_meijer_item(item) for item in raw_items]
return PurchaseCreate(
user_id=uuid.UUID(user_id) if isinstance(user_id, str) else user_id,
store_id=uuid.UUID(store_id) if isinstance(store_id, str) else store_id,
receipt_id=receipt_id,
purchase_date=purchase_date,
total=total,
subtotal=_safe_decimal(subtotal) if subtotal is not None else None,
tax=_safe_decimal(tax) if tax is not None else None,
savings_total=_safe_decimal(savings) if savings is not None else None,
raw_data=raw_receipt,
items=items,
)
+1
View File
@@ -0,0 +1 @@
"""Retailer scrapers."""
+72
View File
@@ -0,0 +1,72 @@
"""Abstract base scraper interface for all retailer scrapers."""
import asyncio
import random
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from receiptwitness.config import settings
@dataclass
class SessionData:
"""Holds session cookies and metadata for a retailer login."""
cookies: list[dict]
user_agent: str
created_at: datetime
expires_at: datetime | None = None
extra: dict = field(default_factory=dict)
@dataclass
class RawReceipt:
"""Raw receipt data before parsing."""
receipt_id: str
purchase_date: str
store_number: str | None = None
raw_data: dict = field(default_factory=dict)
source_url: str | None = None
class BaseScraper(ABC):
"""All retailer scrapers implement this interface.
Provides common functionality: human-like delays, rate limiting guards,
and the abstract methods each retailer scraper must implement.
"""
@abstractmethod
async def login(self, username: str, password: str) -> SessionData:
"""Authenticate with the retailer portal and return session data."""
...
@abstractmethod
async def check_session(self, session: SessionData) -> bool:
"""Verify if an existing session is still valid."""
...
@abstractmethod
async def scrape_receipts(
self, session: SessionData, since: datetime | None = None
) -> list[RawReceipt]:
"""Scrape receipt data from the retailer portal."""
...
@abstractmethod
def parse_receipt(self, raw: RawReceipt) -> dict:
"""Parse a raw receipt into structured data.
Returns a dict with keys matching PurchaseCreate schema fields,
including an 'items' list matching PurchaseItemCreate fields.
"""
...
async def human_delay(self, min_ms: int | None = None, max_ms: int | None = None) -> None:
"""Sleep for a randomized human-like interval."""
lo = min_ms or settings.min_request_delay_ms
hi = max_ms or settings.max_request_delay_ms
delay = random.randint(lo, hi) / 1000.0
await asyncio.sleep(delay)
+344
View File
@@ -0,0 +1,344 @@
"""Kroger loyalty portal scraper using Playwright.
Kroger uses Akamai Bot Manager for aggressive headless browser detection.
This scraper uses enhanced stealth measures including playwright-stealth,
realistic fingerprinting, and human-like interaction pacing.
"""
import logging
from datetime import UTC, datetime, timedelta
from typing import cast
from playwright.async_api import BrowserContext, Page, Playwright, async_playwright
from receiptwitness.config import settings
from receiptwitness.scrapers.base import BaseScraper, RawReceipt, SessionData
logger = logging.getLogger(__name__)
# Kroger endpoints
KROGER_BASE = "https://www.kroger.com"
KROGER_LOGIN_PAGE = f"{KROGER_BASE}/signin"
KROGER_PURCHASE_HISTORY = f"{KROGER_BASE}/mypurchases"
KROGER_RECEIPT_API = f"{KROGER_BASE}/atlas/v1/purchase-history/api"
KROGER_RECEIPT_DETAIL_API = f"{KROGER_BASE}/atlas/v1/receipt/api"
KROGER_ACCOUNT_PAGE = f"{KROGER_BASE}/account/dashboard"
# Realistic browser fingerprint — Chrome on Windows (matches Kroger's typical audience)
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
DEFAULT_VIEWPORT = {"width": 1920, "height": 1080}
DEFAULT_LOCALE = "en-US"
DEFAULT_TIMEZONE = "America/New_York"
class KrogerScraper(BaseScraper):
"""Scraper for Kroger loyalty purchase history.
Kroger uses Akamai Bot Manager which aggressively detects headless
browsers. This scraper employs enhanced stealth measures:
- Masks webdriver/automation signals
- Sets realistic browser fingerprint
- Uses human-like interaction pacing
- Preserves browser context across sessions
"""
async def _create_stealth_context(
self, playwright_instance: Playwright, cookies: list[dict] | None = None
) -> BrowserContext:
"""Create a browser context with enhanced stealth for Akamai evasion."""
browser = await playwright_instance.chromium.launch(
headless=settings.headless,
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-infobars",
"--window-size=1920,1080",
],
)
context = await browser.new_context(
user_agent=DEFAULT_USER_AGENT,
viewport=DEFAULT_VIEWPORT, # type: ignore[arg-type]
locale=DEFAULT_LOCALE,
timezone_id=DEFAULT_TIMEZONE,
java_script_enabled=True,
bypass_csp=False,
color_scheme="light",
has_touch=False,
)
# Enhanced stealth script targeting Akamai Bot Manager detection vectors
await context.add_init_script(
"""
// Mask webdriver flag
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// Chrome runtime object
window.chrome = {
runtime: {},
loadTimes: function() {},
csi: function() {},
app: { isInstalled: false }
};
// Realistic plugin array
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
// Languages
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en']
});
// Platform
Object.defineProperty(navigator, 'platform', {
get: () => 'Win32'
});
// Hardware concurrency
Object.defineProperty(navigator, 'hardwareConcurrency', {
get: () => 8
});
// Device memory
Object.defineProperty(navigator, 'deviceMemory', {
get: () => 8
});
// Permissions query override (Akamai checks this)
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) =>
parameters.name === 'notifications'
? Promise.resolve({ state: Notification.permission })
: originalQuery(parameters);
// WebGL vendor/renderer (avoid "Google Inc." / "ANGLE" tells)
const getParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = function(parameter) {
if (parameter === 37445) return 'Intel Inc.';
if (parameter === 37446) return 'Intel Iris OpenGL Engine';
return getParameter.call(this, parameter);
};
"""
)
if cookies:
await context.add_cookies(cookies) # type: ignore[arg-type]
return cast(BrowserContext, context)
async def login(self, username: str, password: str) -> SessionData:
"""Log in to Kroger and capture session cookies."""
async with async_playwright() as p:
context = await self._create_stealth_context(p)
page = await context.new_page()
try:
return await self._perform_login(page, context, username, password)
finally:
if context.browser:
await context.browser.close()
async def _perform_login(
self, page: Page, context: BrowserContext, username: str, password: str
) -> SessionData:
"""Execute the Kroger login flow."""
logger.info("Navigating to Kroger sign-in page")
await page.goto(KROGER_LOGIN_PAGE, wait_until="networkidle")
await self.human_delay(2000, 4000)
# Kroger login form — email/username field
email_input = page.locator(
'input[id="SignIn-emailInput"], '
'input[name="email"], '
'input[type="email"], '
'input[data-testid="SignIn-emailInput"]'
)
await email_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await email_input.click()
await self.human_delay(300, 700)
await email_input.fill(username)
await self.human_delay(800, 1500)
# Password field
password_input = page.locator(
'input[id="SignIn-passwordInput"], '
'input[name="password"], '
'input[type="password"], '
'input[data-testid="SignIn-passwordInput"]'
)
await password_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await password_input.click()
await self.human_delay(300, 700)
await password_input.fill(password)
await self.human_delay(1000, 2000)
# Sign-in button
sign_in_btn = page.locator(
'button[id="SignIn-submitButton"], '
'button[data-testid="SignIn-submitButton"], '
'button[type="submit"]:has-text("Sign In")'
)
await sign_in_btn.click()
# Wait for redirect away from sign-in page
await page.wait_for_url(
lambda url: "signin" not in url.lower(),
timeout=settings.browser_timeout_ms,
)
await self.human_delay(1500, 3000)
# Capture cookies
raw_cookies = await context.cookies()
cookies = [dict(c) for c in raw_cookies]
now = datetime.now(UTC)
logger.info("Kroger login successful, captured %d cookies", len(cookies))
return SessionData(
cookies=cookies,
user_agent=DEFAULT_USER_AGENT,
created_at=now,
expires_at=now + timedelta(hours=2),
extra={"retailer": "kroger"},
)
async def check_session(self, session: SessionData) -> bool:
"""Check if the Kroger session is still valid."""
if session.expires_at and datetime.now(UTC) > session.expires_at:
logger.info("Kroger session expired based on timestamp")
return False
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
response = await page.goto(KROGER_ACCOUNT_PAGE, wait_until="networkidle")
current_url = page.url.lower()
is_valid = "signin" not in current_url and response is not None and response.ok
logger.info("Kroger session check: valid=%s (url=%s)", is_valid, page.url)
return is_valid
except Exception:
logger.exception("Kroger session check failed")
return False
finally:
if context.browser:
await context.browser.close()
async def scrape_receipts(
self, session: SessionData, since: datetime | None = None
) -> list[RawReceipt]:
"""Scrape purchase history from Kroger."""
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
return await self._fetch_receipts(page, since)
finally:
if context.browser:
await context.browser.close()
async def _fetch_receipts(self, page: Page, since: datetime | None) -> list[RawReceipt]:
"""Fetch receipt list and details from Kroger purchase history."""
# Navigate to purchase history to establish context
await page.goto(KROGER_PURCHASE_HISTORY, wait_until="networkidle")
await self.human_delay(1500, 3000)
receipts: list[RawReceipt] = []
# Kroger purchase history API endpoint
api_response = await page.request.get(KROGER_RECEIPT_API)
if not api_response.ok:
logger.warning(
"Kroger purchase history request failed: %d %s",
api_response.status,
api_response.status_text,
)
return []
response = await api_response.json()
if not isinstance(response, dict):
logger.warning("Unexpected purchase history response type: %s", type(response))
return []
# Handle Kroger's response structure
orders = response.get("orders", response.get("purchases", []))
if not isinstance(orders, list):
logger.warning("No orders found in Kroger purchase history response")
return []
logger.info("Found %d orders in Kroger purchase history", len(orders))
for order in orders:
raw_id = order.get("orderId") or order.get("receiptId") or order.get("id") or ""
order_id = str(raw_id)
purchase_date = order.get(
"purchaseDate", order.get("transactionDate", order.get("date", ""))
)
# Filter by date if 'since' is provided
if since and purchase_date:
try:
txn_dt = datetime.fromisoformat(purchase_date.replace("Z", "+00:00"))
if txn_dt < since:
continue
except (ValueError, TypeError):
pass
if not order_id:
continue
await self.human_delay(1000, 2500)
# Fetch receipt detail
detail = await self._fetch_receipt_detail(page, order_id)
raw_store = (
order.get("storeNumber")
or order.get("divisionNumber")
or order.get("storeId")
or ""
)
store_number = str(raw_store)
receipts.append(
RawReceipt(
receipt_id=order_id,
purchase_date=purchase_date,
store_number=store_number,
raw_data={**order, "detail": detail},
source_url=f"{KROGER_RECEIPT_DETAIL_API}?orderId={order_id}",
)
)
logger.info("Scraped %d receipts from Kroger", len(receipts))
return receipts
async def _fetch_receipt_detail(self, page: Page, order_id: str) -> dict:
"""Fetch detailed receipt data for a single Kroger order."""
try:
url = f"{KROGER_RECEIPT_DETAIL_API}?orderId={order_id}"
api_response = await page.request.get(url)
if not api_response.ok:
logger.warning(
"Kroger receipt detail request failed for %s: %d",
order_id,
api_response.status,
)
return {}
detail = await api_response.json()
return detail if isinstance(detail, dict) else {}
except Exception:
logger.exception("Failed to fetch Kroger receipt detail for %s", order_id)
return {}
def parse_receipt(self, raw: RawReceipt) -> dict:
"""Parse raw Kroger receipt into structured purchase data."""
from receiptwitness.parsers.kroger import parse_kroger_receipt
return parse_kroger_receipt(raw)
+301
View File
@@ -0,0 +1,301 @@
"""Meijer mPerks scraper using Playwright.
Meijer has no public API. We reverse-engineer the XHR endpoints the mPerks
web app uses to pull purchase history and receipt data. The flow:
1. Launch stealth Playwright browser
2. Navigate to mPerks login page and authenticate
3. Capture session cookies after successful login
4. Use those cookies to hit the mPerks receipt API endpoints directly
5. Parse receipt JSON into structured PurchaseCreate records
Key endpoints (reverse-engineered from mPerks SPA):
- Login: POST https://www.meijer.com/bin/meijer/account/login
- Receipts: GET https://www.meijer.com/bin/meijer/profile/purchasehistory
- Receipt detail: GET https://www.meijer.com/bin/meijer/profile/receipt?receiptId=...
"""
import logging
from datetime import UTC, datetime, timedelta
from typing import cast
from playwright.async_api import BrowserContext, Page, Playwright, async_playwright
from receiptwitness.config import settings
from receiptwitness.scrapers.base import BaseScraper, RawReceipt, SessionData
logger = logging.getLogger(__name__)
# Meijer mPerks URLs
MEIJER_BASE = "https://www.meijer.com"
MEIJER_LOGIN_PAGE = f"{MEIJER_BASE}/shopping/login.html"
MEIJER_LOGIN_API = f"{MEIJER_BASE}/bin/meijer/account/login"
MEIJER_PURCHASE_HISTORY = f"{MEIJER_BASE}/bin/meijer/profile/purchasehistory"
MEIJER_RECEIPT_DETAIL = f"{MEIJER_BASE}/bin/meijer/profile/receipt"
MEIJER_MPERKS_HOME = f"{MEIJER_BASE}/mperks.html"
# Realistic browser fingerprint
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
DEFAULT_VIEWPORT = {"width": 1920, "height": 1080}
DEFAULT_LOCALE = "en-US"
DEFAULT_TIMEZONE = "America/Detroit" # Meijer HQ is in Grand Rapids, MI
class MeijerScraper(BaseScraper):
"""Scraper for Meijer mPerks purchase history."""
async def _create_stealth_context(
self, playwright_instance: Playwright, cookies: list[dict] | None = None
) -> BrowserContext:
"""Create a browser context with stealth settings."""
browser = await playwright_instance.chromium.launch(
headless=settings.headless,
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
],
)
context = await browser.new_context(
user_agent=DEFAULT_USER_AGENT,
viewport=DEFAULT_VIEWPORT, # type: ignore[arg-type]
locale=DEFAULT_LOCALE,
timezone_id=DEFAULT_TIMEZONE,
java_script_enabled=True,
bypass_csp=False,
)
# Mask webdriver flag
await context.add_init_script(
"""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// Mask chrome automation indicators
window.chrome = { runtime: {} };
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en']
});
"""
)
if cookies:
await context.add_cookies(cookies) # type: ignore[arg-type]
return cast(BrowserContext, context)
async def login(self, username: str, password: str) -> SessionData:
"""Log in to Meijer mPerks and capture session cookies.
The mPerks login flow:
1. Navigate to login page
2. Fill email and password fields
3. Click sign-in button
4. Wait for redirect to mPerks dashboard
5. Extract session cookies
"""
async with async_playwright() as p:
context = await self._create_stealth_context(p)
page = await context.new_page()
try:
return await self._perform_login(page, context, username, password)
finally:
if context.browser:
await context.browser.close()
async def _perform_login(
self, page: Page, context: BrowserContext, username: str, password: str
) -> SessionData:
"""Execute the login flow on the mPerks portal."""
logger.info("Navigating to Meijer login page")
await page.goto(MEIJER_LOGIN_PAGE, wait_until="networkidle")
await self.human_delay(1500, 3000)
# Fill email field
email_input = page.locator('input[type="email"], input[name="email"], #email')
await email_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await email_input.click()
await self.human_delay(200, 500)
await email_input.fill(username)
await self.human_delay(500, 1000)
# Fill password field
password_input = page.locator('input[type="password"], input[name="password"], #password')
await password_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await password_input.click()
await self.human_delay(200, 500)
await password_input.fill(password)
await self.human_delay(500, 1500)
# Click sign-in button
sign_in_btn = page.locator(
'button[type="submit"], button:has-text("Sign In"), button:has-text("Log In")'
)
await sign_in_btn.click()
# Wait for navigation after login
await page.wait_for_url(
lambda url: "login" not in url.lower(),
timeout=settings.browser_timeout_ms,
)
await self.human_delay(1000, 2000)
# Capture cookies
raw_cookies = await context.cookies()
cookies = [dict(c) for c in raw_cookies]
now = datetime.now(UTC)
logger.info("Meijer login successful, captured %d cookies", len(cookies))
return SessionData(
cookies=cookies,
user_agent=DEFAULT_USER_AGENT,
created_at=now,
expires_at=now + timedelta(hours=4),
)
async def check_session(self, session: SessionData) -> bool:
"""Check if the mPerks session is still valid.
Makes a lightweight request to the mPerks home page and checks
if we get redirected to login (session expired) or not.
"""
if session.expires_at and datetime.now(UTC) > session.expires_at:
logger.info("Meijer session expired based on timestamp")
return False
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
response = await page.goto(MEIJER_MPERKS_HOME, wait_until="networkidle")
current_url = page.url.lower()
is_valid = "login" not in current_url and response is not None and response.ok
logger.info("Meijer session check: valid=%s (url=%s)", is_valid, page.url)
return is_valid
except Exception:
logger.exception("Meijer session check failed")
return False
finally:
if context.browser:
await context.browser.close()
async def scrape_receipts(
self, session: SessionData, since: datetime | None = None
) -> list[RawReceipt]:
"""Scrape purchase history from Meijer mPerks.
Uses the XHR endpoints the mPerks SPA calls to fetch receipt data.
The purchase history endpoint returns a list of recent transactions,
and we can fetch individual receipt details for line items.
"""
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
return await self._fetch_receipts(page, since)
finally:
if context.browser:
await context.browser.close()
async def _fetch_receipts(self, page: Page, since: datetime | None) -> list[RawReceipt]:
"""Fetch receipt list and detail via mPerks XHR endpoints.
Uses Playwright's page.request API (APIRequestContext) instead of
page.evaluate(fetch(...)) for better observability — requests show up
in Playwright traces and can be intercepted by route handlers.
"""
# Navigate to mPerks to establish context (cookies need domain context)
await page.goto(MEIJER_MPERKS_HOME, wait_until="networkidle")
await self.human_delay(1000, 2000)
receipts: list[RawReceipt] = []
# Fetch purchase history listing via page.request (APIRequestContext)
api_response = await page.request.get(MEIJER_PURCHASE_HISTORY)
if not api_response.ok:
logger.warning(
"Purchase history request failed: %d %s",
api_response.status,
api_response.status_text,
)
return []
response = await api_response.json()
if not isinstance(response, dict):
logger.warning("Unexpected purchase history response type: %s", type(response))
return []
transactions = response.get("transactions", response.get("purchaseHistory", []))
if not isinstance(transactions, list):
logger.warning("No transactions found in purchase history response")
return []
logger.info("Found %d transactions in Meijer purchase history", len(transactions))
for txn in transactions:
receipt_id = str(txn.get("transactionId", txn.get("receiptId", "")))
purchase_date = txn.get("transactionDate", txn.get("purchaseDate", ""))
# Filter by date if 'since' is provided
if since and purchase_date:
try:
txn_dt = datetime.fromisoformat(purchase_date.replace("Z", "+00:00"))
if txn_dt < since:
continue
except (ValueError, TypeError):
pass
if not receipt_id:
continue
await self.human_delay(800, 2000)
# Fetch receipt detail
detail = await self._fetch_receipt_detail(page, receipt_id)
receipts.append(
RawReceipt(
receipt_id=receipt_id,
purchase_date=purchase_date,
store_number=str(txn.get("storeNumber", txn.get("storeId", ""))),
raw_data={**txn, "detail": detail},
source_url=f"{MEIJER_RECEIPT_DETAIL}?receiptId={receipt_id}",
)
)
logger.info("Scraped %d receipts from Meijer", len(receipts))
return receipts
async def _fetch_receipt_detail(self, page: Page, receipt_id: str) -> dict:
"""Fetch detailed receipt data for a single transaction.
Uses Playwright's page.request API for traceability.
"""
try:
url = f"{MEIJER_RECEIPT_DETAIL}?receiptId={receipt_id}"
api_response = await page.request.get(url)
if not api_response.ok:
logger.warning(
"Receipt detail request failed for %s: %d",
receipt_id,
api_response.status,
)
return {}
detail = await api_response.json()
return detail if isinstance(detail, dict) else {}
except Exception:
logger.exception("Failed to fetch receipt detail for %s", receipt_id)
return {}
def parse_receipt(self, raw: RawReceipt) -> dict:
"""Parse raw Meijer receipt into structured purchase data.
Delegates to the dedicated parser module.
"""
from receiptwitness.parsers.meijer import parse_meijer_receipt
return parse_meijer_receipt(raw)
+326
View File
@@ -0,0 +1,326 @@
"""Target Circle scraper using Playwright.
Target stores ~1 year of in-store purchase history tied to Circle accounts.
Purchases appear when the user pays with a linked card, uses the Target app
wallet, or enters their Circle phone number at checkout.
Key endpoints (reverse-engineered from target.com SPA):
- Login: POST https://gsp.target.com/gsp/authentications/v1/auth_codes
- Order history: GET https://api.target.com/order_history/v1/orders (in-store tab)
- Receipt detail: GET https://api.target.com/order_history/v1/orders/{orderId}
"""
import logging
from datetime import UTC, datetime, timedelta
from typing import cast
from playwright.async_api import BrowserContext, Page, Playwright, async_playwright
from receiptwitness.config import settings
from receiptwitness.scrapers.base import BaseScraper, RawReceipt, SessionData
logger = logging.getLogger(__name__)
# Target endpoints
TARGET_BASE = "https://www.target.com"
TARGET_LOGIN_PAGE = f"{TARGET_BASE}/login"
TARGET_ACCOUNT_PAGE = f"{TARGET_BASE}/account"
TARGET_ORDER_HISTORY = f"{TARGET_BASE}/account/orders"
TARGET_ORDER_API = "https://api.target.com/order_history/v1/orders"
TARGET_RECEIPT_API = "https://api.target.com/order_history/v1/orders"
# Realistic browser fingerprint — Chrome on Windows
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
DEFAULT_VIEWPORT = {"width": 1920, "height": 1080}
DEFAULT_LOCALE = "en-US"
DEFAULT_TIMEZONE = "America/Detroit" # SE Michigan coverage
class TargetScraper(BaseScraper):
"""Scraper for Target Circle in-store purchase history.
Target's order history SPA loads purchase data from internal API
endpoints. This scraper authenticates via the web login flow,
captures session cookies, and uses those to hit the order history
API for in-store receipt data.
"""
async def _create_stealth_context(
self, playwright_instance: Playwright, cookies: list[dict] | None = None
) -> BrowserContext:
"""Create a browser context with stealth settings for Target."""
browser = await playwright_instance.chromium.launch(
headless=settings.headless,
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
"--disable-dev-shm-usage",
],
)
context = await browser.new_context(
user_agent=DEFAULT_USER_AGENT,
viewport=DEFAULT_VIEWPORT, # type: ignore[arg-type]
locale=DEFAULT_LOCALE,
timezone_id=DEFAULT_TIMEZONE,
java_script_enabled=True,
bypass_csp=False,
color_scheme="light",
has_touch=False,
)
# Mask webdriver and automation signals
await context.add_init_script(
"""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
window.chrome = {
runtime: {},
loadTimes: function() {},
csi: function() {},
app: { isInstalled: false }
};
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en']
});
Object.defineProperty(navigator, 'platform', {
get: () => 'Win32'
});
Object.defineProperty(navigator, 'hardwareConcurrency', {
get: () => 8
});
Object.defineProperty(navigator, 'deviceMemory', {
get: () => 8
});
"""
)
if cookies:
await context.add_cookies(cookies) # type: ignore[arg-type]
return cast(BrowserContext, context)
async def login(self, username: str, password: str) -> SessionData:
"""Log in to Target and capture session cookies."""
async with async_playwright() as p:
context = await self._create_stealth_context(p)
page = await context.new_page()
try:
return await self._perform_login(page, context, username, password)
finally:
if context.browser:
await context.browser.close()
async def _perform_login(
self, page: Page, context: BrowserContext, username: str, password: str
) -> SessionData:
"""Execute the Target login flow."""
logger.info("Navigating to Target sign-in page")
await page.goto(TARGET_LOGIN_PAGE, wait_until="networkidle")
await self.human_delay(2000, 4000)
# Target login form — email/username field
email_input = page.locator(
'input[id="username"], '
'input[name="username"], '
'input[type="email"], '
'input[data-test="username"]'
)
await email_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await email_input.click()
await self.human_delay(300, 700)
await email_input.fill(username)
await self.human_delay(800, 1500)
# Password field
password_input = page.locator(
'input[id="password"], '
'input[name="password"], '
'input[type="password"], '
'input[data-test="password"]'
)
await password_input.wait_for(state="visible", timeout=settings.browser_timeout_ms)
await password_input.click()
await self.human_delay(300, 700)
await password_input.fill(password)
await self.human_delay(1000, 2000)
# Sign-in button
sign_in_btn = page.locator(
'button[id="login"], '
'button[data-test="login-button"], '
'button[type="submit"]:has-text("Sign in")'
)
await sign_in_btn.click()
# Wait for redirect away from login page
await page.wait_for_url(
lambda url: "login" not in url.lower(),
timeout=settings.browser_timeout_ms,
)
await self.human_delay(1500, 3000)
# Capture cookies
raw_cookies = await context.cookies()
cookies = [dict(c) for c in raw_cookies]
now = datetime.now(UTC)
logger.info("Target login successful, captured %d cookies", len(cookies))
return SessionData(
cookies=cookies,
user_agent=DEFAULT_USER_AGENT,
created_at=now,
expires_at=now + timedelta(hours=2),
extra={"retailer": "target"},
)
async def check_session(self, session: SessionData) -> bool:
"""Check if the Target session is still valid."""
if session.expires_at and datetime.now(UTC) > session.expires_at:
logger.info("Target session expired based on timestamp")
return False
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
response = await page.goto(TARGET_ACCOUNT_PAGE, wait_until="networkidle")
current_url = page.url.lower()
is_valid = "login" not in current_url and response is not None and response.ok
logger.info("Target session check: valid=%s (url=%s)", is_valid, page.url)
return is_valid
except Exception:
logger.exception("Target session check failed")
return False
finally:
if context.browser:
await context.browser.close()
async def scrape_receipts(
self, session: SessionData, since: datetime | None = None
) -> list[RawReceipt]:
"""Scrape in-store purchase history from Target Circle."""
async with async_playwright() as p:
context = await self._create_stealth_context(p, cookies=session.cookies)
page = await context.new_page()
try:
return await self._fetch_receipts(page, since)
finally:
if context.browser:
await context.browser.close()
async def _fetch_receipts(self, page: Page, since: datetime | None) -> list[RawReceipt]:
"""Fetch receipt list and details from Target order history.
Target's order history page has separate tabs for online and in-store
purchases. We target the in-store tab which shows Circle-linked
transactions.
"""
# Navigate to order history to establish context
await page.goto(TARGET_ORDER_HISTORY, wait_until="networkidle")
await self.human_delay(1500, 3000)
receipts: list[RawReceipt] = []
# Target order history API — filter for in-store purchases
api_response = await page.request.get(
TARGET_ORDER_API,
params={"channel": "in_store", "limit": "50"},
)
if not api_response.ok:
logger.warning(
"Target order history request failed: %d %s",
api_response.status,
api_response.status_text,
)
return []
response = await api_response.json()
if not isinstance(response, dict):
logger.warning("Unexpected order history response type: %s", type(response))
return []
# Target uses "orders" key for in-store purchase list
orders = response.get("orders", response.get("transactions", []))
if not isinstance(orders, list):
logger.warning("No orders found in Target order history response")
return []
logger.info("Found %d in-store orders in Target history", len(orders))
for order in orders:
raw_id = order.get("orderId") or order.get("transactionId") or order.get("id") or ""
order_id = str(raw_id)
purchase_date = order.get(
"purchaseDate",
order.get("transactionDate", order.get("date", "")),
)
# Filter by date if 'since' is provided
if since and purchase_date:
try:
txn_dt = datetime.fromisoformat(purchase_date.replace("Z", "+00:00"))
if txn_dt < since:
continue
except (ValueError, TypeError):
pass
if not order_id:
continue
await self.human_delay(1000, 2500)
# Fetch receipt detail
detail = await self._fetch_receipt_detail(page, order_id)
raw_store = (
order.get("storeNumber") or order.get("storeId") or order.get("locationId") or ""
)
store_number = str(raw_store)
receipts.append(
RawReceipt(
receipt_id=order_id,
purchase_date=purchase_date,
store_number=store_number,
raw_data={**order, "detail": detail},
source_url=f"{TARGET_RECEIPT_API}/{order_id}",
)
)
logger.info("Scraped %d receipts from Target", len(receipts))
return receipts
async def _fetch_receipt_detail(self, page: Page, order_id: str) -> dict:
"""Fetch detailed receipt data for a single Target order."""
try:
url = f"{TARGET_RECEIPT_API}/{order_id}"
api_response = await page.request.get(url)
if not api_response.ok:
logger.warning(
"Target receipt detail request failed for %s: %d",
order_id,
api_response.status,
)
return {}
detail = await api_response.json()
return detail if isinstance(detail, dict) else {}
except Exception:
logger.exception("Failed to fetch Target receipt detail for %s", order_id)
return {}
def parse_receipt(self, raw: RawReceipt) -> dict:
"""Parse raw Target receipt into structured purchase data."""
from receiptwitness.parsers.target import parse_target_receipt
return parse_target_receipt(raw)
+1
View File
@@ -0,0 +1 @@
"""Session management — encrypted cookie storage and refresh logic."""
+52
View File
@@ -0,0 +1,52 @@
"""Fernet-based encryption for session cookies at rest.
Session data (cookies, tokens) is encrypted before writing to the database
and decrypted only when needed for a scrape. The encryption key is provided
via the RW_SESSION_ENCRYPTION_KEY environment variable — it is never stored
in the database or logged.
"""
import json
import logging
from cryptography.fernet import Fernet, InvalidToken
from receiptwitness.config import settings
logger = logging.getLogger(__name__)
def _get_fernet() -> Fernet:
"""Get a Fernet instance using the configured encryption key."""
key = settings.session_encryption_key
if not key:
raise ValueError(
"RW_SESSION_ENCRYPTION_KEY is not set. "
"Generate one with: "
"python -c 'from cryptography.fernet import Fernet; "
"print(Fernet.generate_key().decode())'"
)
return Fernet(key.encode() if isinstance(key, str) else key)
def encrypt_session_data(data: dict) -> str:
"""Encrypt session data dict to a Fernet token string.
The data is JSON-serialized, then encrypted. The result is a
URL-safe base64-encoded string suitable for storing in JSONB.
"""
f = _get_fernet()
plaintext = json.dumps(data, default=str).encode("utf-8")
return f.encrypt(plaintext).decode("utf-8")
def decrypt_session_data(encrypted: str) -> dict:
"""Decrypt a Fernet token string back to a session data dict."""
f = _get_fernet()
try:
plaintext = f.decrypt(encrypted.encode("utf-8"))
result: dict = json.loads(plaintext)
return result
except InvalidToken:
logger.error("Failed to decrypt session data — invalid token or wrong key")
raise
+81
View File
@@ -0,0 +1,81 @@
"""Session storage, retrieval, and refresh logic.
Manages the lifecycle of retailer session data:
- Load encrypted session from DB
- Check validity via scraper
- Re-authenticate if expired
- Save new session back (encrypted)
"""
import logging
from dataclasses import asdict
from datetime import UTC, datetime
from receiptwitness.scrapers.base import BaseScraper, SessionData
from receiptwitness.session.encryption import decrypt_session_data, encrypt_session_data
logger = logging.getLogger(__name__)
def session_from_db_record(session_data_encrypted: str | None) -> SessionData | None:
"""Deserialize and decrypt a session from the database.
The session_data column in user_store_accounts stores the Fernet-encrypted
JSON of the SessionData fields.
"""
if not session_data_encrypted:
return None
try:
data = decrypt_session_data(session_data_encrypted)
return SessionData(
cookies=data["cookies"],
user_agent=data["user_agent"],
created_at=datetime.fromisoformat(data["created_at"]),
expires_at=(
datetime.fromisoformat(data["expires_at"]) if data.get("expires_at") else None
),
extra=data.get("extra", {}),
)
except Exception:
logger.exception("Failed to load session from DB record")
return None
def session_to_db_value(session: SessionData) -> str:
"""Serialize and encrypt a session for database storage."""
data = asdict(session)
# Convert datetime objects to ISO strings for JSON serialization
data["created_at"] = session.created_at.isoformat()
if session.expires_at:
data["expires_at"] = session.expires_at.isoformat()
return encrypt_session_data(data)
async def get_valid_session(
scraper: BaseScraper,
session_data_encrypted: str | None,
username: str,
password: str,
) -> tuple[SessionData, bool]:
"""Get a valid session, re-authenticating if needed.
Returns:
A tuple of (session, was_refreshed). If was_refreshed is True,
the caller should persist the new session to the database.
"""
# Try existing session first
existing = session_from_db_record(session_data_encrypted)
if existing:
if existing.expires_at and datetime.now(UTC) > existing.expires_at:
logger.info("Session expired by timestamp, re-authenticating")
elif await scraper.check_session(existing):
logger.info("Existing session is valid")
return existing, False
else:
logger.info("Session check failed, re-authenticating")
# Need to re-authenticate
logger.info("Performing fresh login")
new_session = await scraper.login(username, password)
return new_session, True