Files
api/src/cartsnitch_api/cache.py
T
Barcode Betty 8deaf6e599
CI / lint (pull_request) Successful in 5s
CI / typecheck (pull_request) Successful in 19s
CI / test (pull_request) Successful in 22s
CI / build-and-push (pull_request) Has been skipped
fix(ci): resolve dev lint + typecheck failures (CAR-1330)
Three CI-blocking issues on dev branch (also present on uat, fixed in 2b20946):

1. tests/conftest.py — remove extra blank line (ruff format).
2. src/cartsnitch_api/middleware/rate_limit.py — delete duplicate
   _public_limiter/_auth_limiter/_auth_strict_limiter forward-decl block
   (the second occurrence; mypy no-redef).
3. src/cartsnitch_api/cache.py:38 — annotate
   value: str | bytes | None so mypy doesn't widen redis client return
   to Any (no-any-return).

Verified: ruff check . && ruff format --check . && mypy src/cartsnitch_api
all pass.

Sibling of CAR-1330 (which fixes uat directly). Heals dev so future
dev → uat promotions stay green.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-06-09 11:13:44 +00:00

82 lines
2.6 KiB
Python

"""Redis/DragonflyDB caching helpers."""
import redis.asyncio as redis
from cartsnitch_api.config import settings
class CacheClient:
"""Redis/DragonflyDB caching with connection pooling.
Will be used for expensive queries: price trends, product comparisons.
Cache invalidation via Redis pub/sub events from other services.
"""
def __init__(self) -> None:
self._pool: redis.ConnectionPool | None = None
self._client: redis.Redis | None = None
async def initialize(self) -> None:
"""Initialize the Redis connection pool."""
self._pool = redis.ConnectionPool.from_url(
settings.redis_url,
max_connections=20,
decode_responses=True,
)
self._client = redis.Redis(connection_pool=self._pool)
async def close(self) -> None:
"""Close the Redis connection pool."""
if self._client:
await self._client.aclose()
if self._pool:
await self._pool.aclose()
async def get(self, key: str) -> str | None:
if not self._client:
return None
value: str | bytes | None = await self._client.get(key)
if value is None:
return None
if isinstance(value, bytes):
return value.decode("utf-8", errors="replace")
return value
async def set(self, key: str, value: str, ttl_seconds: int = 300) -> None:
if not self._client:
return
await self._client.set(key, value, ex=ttl_seconds)
async def delete(self, key: str) -> None:
if not self._client:
return
await self._client.delete(key)
async def invalidate_price_cache(self, product_id: str) -> None:
"""Invalidate all price-related cache entries for a product."""
if not self._client:
return
pattern = f"price:*:{product_id}"
await self._delete_pattern(pattern)
async def invalidate_product_cache(self, product_id: str) -> None:
"""Invalidate the product detail cache entry."""
if not self._client:
return
await self._client.delete(f"product:{product_id}")
async def _delete_pattern(self, pattern: str) -> None:
"""Delete all keys matching a pattern using SCAN."""
if not self._client:
return
cursor = 0
while True:
cursor, keys = await self._client.scan(cursor=cursor, match=pattern, count=100)
if keys:
await self._client.delete(*keys)
if cursor == 0:
break
# Shared module-level instance. Constructing it opens no connection; its
# cache methods are no-ops until initialize() has been awaited.
cache_client = CacheClient()