From fb0bb0102c2dc26e86ac2304816f34865b9cd18a Mon Sep 17 00:00:00 2001 From: Flea Flicker Date: Thu, 28 May 2026 18:50:53 +0000 Subject: [PATCH] fix(receiptwitness): pool DB engine and Redis client to prevent connection exhaustion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit email_worker calls get_async_session_factory() inside every resolve_user() call, which creates a brand-new async engine (and thus a brand-new connection pool) on every message. In a tight consumer loop processing 5 messages per batch, this rapidly exhausts DragonflyDB/Postgres connection limits and manifests as ConnectionResetError. Fix: cache the async engine in a module-level dict keyed by URL in cartsnitch_common.database:get_async_engine(), matching the pattern already used in receiptwitness:events.py for the Redis connection pool. Also add pool_size=10, max_overflow=20, pool_pre_ping=True for robust connection management. Similarly, receiptwitness/queue/email.py:get_redis() was creating a new Redis connection on every call with no pooling. Share a ConnectionPool (max_connections=30) across all get_redis() callers. 
Fixes CAR-1078 Co-Authored-By: Paperclip --- common/src/cartsnitch_common/database.py | 27 +++++++++++++++--- .../src/receiptwitness/queue/email.py | 28 +++++++++++++++---- 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/common/src/cartsnitch_common/database.py b/common/src/cartsnitch_common/database.py index 76a4f35..ec27b6c 100644 --- a/common/src/cartsnitch_common/database.py +++ b/common/src/cartsnitch_common/database.py @@ -1,17 +1,36 @@ """Database engine and session factories for sync and async usage.""" from collections.abc import AsyncGenerator, Generator +from typing import TYPE_CHECKING from sqlalchemy import create_engine -from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.orm import Session, sessionmaker from cartsnitch_common.config import settings +if TYPE_CHECKING: + from sqlalchemy.engine import Engine -def get_async_engine(url: str | None = None): - """Create an async SQLAlchemy engine.""" - return create_async_engine(url or settings.database_url, echo=settings.debug) +# Module-level async engine cache — one engine per unique URL, shared across all callers. +# This prevents pool exhaustion in high-throughput workers (e.g. email-worker hitting +# DragonflyDB/Postgres repeatedly per message). pool_size=10, max_overflow=20 gives +# headroom for bursts while capping max connections at 30 per URL. 
+_async_engine_cache: dict[str, "AsyncEngine"] = {} + + +def get_async_engine(url: str | None = None) -> "AsyncEngine": + """Get or create a cached async engine for the given URL.""" + target = url or settings.database_url + if target not in _async_engine_cache: + _async_engine_cache[target] = create_async_engine( + target, + echo=settings.debug, + pool_size=10, + max_overflow=20, + pool_pre_ping=True, + ) + return _async_engine_cache[target] def get_sync_engine(url: str | None = None): diff --git a/receiptwitness/src/receiptwitness/queue/email.py b/receiptwitness/src/receiptwitness/queue/email.py index c76148e..28ff456 100644 --- a/receiptwitness/src/receiptwitness/queue/email.py +++ b/receiptwitness/src/receiptwitness/queue/email.py @@ -16,6 +16,29 @@ logger = logging.getLogger(__name__) STREAM_KEY = "email:receipts" CONSUMER_GROUP = "email-workers" +# Module-level Redis/DragonflyDB connection pool — shared across all worker calls. +# Without pooling, each call to get_redis() opens a new TCP connection. In a tight +# consumer loop this causes ConnectionResetError when DragonflyDB's connection limit +# is hit under load. max_connections=30 (10 base + 20 overflow) mirrors the engine pool. 
+_redis_pool: aioredis.ConnectionPool | None = None + + +def _get_redis_pool() -> aioredis.ConnectionPool: + """Get or create the shared DragonflyDB connection pool.""" + global _redis_pool + if _redis_pool is None: + _redis_pool = aioredis.ConnectionPool.from_url( + settings.redis_url, + decode_responses=True, + max_connections=30, + ) + return _redis_pool + + +async def get_redis() -> aioredis.Redis: + """Get async Redis/DragonflyDB client backed by a shared connection pool.""" + return aioredis.Redis(connection_pool=_get_redis_pool()) + @dataclass class EmailJob: @@ -31,11 +54,6 @@ class EmailJob: message_id: str # from email provider, for dedup -async def get_redis() -> aioredis.Redis: - """Get async Redis/DragonflyDB client.""" - return cast(aioredis.Redis, aioredis.from_url(settings.redis_url, decode_responses=True)) - - async def ensure_consumer_group(client: aioredis.Redis) -> None: """Create consumer group if it does not exist.""" try: