Files
cartsnitch-fork-test/common/src/cartsnitch_common/events.py
T

29 lines
744 B
Python

"""Event bus helpers for Redis pub/sub."""
from datetime import UTC, datetime
from typing import Any, cast
from redis import Redis
from cartsnitch_common.constants import EventType
from cartsnitch_common.schemas.events import EventEnvelope
def publish_event(
    redis_client: Redis,
    event_type: EventType,
    service: str,
    payload: dict[str, Any],
) -> int:
    """Wrap *payload* in an event envelope and publish it over Redis pub/sub.

    The envelope is stamped with the current timezone-aware UTC time,
    serialised to JSON, and sent on the channel named by
    ``event_type.value``.

    Args:
        redis_client: Redis connection used to publish the message.
        event_type: Kind of event; its ``value`` is used as the channel name.
        service: Value stored in the envelope's ``service`` field.
        payload: Event-specific data carried inside the envelope.

    Returns:
        The number of subscribers that received the message.
    """
    published_at = datetime.now(UTC)
    envelope = EventEnvelope(
        event_type=event_type,
        timestamp=published_at,
        service=service,
        payload=payload,
    )

    channel = event_type.value
    message = envelope.model_dump_json()

    # The cast narrows the client's return value to the declared ``int``
    # for the type checker; it performs no conversion at runtime.
    receiver_count = redis_client.publish(channel, message)
    return cast(int, receiver_count)