forked from cartsnitch/cartsnitch
2460a00d4e
- Refactor database.py to use init_db()/close_db() lifecycle - Add create_db_engine() with pool_size=10, max_overflow=20, pool_pre_ping=True - Replace cache.py stub with real Redis client using redis.asyncio - Implement init_redis()/close_redis() with graceful error handling - Replace no-op lifespan in main.py with proper startup/shutdown - Enhance health endpoint to check DB and Redis connectivity - Add tests for database, cache, and health endpoint lifecycle Co-Authored-By: Paperclip <noreply@paperclip.ing>
61 lines
1.6 KiB
Python
61 lines
1.6 KiB
Python
"""Database session management for the API gateway."""
|
|
|
|
from collections.abc import AsyncGenerator
|
|
from typing import TYPE_CHECKING
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
|
|
from cartsnitch_api.config import settings
|
|
|
|
if TYPE_CHECKING:
    # Imported for annotations only; never evaluated at runtime.
    from sqlalchemy.engine import Engine  # noqa: F401  (kept for existing annotations)
    from sqlalchemy.ext.asyncio import AsyncEngine


# Engine shared by the whole process. It is produced by create_async_engine()
# and its dispose() is awaited in close_db(), so it is an AsyncEngine rather
# than a synchronous Engine. None until init_db() runs and after close_db().
_engine: "AsyncEngine | None" = None

# Factory used by get_db() to open sessions. None until init_db() runs and
# after close_db().
async_session_factory: async_sessionmaker[AsyncSession] | None = None
|
|
|
|
|
|
def create_db_engine():
    """Build a new async SQLAlchemy engine for ``settings.database_url``.

    Returns:
        The engine object returned by ``create_async_engine``. Nothing is
        stored globally here; ``init_db()`` is what publishes the engine.
    """
    # Connection-pool tuning passed straight through to SQLAlchemy.
    pool_options = {
        "pool_size": 10,        # connections kept open in the pool
        "max_overflow": 20,     # extra connections allowed beyond pool_size
        "pool_pre_ping": True,  # test a connection before handing it out
        "pool_recycle": 3600,   # replace connections older than this (seconds)
    }
    return create_async_engine(settings.database_url, echo=False, **pool_options)
|
|
|
|
|
|
async def init_db() -> None:
    """Create the shared engine and session factory for this process.

    Rebinds the module-level ``_engine`` and ``async_session_factory``.
    If an engine from an earlier ``init_db()`` call is still live, it is
    disposed after the new one has been published, so its connection pool
    is not orphaned when the global is overwritten.
    """
    global _engine, async_session_factory

    stale_engine = _engine

    # Publish the new engine/factory first so the globals never point at an
    # engine that is in the middle of being disposed.
    _engine = create_db_engine()
    async_session_factory = async_sessionmaker(
        _engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )

    if stale_engine is not None:
        await stale_engine.dispose()
|
|
|
|
|
|
async def close_db() -> None:
    """Dispose the shared engine and clear the module-level state.

    Does nothing when no engine is currently set. Otherwise the engine's
    ``dispose()`` is awaited and both ``_engine`` and
    ``async_session_factory`` are reset to ``None``.
    """
    global _engine, async_session_factory

    if _engine is None:
        return

    await _engine.dispose()
    _engine = async_session_factory = None
|
|
|
|
|
|
def get_engine():
    """Return the current module-level engine, or ``None`` if none is set.

    Unlike attribute access to ``engine`` on this module, this accessor
    never raises when the database has not been initialized.
    """
    return _engine
|
|
|
|
|
|
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield one ``AsyncSession`` opened from the shared session factory.

    The session is opened with ``async with``, so its context manager exits
    once the generator is resumed or closed by the consumer.
    NOTE(review): presumably consumed as a request-scoped dependency by the
    API gateway; confirm against callers.

    Raises:
        RuntimeError: if ``init_db()`` has not set up the session factory.
    """
    factory = async_session_factory
    if factory is None:
        raise RuntimeError("Database not initialized. Call init_db() first.")

    async with factory() as db_session:
        yield db_session
|
|
|
|
|
|
# Backward compatibility: module-level engine proxy that delegates to _engine
|
|
def __getattr__(name: str):
    """Module-level attribute hook that resolves ``engine`` lazily.

    Python calls this only for names not found in the module namespace.
    ``engine`` maps to the current ``_engine``; every other name is reported
    as missing.

    Raises:
        AttributeError: for any name other than ``engine``.
        RuntimeError: if ``engine`` is requested while no engine is set.
    """
    # Reject unknown names first so the remaining path is about "engine" only.
    if name != "engine":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    if _engine is None:
        raise RuntimeError("Database not initialized. Call init_db() first.")
    return _engine
|