forked from cartsnitch/cartsnitch
fb0bb0102c
email_worker calls get_async_session_factory() inside every resolve_user() call, which creates a brand-new async engine (and thus a brand-new connection pool) on every message. In a tight consumer loop processing 5 messages per batch, this rapidly exhausts DragonflyDB/Postgres connection limits and manifests as ConnectionResetError. Fix: cache the async engine in a module-level dict keyed by URL in cartsnitch_common.database:get_async_engine(), matching the pattern already used in receiptwitness:events.py for the Redis connection pool. Also add pool_size=10, max_overflow=20, pool_pre_ping=True for 健壮连接管理. Similarly, receiptwitness/queue/email.py:get_redis() was creating a new Redis connection on every call with no pooling. Share a ConnectionPool (max_connections=30) across all get_redis() callers. Fixes CAR-1078 Co-Authored-By: Paperclip <noreply@paperclip.ing>
65 lines
2.4 KiB
Python
65 lines
2.4 KiB
Python
"""Database engine and session factories for sync and async usage."""
|
|
|
|
from collections.abc import AsyncGenerator, Generator
|
|
from typing import TYPE_CHECKING
|
|
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
|
|
from sqlalchemy.orm import Session, sessionmaker
|
|
|
|
from cartsnitch_common.config import settings
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.engine import Engine
|
|
|
|
# Module-level async engine cache — one engine per unique URL, shared across all callers.
|
|
# This prevents pool exhaustion in high-throughput workers (e.g. email-worker hitting
|
|
# DragonflyDB/Postgres repeatedly per message). pool_size=10, max_overflow=20 gives
|
|
# headroom for bursts while capping max connections at 30 per URL.
|
|
_async_engine_cache: dict[str, "AsyncEngine"] = {}
|
|
|
|
|
|
def get_async_engine(url: str | None = None) -> "AsyncEngine":
|
|
"""Get or create a cached async engine for the given URL."""
|
|
target = url or settings.database_url
|
|
if target not in _async_engine_cache:
|
|
_async_engine_cache[target] = create_async_engine(
|
|
target,
|
|
echo=settings.debug,
|
|
pool_size=10,
|
|
max_overflow=20,
|
|
pool_pre_ping=True,
|
|
)
|
|
return _async_engine_cache[target]
|
|
|
|
|
|
def get_sync_engine(url: str | None = None):
|
|
"""Create a sync SQLAlchemy engine."""
|
|
return create_engine(url or settings.database_url_sync, echo=settings.debug)
|
|
|
|
|
|
def get_async_session_factory(url: str | None = None) -> async_sessionmaker[AsyncSession]:
|
|
"""Create an async session factory."""
|
|
engine = get_async_engine(url)
|
|
return async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
|
|
|
|
|
def get_sync_session_factory(url: str | None = None) -> sessionmaker[Session]:
|
|
"""Create a sync session factory."""
|
|
engine = get_sync_engine(url)
|
|
return sessionmaker(engine, expire_on_commit=False)
|
|
|
|
|
|
async def get_async_session(url: str | None = None) -> AsyncGenerator[AsyncSession, None]:
|
|
"""Dependency for async session injection."""
|
|
factory = get_async_session_factory(url)
|
|
async with factory() as session:
|
|
yield session
|
|
|
|
|
|
def get_sync_session(url: str | None = None) -> Generator[Session, None, None]:
|
|
"""Dependency for sync session injection."""
|
|
factory = get_sync_session_factory(url)
|
|
with factory() as session:
|
|
yield session
|