forked from cartsnitch/cartsnitch
fix(receiptwitness): pool DB engine and Redis client to prevent connection exhaustion
email_worker calls get_async_session_factory() inside every resolve_user() call, which creates a brand-new async engine (and thus a brand-new connection pool) on every message. In a tight consumer loop processing 5 messages per batch, this rapidly exhausts DragonflyDB/Postgres connection limits and manifests as ConnectionResetError. Fix: cache the async engine in a module-level dict keyed by URL in cartsnitch_common.database:get_async_engine(), matching the pattern already used in receiptwitness:events.py for the Redis connection pool. Also add pool_size=10, max_overflow=20, pool_pre_ping=True for 健壮连接管理. Similarly, receiptwitness/queue/email.py:get_redis() was creating a new Redis connection on every call with no pooling. Share a ConnectionPool (max_connections=30) across all get_redis() callers. Fixes CAR-1078 Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -1,17 +1,36 @@
|
||||
"""Database engine and session factories for sync and async usage."""
|
||||
|
||||
from collections.abc import AsyncGenerator, Generator
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from cartsnitch_common.config import settings
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
def get_async_engine(url: str | None = None):
|
||||
"""Create an async SQLAlchemy engine."""
|
||||
return create_async_engine(url or settings.database_url, echo=settings.debug)
|
||||
# Module-level async engine cache — one engine per unique URL, shared across all callers.
|
||||
# This prevents pool exhaustion in high-throughput workers (e.g. email-worker hitting
|
||||
# DragonflyDB/Postgres repeatedly per message). pool_size=10, max_overflow=20 gives
|
||||
# headroom for bursts while capping max connections at 30 per URL.
|
||||
_async_engine_cache: dict[str, "AsyncEngine"] = {}
|
||||
|
||||
|
||||
def get_async_engine(url: str | None = None) -> "AsyncEngine":
|
||||
"""Get or create a cached async engine for the given URL."""
|
||||
target = url or settings.database_url
|
||||
if target not in _async_engine_cache:
|
||||
_async_engine_cache[target] = create_async_engine(
|
||||
target,
|
||||
echo=settings.debug,
|
||||
pool_size=10,
|
||||
max_overflow=20,
|
||||
pool_pre_ping=True,
|
||||
)
|
||||
return _async_engine_cache[target]
|
||||
|
||||
|
||||
def get_sync_engine(url: str | None = None):
|
||||
|
||||
@@ -16,6 +16,29 @@ logger = logging.getLogger(__name__)
|
||||
STREAM_KEY = "email:receipts"
|
||||
CONSUMER_GROUP = "email-workers"
|
||||
|
||||
# Module-level Redis/DragonflyDB connection pool — shared across all worker calls.
|
||||
# Without pooling, each call to get_redis() opens a new TCP connection. In a tight
|
||||
# consumer loop this causes ConnectionResetError when DragonflyDB's connection limit
|
||||
# is hit under load. max_connections=30 (10 base + 20 overflow) mirrors the engine pool.
|
||||
_redis_pool: aioredis.ConnectionPool | None = None
|
||||
|
||||
|
||||
def _get_redis_pool() -> aioredis.ConnectionPool:
|
||||
"""Get or create the shared DragonflyDB connection pool."""
|
||||
global _redis_pool
|
||||
if _redis_pool is None:
|
||||
_redis_pool = aioredis.ConnectionPool.from_url(
|
||||
settings.redis_url,
|
||||
decode_responses=True,
|
||||
max_connections=30,
|
||||
)
|
||||
return _redis_pool
|
||||
|
||||
|
||||
async def get_redis() -> aioredis.Redis:
|
||||
"""Get async Redis/DragonflyDB client backed by a shared connection pool."""
|
||||
return aioredis.Redis(connection_pool=_get_redis_pool())
|
||||
|
||||
|
||||
@dataclass
|
||||
class EmailJob:
|
||||
@@ -31,11 +54,6 @@ class EmailJob:
|
||||
message_id: str # from email provider, for dedup
|
||||
|
||||
|
||||
async def get_redis() -> aioredis.Redis:
|
||||
"""Get async Redis/DragonflyDB client."""
|
||||
return cast(aioredis.Redis, aioredis.from_url(settings.redis_url, decode_responses=True))
|
||||
|
||||
|
||||
async def ensure_consumer_group(client: aioredis.Redis) -> None:
|
||||
"""Create consumer group if it does not exist."""
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user