Skip to content

Retries

RetryPolicy

Bases: RetryBase

Policy for retrying requests.

Parameters:

Name Type Description Default
response_rules list[ResponseRule]

Rules for retrying requests based on their responses.

None
exception_rules list[ExceptionRule]

Rules for retrying requests after an exception.

None
max_retries int

Maximum total number of retries. By default 3.

3
timeout float

Maximum time to retry. By default 60 seconds. Timeout is best effort and may exceed the specified time by up to backoff_max seconds.

60
backoff_base float

Base time to sleep between retries. By default 0.5 seconds.

0.5
backoff_factor float

Factor to increase sleep time between retries. By default 2.

2
backoff_max float

Maximum time to sleep between retries. By default 10 seconds.

10
backoff_jitter bool

If True, adds jitter to sleep time. By default True.

True
Source code in src/aiosalesforce/retries/policy.py
class RetryPolicy(RetryBase):
    """
    Policy for retrying requests.

    Parameters
    ----------
    response_rules : list[ResponseRule], optional
        Rules deciding whether a request is retried based on its response.
    exception_rules : list[ExceptionRule], optional
        Rules deciding whether a request is retried after an exception.
    max_retries : int, optional
        Upper bound on the total number of retries. By default 3.
    timeout : float, optional
        Maximum time to keep retrying. By default 60 seconds.
        The timeout is best effort and may be exceeded
        by up to `backoff_max` seconds.
    backoff_base : float, optional
        Base sleep time between retries. By default 0.5 seconds.
    backoff_factor : float, optional
        Multiplier applied to the sleep time between retries. By default 2.
    backoff_max : float, optional
        Longest sleep between retries. By default 10 seconds.
    backoff_jitter : bool, optional
        If True, jitter is added to the sleep time. By default True.

    """

    def create_context(self) -> RetryContext:
        """
        Create a new retry context.

        Retry context is used to handle retries for a single request;
        it receives every setting of this policy.

        """
        settings = {
            "response_rules": self.response_rules,
            "exception_rules": self.exception_rules,
            "max_retries": self.max_retries,
            "timeout": self.timeout,
            "backoff_base": self.backoff_base,
            "backoff_factor": self.backoff_factor,
            "backoff_max": self.backoff_max,
            "backoff_jitter": self.backoff_jitter,
        }
        return RetryContext(**settings)

create_context()

Create a new retry context.

Retry context is used to handle retries for a single request.

Source code in src/aiosalesforce/retries/policy.py
def create_context(self) -> RetryContext:
    """
    Create a new retry context.

    Retry context is used to handle retries for a single request;
    it receives every setting of this policy.

    """
    settings = {
        "response_rules": self.response_rules,
        "exception_rules": self.exception_rules,
        "max_retries": self.max_retries,
        "timeout": self.timeout,
        "backoff_base": self.backoff_base,
        "backoff_factor": self.backoff_factor,
        "backoff_max": self.backoff_max,
        "backoff_jitter": self.backoff_jitter,
    }
    return RetryContext(**settings)

ExceptionRule

Bases: Generic[E]

Rule for deciding if a request should be retried after an exception.

Parameters:

Name Type Description Default
exc_type type[Exception]

Type of exception to retry.

required
func Callable[[Exception], Awaitable[bool] | bool] | None

Function or coroutine to determine if the request should be retried. By default the provided exception is always retried.

None
max_retries int

Maximum number of retries. By default 3.

3
Source code in src/aiosalesforce/retries/rules.py
class ExceptionRule(Generic[E]):
    """
    Rule for deciding if a request should be retried after an exception.

    Parameters
    ----------
    exc_type : type[Exception]
        Type of exception to retry.
    func : Callable[[Exception], Awaitable[bool] | bool] | None, optional
        Function or coroutine to determine if the request should be retried.
        By default the provided exception is always retried.
    max_retries : int, optional
        Maximum number of retries. By default 3.

    Raises
    ------
    ValueError
        If `exc_type` is a subclass of `SalesforceError` or is exactly
        the built-in `Exception`.

    """

    exception_type: type[E]
    func: Callable[[E], Awaitable[bool] | bool]
    max_retries: int

    def __init__(
        self,
        exc_type: type[E],
        func: Callable[[E], Awaitable[bool] | bool] | None = None,
        /,
        max_retries: int = 3,
    ) -> None:
        if issubclass(exc_type, SalesforceError):
            raise ValueError(
                "aiosalesforce exceptions cannot be retried by aiosalesforce because "
                "they are raised at the end of the request lifecycle - after all "
                "retries have been exhausted. This could lead to an infinite loop. "
                "If you need to retry aiosalesforce exceptions, consider using "
                "ResponseRule instead."
            )
        if exc_type is Exception:
            raise ValueError("Retrying built-in Exception is not allowed.")
        self.exception_type = exc_type
        # Without a predicate every exception of the configured type is retried.
        self.func = func or (lambda _: True)
        self.max_retries = max_retries

    async def should_retry(self, exc: E) -> bool:
        """
        Determine if the request should be retried.

        Parameters
        ----------
        exc : Exception
            Exception from the request.

        Returns
        -------
        bool
            True if the request should be retried, False otherwise.
            Always False when `exc` is not an instance of the rule's
            exception type.

        """
        if not isinstance(exc, self.exception_type):
            return False
        if inspect.iscoroutinefunction(self.func):
            return await self.func(exc)
        # Synchronous callables are executed via asyncio.to_thread.
        result = await asyncio.to_thread(
            self.func,  # type: ignore
            exc,
        )
        # A synchronous callable may still hand back an awaitable (allowed by
        # the `Awaitable[bool] | bool` annotation, e.g. a lambda wrapping a
        # coroutine function). Resolve it instead of returning the awaitable
        # object itself, which would always be truthy.
        if inspect.isawaitable(result):
            result = await result
        return result

should_retry(exc) async

Determine if the request should be retried.

Parameters:

Name Type Description Default
exc Exception

Exception from the request.

required

Returns:

Type Description
bool

True if the request should be retried, False otherwise.

Source code in src/aiosalesforce/retries/rules.py
async def should_retry(self, exc: E) -> bool:
    """
    Determine if the request should be retried.

    Parameters
    ----------
    exc : Exception
        Exception from the request.

    Returns
    -------
    bool
        True if the request should be retried, False otherwise.
        Always False when `exc` is not an instance of the rule's
        exception type.

    """
    if not isinstance(exc, self.exception_type):
        return False
    if inspect.iscoroutinefunction(self.func):
        return await self.func(exc)
    # Synchronous callables are executed via asyncio.to_thread.
    result = await asyncio.to_thread(
        self.func,  # type: ignore
        exc,
    )
    # A synchronous callable may still hand back an awaitable (allowed by
    # the `Awaitable[bool] | bool` annotation, e.g. a lambda wrapping a
    # coroutine function). Resolve it instead of returning the awaitable
    # object itself, which would always be truthy.
    if inspect.isawaitable(result):
        result = await result
    return result

ResponseRule

Rule for deciding if a request should be retried based on its response.

Parameters:

Name Type Description Default
func Callable[[Response], Awaitable[bool] | bool]

Function or coroutine to determine if the request should be retried.

required
max_retries int

Maximum number of retries. By default 3.

3
Source code in src/aiosalesforce/retries/rules.py
class ResponseRule:
    """
    Rule for deciding if a request should be retried based on its response.

    Parameters
    ----------
    func : Callable[[Response], Awaitable[bool] | bool]
        Function or coroutine to determine if the request should be retried.
    max_retries : int, optional
        Maximum number of retries. By default 3.

    """

    func: Callable[[Response], Awaitable[bool] | bool]
    max_retries: int

    def __init__(
        self,
        func: Callable[[Response], Awaitable[bool] | bool],
        /,
        max_retries: int = 3,
    ) -> None:
        self.func = func
        self.max_retries = max_retries

    async def should_retry(self, response: Response) -> bool:
        """
        Determine if the request should be retried.

        Parameters
        ----------
        response : Response
            Response from the request.

        Returns
        -------
        bool
            True if the request should be retried, False otherwise.

        """
        if inspect.iscoroutinefunction(self.func):
            return await self.func(response)
        # Synchronous callables are executed via asyncio.to_thread.
        result = await asyncio.to_thread(
            self.func,  # type: ignore
            response,
        )
        # A synchronous callable may still hand back an awaitable (allowed by
        # the `Awaitable[bool] | bool` annotation, e.g. a lambda wrapping a
        # coroutine function). Resolve it instead of returning the awaitable
        # object itself, which would always be truthy.
        if inspect.isawaitable(result):
            result = await result
        return result

should_retry(response) async

Determine if the request should be retried.

Parameters:

Name Type Description Default
response Response

Response from the request.

required

Returns:

Type Description
bool

True if the request should be retried, False otherwise.

Source code in src/aiosalesforce/retries/rules.py
async def should_retry(self, response: Response) -> bool:
    """
    Determine if the request should be retried.

    Parameters
    ----------
    response : Response
        Response from the request.

    Returns
    -------
    bool
        True if the request should be retried, False otherwise.

    """
    if inspect.iscoroutinefunction(self.func):
        return await self.func(response)
    # Synchronous callables are executed via asyncio.to_thread.
    result = await asyncio.to_thread(
        self.func,  # type: ignore
        response,
    )
    # A synchronous callable may still hand back an awaitable (allowed by
    # the `Awaitable[bool] | bool` annotation, e.g. a lambda wrapping a
    # coroutine function). Resolve it instead of returning the awaitable
    # object itself, which would always be truthy.
    if inspect.isawaitable(result):
        result = await result
    return result

RetryContext

Bases: RetryBase

Context for handling retries for a single request.

Source code in src/aiosalesforce/retries/policy.py
class RetryContext(RetryBase):
    """Context for handling retries for a single request."""

    __slots__ = ("start", "retry_count")

    start: float
    retry_count: RetryCounts

    def __init__(
        self,
        response_rules: list[ResponseRule],
        exception_rules: list[ExceptionRule],
        max_retries: int,
        timeout: float,
        backoff_base: float,
        backoff_factor: float,
        backoff_max: float,
        backoff_jitter: bool,
    ) -> None:
        super().__init__(
            response_rules=response_rules,
            exception_rules=exception_rules,
            max_retries=max_retries,
            timeout=timeout,
            backoff_base=backoff_base,
            backoff_factor=backoff_factor,
            backoff_max=backoff_max,
            backoff_jitter=backoff_jitter,
        )

        self.start = time.time()
        self.retry_count = {
            "total": 0,
            "response": {rule: 0 for rule in self.response_rules},
            "exception": {rule: 0 for rule in self.exception_rules},
        }

    async def send_request_with_retries(
        self,
        httpx_client: AsyncClient,
        event_bus: EventBus,
        semaphore: asyncio.Semaphore,
        request: Request,
    ) -> Response:
        """
        Send a request and retry it if necessary in accordance with the retry policy.

        Does not guarantee that the returned response is OK (status code < 300),
        only that the request was retried according to the policy.

        Emits the following events:\n
        * RetryEvent if the request is retried
        * RestApiCallConsumptionEvent for each request
            that did not raise an exception

        Parameters
        ----------
        httpx_client : AsyncClient
            HTTP client to send the request.
        event_bus : EventBus
            Event bus to publish events.
        semaphore : asyncio.Semaphore
            Semaphore to limit the number of simultaneous requests.
        request : Request
            Request to send.

        Returns
        -------
        Response
            Response from the request.
            Not guaranteed to be OK (status code < 300).

        """
        while True:
            try:
                async with semaphore:
                    response = await httpx_client.send(request)
            except Exception as exc:
                if await self.should_retry(exc):
                    await asyncio.gather(
                        self.sleep(),
                        event_bus.publish_event(
                            RetryEvent(
                                type="retry",
                                attempt=self.retry_count["total"],
                                request=request,
                                exception=exc,
                            )
                        ),
                    )
                    continue
                raise
            await event_bus.publish_event(
                RestApiCallConsumptionEvent(
                    type="rest_api_call_consumption",
                    response=response,
                    count=1,
                )
            )
            if not response.is_success and await self.should_retry(response):
                await asyncio.gather(
                    self.sleep(),
                    event_bus.publish_event(
                        RetryEvent(
                            type="retry",
                            attempt=self.retry_count["total"],
                            request=request,
                            response=response,
                        )
                    ),
                )
                continue
            return response

    async def should_retry(self, value: Response | Exception) -> bool:
        """
        Determine if the request should be retried.

        If the request should be retried, the total retry count and the retry count
        for the rule responsible for the retry are incremented.

        Parameters
        ----------
        value : Response | Exception
            Response or Exception from the request.

        Returns
        -------
        bool
            True if the request should be retried, False otherwise.

        """
        if (
            self.retry_count["total"] >= self.max_retries
            or time.time() - self.start > self.timeout
        ):
            return False
        condition: bool = False
        match value:
            case Response():
                condition = await self.__evaluate_response_rules(value)
            case Exception():
                condition = await self.__evaluate_exception_rules(value)
            case _:  # pragma: no cover
                raise TypeError("Value must be a Response or an Exception")
        if condition:
            self.retry_count["total"] += 1
        return condition

    async def __evaluate_response_rules(self, response: Response) -> bool:
        for rule in self.response_rules:
            if self.retry_count["response"][
                rule
            ] < rule.max_retries and await rule.should_retry(response):
                self.retry_count["response"][rule] += 1
                return True
        return False

    async def __evaluate_exception_rules(self, exception: Exception) -> bool:
        for rule in self.exception_rules:
            if self.retry_count["exception"][rule] >= rule.max_retries:
                return False
            if await rule.should_retry(exception):
                self.retry_count["exception"][rule] += 1
                return True
        return False

    async def sleep(self) -> None:
        """Sleep between retries based on the backoff policy."""
        # (total - 1) because this is called after incrementing the total count
        sleep_time = min(
            self.backoff_base
            * (self.backoff_factor ** (self.retry_count["total"] - 1)),
            self.backoff_max,
        )
        if self.backoff_jitter:
            sleep_time = random.uniform(0, sleep_time)  # noqa: S311
        await asyncio.sleep(sleep_time)

send_request_with_retries(httpx_client, event_bus, semaphore, request) async

Send a request and retry it if necessary in accordance with the retry policy.

Does not guarantee that the returned response is OK (status code < 300), only that the request was retried according to the policy.

Emits the following events:

  • RetryEvent if the request is retried
  • RestApiCallConsumptionEvent for each request that did not raise an exception

Parameters:

Name Type Description Default
httpx_client AsyncClient

HTTP client to send the request.

required
event_bus EventBus

Event bus to publish events.

required
semaphore Semaphore

Semaphore to limit the number of simultaneous requests.

required
request Request

Request to send.

required

Returns:

Type Description
Response

Response from the request. Not guaranteed to be OK (status code < 300).

Source code in src/aiosalesforce/retries/policy.py
async def send_request_with_retries(
    self,
    httpx_client: AsyncClient,
    event_bus: EventBus,
    semaphore: asyncio.Semaphore,
    request: Request,
) -> Response:
    """
    Send a request, retrying it as dictated by the retry policy.

    The returned response is not guaranteed to be OK (status code < 300);
    the only guarantee is that retries followed the policy.

    Emits the following events:\n
    * RetryEvent if the request is retried
    * RestApiCallConsumptionEvent for each request
        that did not raise an exception

    Parameters
    ----------
    httpx_client : AsyncClient
        HTTP client used to send the request.
    event_bus : EventBus
        Event bus on which events are published.
    semaphore : asyncio.Semaphore
        Semaphore limiting the number of simultaneous requests.
    request : Request
        Request to send.

    Returns
    -------
    Response
        Response from the request.
        Not guaranteed to be OK (status code < 300).

    """
    while True:
        try:
            async with semaphore:
                response = await httpx_client.send(request)
        except Exception as exc:
            # Exceptions not covered by the policy propagate unchanged.
            if not await self.should_retry(exc):
                raise
            failure_event = RetryEvent(
                type="retry",
                attempt=self.retry_count["total"],
                request=request,
                exception=exc,
            )
            await asyncio.gather(
                self.sleep(),
                event_bus.publish_event(failure_event),
            )
            continue

        consumption_event = RestApiCallConsumptionEvent(
            type="rest_api_call_consumption",
            response=response,
            count=1,
        )
        await event_bus.publish_event(consumption_event)

        # Successful responses are never evaluated against the retry rules.
        if response.is_success or not await self.should_retry(response):
            return response

        response_event = RetryEvent(
            type="retry",
            attempt=self.retry_count["total"],
            request=request,
            response=response,
        )
        await asyncio.gather(
            self.sleep(),
            event_bus.publish_event(response_event),
        )

should_retry(value) async

Determine if the request should be retried.

If the request should be retried, the total retry count and the retry count for the rule responsible for the retry are incremented.

Parameters:

Name Type Description Default
value Response | Exception

Response or Exception from the request.

required

Returns:

Type Description
bool

True if the request should be retried, False otherwise.

Source code in src/aiosalesforce/retries/policy.py
async def should_retry(self, value: Response | Exception) -> bool:
    """
    Decide whether the request should be retried.

    When a retry is granted, both the total retry count and the count of the
    rule that granted it are incremented.

    Parameters
    ----------
    value : Response | Exception
        Response or Exception produced by the request.

    Returns
    -------
    bool
        True if the request should be retried, False otherwise.

    """
    # Global limits come first: retry budget, then elapsed time.
    if self.retry_count["total"] >= self.max_retries:
        return False
    if time.time() - self.start > self.timeout:
        return False

    if isinstance(value, Response):
        retry = await self.__evaluate_response_rules(value)
    elif isinstance(value, Exception):
        retry = await self.__evaluate_exception_rules(value)
    else:  # pragma: no cover
        raise TypeError("Value must be a Response or an Exception")

    if retry:
        self.retry_count["total"] += 1
    return retry

sleep() async

Sleep between retries based on the backoff policy.

Source code in src/aiosalesforce/retries/policy.py
async def sleep(self) -> None:
    """
    Sleep between retries based on the backoff policy.

    The delay is ``backoff_base * backoff_factor ** (total - 1)`` capped at
    ``backoff_max``. With jitter enabled the actual delay is drawn
    uniformly between 0 and that value.

    """
    # (total - 1) because this is called after incrementing the total count
    try:
        delay = self.backoff_base * (
            self.backoff_factor ** (self.retry_count["total"] - 1)
        )
    except OverflowError:
        # The exponential term no longer fits in a float; fall back to
        # the cap instead of failing the request.
        delay = self.backoff_max
    sleep_time = min(delay, self.backoff_max)
    if self.backoff_jitter:
        sleep_time = random.uniform(0, sleep_time)  # noqa: S311
    await asyncio.sleep(sleep_time)