forked from cartsnitch/cartsnitch
157 lines
4.4 KiB
Python
157 lines
4.4 KiB
Python
"""Product normalization — Phase 1: UPC matching + fuzzy name matching.
|
|
|
|
Matches products across retailers by:
|
|
1. Exact UPC match (highest confidence)
|
|
2. Fuzzy name matching via token-based Jaccard similarity (lower confidence)
|
|
"""
|
|
|
|
import re
|
|
from dataclasses import dataclass
|
|
from enum import StrEnum
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from cartsnitch_common.models.product import NormalizedProduct
|
|
|
|
|
|
class MatchMethod(StrEnum):
|
|
"""How a product match was determined."""
|
|
|
|
UPC = "upc"
|
|
NAME = "name"
|
|
|
|
|
|
@dataclass(frozen=True)
class MatchResult:
    """Immutable outcome of a successful normalization lookup.

    Instances are frozen, so fields cannot be reassigned after construction.
    """

    # The normalized product that was matched.
    product: NormalizedProduct
    # Match score; the matchers in this module use 1.0 for a UPC hit and the
    # Jaccard similarity for a name hit.
    confidence: float
    # Which strategy produced the match.
    method: MatchMethod
|
|
|
|
|
|
# Tokens dropped by clean_name() because they carry no product identity.
_NOISE_WORDS = frozenset(
    # Articles, conjunctions and prepositions.
    {"the", "a", "an", "and", "or", "of", "with", "in", "for", "to"}
    # Marketing filler commonly found on labels.
    | {"brand", "original", "classic", "new", "improved"}
)
|
|
|
|
# Matches a quantity followed by a unit, e.g. "16 oz", "1.5 lb", "12 ct".
# Group 1 is the (optionally decimal) number, group 2 the unit as written.
# The trailing \b stops a unit from matching the start of a longer word.
_SIZE_PATTERN = re.compile(
    r"(\d+(?:\.\d+)?)\s*(oz|fl\s*oz|lb|lbs|g|kg|ml|l|ct|pk|count|pack)\b",
    flags=re.I,
)
|
|
|
|
|
|
def clean_name(name: str) -> str:
    """Reduce a product name to a canonical form suitable for comparison.

    Steps, in order:

    1. Lowercase the text.
    2. Delete size expressions such as "16 oz" (must precede step 3 so the
       decimal point in "1.5 lb" is still intact when the pattern runs).
    3. Replace every character that is neither a word character nor
       whitespace with a space.
    4. Drop noise words and join the remaining tokens with single spaces.

    Returns an empty string when no tokens survive.
    """
    without_size = _SIZE_PATTERN.sub("", name.lower())
    without_punct = re.sub(r"[^\w\s]", " ", without_size)
    # split() with no argument also collapses runs of whitespace.
    return " ".join(
        word for word in without_punct.split() if word not in _NOISE_WORDS
    )
|
|
|
|
|
|
def extract_size_info(name: str) -> tuple[str, str] | None:
    """Extract ``(size, unit)`` from a product name, if present.

    Only the first size expression in ``name`` is considered.

    Returns:
        A tuple of the numeric size exactly as written (e.g. ``"1.5"``) and
        the lowercased unit with any internal whitespace run replaced by a
        single underscore (``"fl oz"`` -> ``"fl_oz"``), or ``None`` when the
        name contains no size expression.
    """
    match = _SIZE_PATTERN.search(name)
    if match is None:
        return None
    size, unit = match.groups()
    # The pattern accepts any whitespace run inside "fl oz" (``fl\s*oz``), so
    # split/join is used instead of replacing a single literal space; this
    # way "fl  oz" and "fl\toz" normalize to "fl_oz" as well.
    return size, "_".join(unit.lower().split())
|
|
|
|
|
|
def jaccard_similarity(a: str, b: str) -> float:
    """Return the Jaccard index of the whitespace-separated tokens of two names.

    The score is ``|A & B| / |A | B|`` over the two sets of distinct tokens,
    and is 0.0 when either name has no tokens.
    """
    left, right = set(a.split()), set(b.split())
    if not (left and right):
        return 0.0
    shared = len(left & right)
    # Inclusion-exclusion: |A | B| == |A| + |B| - |A & B|.
    return shared / (len(left) + len(right) - shared)
|
|
|
|
|
|
def match_by_upc(session: Session, upc: str) -> MatchResult | None:
    """Look up a normalized product whose UPC variants contain ``upc``.

    Every product with a non-NULL ``upc_variants`` is fetched and the
    membership test is done in Python rather than in SQL, for cross-database
    compatibility (works on both PostgreSQL and SQLite).

    Returns:
        A ``MatchResult`` with confidence 1.0 and method ``MatchMethod.UPC``
        for the first product containing ``upc``, or ``None`` if none does.
    """
    # TODO: Use PostgreSQL JSON containment query (@>) for production.
    # Current approach loads all products into memory — acceptable for tests
    # and small datasets, but will not scale.
    query = select(NormalizedProduct).where(NormalizedProduct.upc_variants.is_not(None))
    candidates = session.execute(query).scalars().all()
    hit = next(
        # The truthiness guard skips values that are empty despite the
        # IS NOT NULL filter.
        (c for c in candidates if c.upc_variants and upc in c.upc_variants),
        None,
    )
    if hit is None:
        return None
    return MatchResult(product=hit, confidence=1.0, method=MatchMethod.UPC)
|
|
|
|
|
|
def match_by_name(
    session: Session,
    name: str,
    threshold: float = 0.5,
) -> MatchResult | None:
    """Find the best normalized product by fuzzy name matching.

    Cleans ``name``, loads all normalized products and scores each product's
    cleaned ``canonical_name`` against it with Jaccard similarity.

    Args:
        session: Database session used to load the candidate products.
        name: Raw product name to match.
        threshold: Minimum similarity a candidate must reach (inclusive).

    Returns:
        A ``MatchResult`` with method ``MatchMethod.NAME`` whose confidence
        is the winning similarity, or ``None`` when no product scores both
        above zero and at or above ``threshold``. On equal scores the product
        encountered first wins; the query has no ORDER BY, so that order is
        whatever the database returns.
    """
    # TODO: Use pg_trgm similarity index for production.
    # Current approach loads all products into memory — acceptable for tests
    # and small datasets, but will not scale.
    cleaned = clean_name(name)
    if not cleaned:
        # With no tokens left, every comparison scores 0.0 and a zero score
        # is never selected below, so skip loading the table altogether.
        return None

    products = session.execute(select(NormalizedProduct)).scalars().all()

    best_match: NormalizedProduct | None = None
    best_score = 0.0
    for product in products:
        score = jaccard_similarity(cleaned, clean_name(product.canonical_name))
        # Strict ">" keeps the earliest product on ties and rejects 0.0.
        if score > best_score and score >= threshold:
            best_score = score
            best_match = product

    # Compare against None explicitly instead of relying on the truthiness
    # of a mapped object.
    if best_match is None:
        return None
    return MatchResult(product=best_match, confidence=best_score, method=MatchMethod.NAME)
|
|
|
|
|
|
def normalize_product(
    session: Session,
    name: str,
    upc: str | None = None,
    name_threshold: float = 0.5,
) -> MatchResult | None:
    """Run the normalization pipeline: exact UPC lookup, then fuzzy name match.

    The UPC lookup is attempted only when ``upc`` is non-empty. If it is
    skipped or finds nothing, the result of the name match (using
    ``name_threshold``) is returned, which may be ``None``.
    """
    upc_hit = match_by_upc(session, upc) if upc else None
    if upc_hit:
        return upc_hit
    return match_by_name(session, name, threshold=name_threshold)
|