functions - useful functions¶
Various useful functions and decorators.
- async retry(func, args: tuple = None, kws: dict = None, *, exec_timeout: int = None, retries: int = 1, retry_timeout: float = 0.5, multiplier: float = 0.1, max_retry_timeout: float = 10.0, exception_classes: Collection[type[Exception]] = RETRY_EXCEPTION_CLASSES, logger=None)[source]¶
Repeat an asynchronous operation if a specific exception occurs.
- Parameters:
func – async callable
args – function arguments
kws – function keyword arguments
exec_timeout – exec timeout (None for no timeout) for each function call
retries – max number of retries, 0 for infinite retries
retry_timeout – time between consequent tries
max_retry_timeout – max time between consequent tries
multiplier – retry_timeout multiplier for each try, the formula:
exception_classes – exception classes that the retry function should catch and retry
logger – you may pass a logger object to log tries
- Returns:
function result
- Raises:
StopIteration – if max number of retries reached and no exception was stored (rare)
A formula for wait time is sophisticated with increased wait time at each iteration to prevent spamming.
wait_time = min(max_retry_timeout, retry_timeout * (1 + multiplier)**n)
By default, it will catch and retry timeout errors, cancelled asyncio tasks and all errors subclassed from
RetryException.How to use the retry function:
async def call_something_async(a, b, c): ... await retry(call_something_async, (1, 2, 3), retries=10)
- retry_(**retry_params)[source]¶
Wrap a function in a retry function (decorator).
- Parameters:
retry_params – args for
retry()
Usage:
@retry_(retries=1) async def call_something_async(a, b, c): ...
- class RetryException[source]¶
Bases:
ExceptionBase class for retry catchable exception.
You may inherit an exception from this class to tell the retry function that your exception should be catchable.
- __new__(**kwargs)¶
- get_short_uid(n: int = 5) str[source]¶
Get a short uid string.
- Parameters:
n – unicode length
- Returns:
an uid hex string, n x 2 length
- timeout(t: float, /)[source]¶
Run asynchronous tasks with a timeout.
- Parameters:
t – timeout in seconds
It creates an async context block, so async calls inside this block must finish before the specified time.
async with timeout(1000): await do_something_asynchronous()
- not_implemented(message: str = None, /)[source]¶
Decorate a not implemented method or function so it raises NotImplementedError when called.
- Parameters:
message – optional message for NotImplementedError
Usage:
@not_implemented('This method is disabled.') async def call_something(self): ...
- async async_run_in_thread(f, args: tuple = None, kws: dict = None, max_timeout: float = None)[source]¶
Run a synchronous function in a separate thread as an async function.
- Parameters:
f – callable object
args – function arguments
kws – function keyword arguments
max_timeout – max execution time in seconds (None for no limit)
- Returns:
function result
- Raises:
ConcurrentTimeoutError – on execution timeout