Skip to content

Events

EventBus

Event bus used to dispatch events to subscribed callbacks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| callbacks | Iterable[Callable[[Event], Awaitable[None] \| None]] | Callbacks to subscribe to the event bus. | None |
Source code in src/aiosalesforce/events/event_bus.py
class EventBus:
    """
    Event bus used to dispatch events to subscribed callbacks.

    Parameters
    ----------
    callbacks : Iterable[Callable[[Event], Awaitable[None] | None]], optional
        Callbacks to subscribe to the event bus.

    """

    _callbacks: set[CallbackType]

    def __init__(self, callbacks: Iterable[CallbackType] | None = None) -> None:
        self._callbacks = set()
        for callback in callbacks or []:
            self.subscribe_callback(callback)

    def subscribe_callback(self, callback: CallbackType) -> None:
        """
        Subscribe a callback to the event bus.

        Parameters
        ----------
        callback : CallbackType
            Function to be called when an event is published.

        """
        self._callbacks.add(callback)

    def unsubscribe_callback(self, callback: CallbackType) -> None:
        """
        Unsubscribe a callback from the event bus.

        Parameters
        ----------
        callback : CallbackType
            Function to be unsubscribed.

        """
        self._callbacks.discard(callback)

    @staticmethod
    async def __dispatch_event_to_callback(
        callback: CallbackType, event: Event
    ) -> None:
        if inspect.iscoroutinefunction(callback):
            await callback(event)
        else:
            await asyncio.to_thread(callback, event)

    async def publish_event(self, event: Event) -> None:
        """Publish an event and dispatch it to all subscribed callbacks."""
        async with asyncio.TaskGroup() as tg:
            for callback in self._callbacks:
                tg.create_task(self.__dispatch_event_to_callback(callback, event))

subscribe_callback(callback)

Subscribe a callback to the event bus.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| callback | CallbackType | Function to be called when an event is published. | required |
Source code in src/aiosalesforce/events/event_bus.py
def subscribe_callback(self, callback: CallbackType) -> None:
    """
    Register a callback with the event bus.

    Parameters
    ----------
    callback : CallbackType
        Callable to invoke whenever an event is published.

    """
    # The registry is a set, so adding an already registered callback
    # leaves it unchanged.
    registry = self._callbacks
    registry.add(callback)

unsubscribe_callback(callback)

Unsubscribe a callback from the event bus.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| callback | CallbackType | Function to be unsubscribed. | required |
Source code in src/aiosalesforce/events/event_bus.py
def unsubscribe_callback(self, callback: CallbackType) -> None:
    """
    Remove a callback from the event bus.

    Parameters
    ----------
    callback : CallbackType
        Callable that should no longer receive published events.

    """
    # ``discard`` (unlike ``remove``) does not raise when the callback
    # is not currently registered.
    registry = self._callbacks
    registry.discard(callback)

publish_event(event) async

Publish an event and dispatch it to all subscribed callbacks.

Source code in src/aiosalesforce/events/event_bus.py
async def publish_event(self, event: Event) -> None:
    """Publish an event and dispatch it to all subscribed callbacks."""
    async with asyncio.TaskGroup() as tg:
        for callback in self._callbacks:
            tg.create_task(self.__dispatch_event_to_callback(callback, event))

Event dataclass

Base class for all events.

Source code in src/aiosalesforce/events/events.py
@dataclass
class Event:
    """
    Base class for all events.

    Declares the single ``type`` field shared by every event; the
    annotation restricts it to the event kinds listed below.
    """

    # Discriminator naming the kind of event. The annotation is not enforced
    # at runtime by ``dataclass``; it documents the allowed values.
    type: Literal[
        "request",
        "response",
        "retry",
        "rest_api_call_consumption",
        "bulk_api_batch_consumption",
    ]

RequestEvent dataclass

Bases: Event

Emitted before a request is sent for the first time.

Source code in src/aiosalesforce/events/events.py
@dataclass
class RequestEvent(Event):
    """
    Emitted before a request is sent for the first time.

    Narrows ``type`` to ``"request"`` and carries the request itself.
    """

    # Narrowed discriminator for this event kind.
    type: Literal["request"]
    # The request associated with this event.
    request: Request

RetryEvent dataclass

Bases: Event, ResponseMixin

Emitted immediately before a request is retried.

Source code in src/aiosalesforce/events/events.py
@dataclass
class RetryEvent(Event, ResponseMixin):
    """
    Emitted immediately before a request is retried.

    Both ``response`` and ``exception`` default to ``None``.
    NOTE(review): presumably whichever of the two triggered the retry is
    populated by the emitter - confirm against the retry logic.
    """

    # Narrowed discriminator for this event kind.
    type: Literal["retry"]
    # Retry attempt counter (numbering base is set by the emitter - TODO confirm).
    attempt: int
    # The request being retried.
    request: Request
    # Response associated with the retry, if any.
    response: Response | None = None
    # Exception associated with the retry, if any.
    exception: Exception | None = None

ResponseEvent dataclass

Bases: Event, ResponseMixin

Emitted after an OK (status code < 300) response is received.

Source code in src/aiosalesforce/events/events.py
@dataclass
class ResponseEvent(Event, ResponseMixin):
    """
    Emitted after an OK (status code < 300) response is received.

    Narrows ``type`` to ``"response"`` and always carries a response.
    """

    # Narrowed discriminator for this event kind.
    type: Literal["response"]
    # The received response; required (no default) for this event.
    response: Response

RestApiCallConsumptionEvent dataclass

Bases: Event, ResponseMixin

Emitted after a REST API call is consumed.

Source code in src/aiosalesforce/events/events.py
@dataclass
class RestApiCallConsumptionEvent(Event, ResponseMixin):
    """
    Emitted after a REST API call is consumed.

    Narrows ``type`` to ``"rest_api_call_consumption"``.
    """

    # Narrowed discriminator for this event kind.
    type: Literal["rest_api_call_consumption"]
    # Response associated with the consumption; required for this event.
    response: Response
    # Presumably the number of REST API calls consumed - confirm with emitter.
    count: int

BulkApiBatchConsumptionEvent dataclass

Bases: Event, ResponseMixin

Emitted after a Bulk API batch is consumed.

Source code in src/aiosalesforce/events/events.py
@dataclass
class BulkApiBatchConsumptionEvent(Event, ResponseMixin):
    """
    Emitted after a Bulk API batch is consumed.

    Narrows ``type`` to ``"bulk_api_batch_consumption"``.
    """

    # Narrowed discriminator for this event kind.
    type: Literal["bulk_api_batch_consumption"]
    # Response associated with the consumption; required for this event.
    response: Response
    # Presumably the number of Bulk API batches consumed - confirm with emitter.
    count: int

ResponseMixin

Mixin class providing properties for events that may have a response.

Source code in src/aiosalesforce/events/events.py
class ResponseMixin:
    """
    Mixin class providing API usage properties for events with a response.

    The values are parsed from the ``Sforce-Limit-Info`` response header,
    whose ``api-usage`` entry has the form ``<used>/<limit>``. The header may
    carry additional ``;``-separated entries (e.g. ``per-app-api-usage=...``),
    which are ignored. Both properties are ``None`` when there is no
    response, the header is absent, or it has no ``api-usage`` entry.
    """

    response: Response | None

    @property
    def consumed(self) -> int | None:
        """Number of API calls consumed in the current 24-hour period."""
        return self.__api_usage[0]

    @property
    def remaining(self) -> int | None:
        """Number of API calls remaining in the current 24-hour period."""
        consumed, limit = self.__api_usage
        if consumed is None or limit is None:
            return None
        # The header reports used/limit, so the remaining allowance has to be
        # derived. Clamp at zero in case usage is reported above the limit.
        return max(limit - consumed, 0)

    @cached_property
    def __api_usage(self) -> tuple[int, int] | tuple[None, None]:
        """
        Parse ``(used, limit)`` from the ``Sforce-Limit-Info`` header.

        Returns ``(None, None)`` when the information is unavailable. The
        result is cached on the instance after the first access.
        """
        if self.response is None:
            return (None, None)
        try:
            header = self.response.headers["Sforce-Limit-Info"]
        except KeyError:
            return (None, None)
        # ``search`` instead of ``fullmatch``: the header may contain further
        # entries after ``api-usage``. The lookbehind prevents matching the
        # tail of ``per-app-api-usage``.
        match_ = re.search(r"(?<![\w-])api-usage=(\d+)/(\d+)", str(header))
        if match_ is None:
            return (None, None)
        used, limit = match_.groups()
        return int(used), int(limit)

consumed: int | None property

Number of API calls consumed in the current 24-hour period.

remaining: int | None property

Number of API calls remaining in the current 24-hour period.