forked from cartsnitch/cartsnitch
Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d92bcf433b | |||
| 01ed6dac00 | |||
| a7a55bbf79 | |||
| fb0bb0102c | |||
| d90b00d7ac | |||
| 8983fe5d8f | |||
| a26082d099 | |||
| c39b26050b | |||
| 6b6a50b9ec | |||
| 7c021c4eb5 | |||
| a5404dc824 | |||
| 618da593a6 | |||
| e3ed19f98c | |||
| e54736d900 | |||
| 40abf64888 | |||
| 3615a78f0e | |||
| d785606bd1 | |||
| 48eaf45121 | |||
| 4bf5cd3826 | |||
| a3fca65ea1 | |||
| 25c27d08fe | |||
| aaf645fbe9 | |||
| 80aa58b37a | |||
| 062f6be8ea | |||
| 60beb2d89e | |||
| 9120c834e4 |
+15
-19
@@ -26,11 +26,7 @@ jobs:
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "20"
|
||||
cache: npm
|
||||
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
|
||||
- run: npm ci
|
||||
- name: ESLint
|
||||
run: npx eslint .
|
||||
@@ -40,8 +36,8 @@ jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-node@v4
|
||||
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
|
||||
- uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020
|
||||
with:
|
||||
node-version: "20"
|
||||
cache: npm
|
||||
@@ -52,8 +48,8 @@ jobs:
|
||||
audit:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-node@v4
|
||||
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
|
||||
- uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020
|
||||
with:
|
||||
node-version: "20"
|
||||
cache: npm
|
||||
@@ -64,8 +60,8 @@ jobs:
|
||||
e2e:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-node@v4
|
||||
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
|
||||
- uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020
|
||||
with:
|
||||
node-version: "20"
|
||||
cache: npm
|
||||
@@ -77,8 +73,8 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
needs: [test]
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-node@v4
|
||||
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
|
||||
- uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020
|
||||
with:
|
||||
node-version: "20"
|
||||
cache: npm
|
||||
@@ -106,7 +102,7 @@ jobs:
|
||||
calver_tag: ${{ steps.calver.outputs.version }}
|
||||
sha_tag: sha-${{ github.sha }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
@@ -202,7 +198,7 @@ jobs:
|
||||
calver_tag: ${{ steps.calver.outputs.version }}
|
||||
sha_tag: sha-${{ github.sha }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
@@ -290,7 +286,7 @@ jobs:
|
||||
calver_tag: ${{ steps.calver.outputs.version }}
|
||||
sha_tag: sha-${{ github.sha }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
@@ -378,7 +374,7 @@ jobs:
|
||||
calver_tag: ${{ steps.calver.outputs.version }}
|
||||
sha_tag: sha-${{ github.sha }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
@@ -464,7 +460,7 @@ jobs:
|
||||
if: always() && !cancelled() && github.event_name == 'push' && (github.ref == 'refs/heads/dev' || github.ref == 'refs/heads/main')
|
||||
steps:
|
||||
- name: Checkout infra repo
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
|
||||
with:
|
||||
repository: cartsnitch/infra
|
||||
token: ${{ secrets.REGISTRY_TOKEN }}
|
||||
@@ -554,7 +550,7 @@ jobs:
|
||||
if: always() && !cancelled() && github.event_name == 'push' && (github.ref == 'refs/heads/uat' || github.ref == 'refs/heads/main')
|
||||
steps:
|
||||
- name: Checkout infra repo
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
|
||||
with:
|
||||
repository: cartsnitch/infra
|
||||
token: ${{ secrets.REGISTRY_TOKEN }}
|
||||
|
||||
@@ -1,17 +1,36 @@
|
||||
"""Database engine and session factories for sync and async usage."""
|
||||
|
||||
from collections.abc import AsyncGenerator, Generator
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from cartsnitch_common.config import settings
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
def get_async_engine(url: str | None = None):
|
||||
"""Create an async SQLAlchemy engine."""
|
||||
return create_async_engine(url or settings.database_url, echo=settings.debug)
|
||||
# Module-level async engine cache — one engine per unique URL, shared across all callers.
|
||||
# This prevents pool exhaustion in high-throughput workers (e.g. email-worker hitting
|
||||
# DragonflyDB/Postgres repeatedly per message). pool_size=10, max_overflow=20 gives
|
||||
# headroom for bursts while capping max connections at 30 per URL.
|
||||
_async_engine_cache: dict[str, "AsyncEngine"] = {}
|
||||
|
||||
|
||||
def get_async_engine(url: str | None = None) -> "AsyncEngine":
|
||||
"""Get or create a cached async engine for the given URL."""
|
||||
target = url or settings.database_url
|
||||
if target not in _async_engine_cache:
|
||||
_async_engine_cache[target] = create_async_engine(
|
||||
target,
|
||||
echo=settings.debug,
|
||||
pool_size=10,
|
||||
max_overflow=20,
|
||||
pool_pre_ping=True,
|
||||
)
|
||||
return _async_engine_cache[target]
|
||||
|
||||
|
||||
def get_sync_engine(url: str | None = None):
|
||||
|
||||
Generated
+290
-384
File diff suppressed because it is too large
Load Diff
+5
-3
@@ -45,14 +45,16 @@
|
||||
"typescript-eslint": "^8.56.1",
|
||||
"vite": "^6.4.2",
|
||||
"vite-plugin-pwa": "^0.21.2",
|
||||
"vitest": "^3.2.4"
|
||||
"vitest": "^4.1.8"
|
||||
},
|
||||
"overrides": {
|
||||
"@rollup/pluginutils": "5.3.0",
|
||||
"flatted": "^3.4.2",
|
||||
"serialize-javascript": "7.0.5",
|
||||
"brace-expansion": ">=1.1.13",
|
||||
"brace-expansion": ">=5.0.6",
|
||||
"lodash": ">=4.17.24",
|
||||
"minimatch": "^10.2.4"
|
||||
"minimatch": "^10.2.4",
|
||||
"@babel/plugin-transform-modules-systemjs": "^7.29.4",
|
||||
"fast-uri": "^3.1.2"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,29 @@ logger = logging.getLogger(__name__)
|
||||
STREAM_KEY = "email:receipts"
|
||||
CONSUMER_GROUP = "email-workers"
|
||||
|
||||
# Module-level Redis/DragonflyDB connection pool — shared across all worker calls.
|
||||
# Without pooling, each call to get_redis() opens a new TCP connection. In a tight
|
||||
# consumer loop this causes ConnectionResetError when DragonflyDB's connection limit
|
||||
# is hit under load. max_connections=30 (10 base + 20 overflow) mirrors the engine pool.
|
||||
_redis_pool: aioredis.ConnectionPool | None = None
|
||||
|
||||
|
||||
def _get_redis_pool() -> aioredis.ConnectionPool:
|
||||
"""Get or create the shared DragonflyDB connection pool."""
|
||||
global _redis_pool
|
||||
if _redis_pool is None:
|
||||
_redis_pool = aioredis.ConnectionPool.from_url(
|
||||
settings.redis_url,
|
||||
decode_responses=True,
|
||||
max_connections=30,
|
||||
)
|
||||
return _redis_pool
|
||||
|
||||
|
||||
async def get_redis() -> aioredis.Redis:
|
||||
"""Get async Redis/DragonflyDB client backed by a shared connection pool."""
|
||||
return aioredis.Redis(connection_pool=_get_redis_pool())
|
||||
|
||||
|
||||
@dataclass
|
||||
class EmailJob:
|
||||
@@ -31,11 +54,6 @@ class EmailJob:
|
||||
message_id: str # from email provider, for dedup
|
||||
|
||||
|
||||
async def get_redis() -> aioredis.Redis:
|
||||
"""Get async Redis/DragonflyDB client."""
|
||||
return cast(aioredis.Redis, aioredis.from_url(settings.redis_url, decode_responses=True))
|
||||
|
||||
|
||||
async def ensure_consumer_group(client: aioredis.Redis) -> None:
|
||||
"""Create consumer group if it does not exist."""
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user