"""Commonly used functions."""
import asyncio
import ctypes
import os
from asyncio import sleep
from binascii import b2a_hex
from collections.abc import Collection, Coroutine
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as ConcurrentTimeoutError
from functools import partial, wraps
from secrets import randbits
from typing import TypedDict
from uuid import UUID
# Names exported by a star-import of this module.
__all__ = [
    "retry",
    "retry_",
    "RETRY_EXCEPTION_CLASSES",
    "terminate_thread",
    "async_run_in_thread",
    "async_",
    "RetryParams",
    "secure_uuid",
    "not_implemented",
    "timeout",
    "get_short_uid",
    "RetryException",
    "suppress_exception",
]
class RetryException(Exception):
    """Marker base class for errors the retry function is meant to catch.

    Derive your own exception from this class to signal to the retry function
    that the error should be treated as catchable.
    """
#: default catchable exception classes for :py:func:`~kaiju_tools.functions.retry`
RETRY_EXCEPTION_CLASSES = frozenset(
    (
        ConnectionError,
        TimeoutError,
        ConcurrentTimeoutError,
        asyncio.TimeoutError,
        asyncio.CancelledError,
        RetryException,
    )
)

#: default catchable exception classes for :py:func:`~kaiju_tools.functions.suppress_exception`
#: (the same classes as above, as a tuple so it can be handed to ``isinstance``)
SUPRESS_EXCEPTION_CLASSES = tuple(RETRY_EXCEPTION_CLASSES)
def get_short_uid(n: int = 5) -> str:
    """Generate a short random identifier.

    :param n: number of random bytes to read from ``os.urandom``
    :returns: the bytes rendered as a hex string, i.e. ``2 * n`` characters long
    """
    random_bytes = os.urandom(n)
    hex_encoded = b2a_hex(random_bytes)
    return hex_encoded.decode()
class RetryParams(TypedDict, total=False):
    """Parameters for the retry function.

    All keys are optional (``total=False``). The key names match the keyword-only
    arguments of :py:func:`~kaiju_tools.functions.retry` (``logger`` is not part of this dict).
    """

    exec_timeout: int  # timeout for each single call
    retries: int  # number of tries
    retry_timeout: float  # initial wait between tries
    multiplier: float  # growth factor applied to the wait after each try
    max_retry_timeout: float  # upper bound for the wait between tries
    # NOTE(review): ``retry`` itself annotates this as ``Collection[type[Exception]]`` and
    # matches on exception classes; string entries look unsupported there — confirm intent.
    exception_classes: Collection[str | Exception]
async def suppress_exception(
    coro: Coroutine,
    *,
    exception_classes: tuple[type[Exception], ...] = SUPRESS_EXCEPTION_CLASSES,
    exec_timeout: int | None = None,
    default=None,
    logger=None,
):
    """Await a coroutine, suppressing (and logging) the listed exceptions.

    :param coro: coroutine object to await
    :param exception_classes: exception classes that the function will suppress;
        any other exception is re-raised to the caller
    :param exec_timeout: optional execution timeout (a falsy value means no timeout)
    :param default: value to return when an exception has been suppressed
    :param logger: logger instance, otherwise ``print`` is used
    :return: coroutine result, or ``default`` if a listed exception was suppressed

    Only ``Exception`` subclasses are ever intercepted, so errors deriving directly from
    ``BaseException`` always propagate.

    How to use it:

    .. code-block:: python

        async def get_from_cache(key) -> Any:
            ...

        cached = await suppress_exception(get_from_cache('123'))  # will return value or None

    """
    try:
        if exec_timeout:
            async with timeout(exec_timeout):
                return await coro
        return await coro
    except Exception as exc:
        # tuple() lets callers pass any collection (set, frozenset, list) of classes
        if not isinstance(exc, tuple(exception_classes)):
            # not ours to hide: an unexpected error must stay visible to the caller
            raise
        if logger:
            logger.error(str(exc), exc_info=exc)
        else:
            print(exc)
        return default
async def retry(
    func,
    args: tuple = None,
    kws: dict = None,
    *,
    exec_timeout: int = None,
    retries: int = 1,
    retry_timeout: float = 0.5,
    multiplier: float = 0.1,
    max_retry_timeout: float = 10.0,
    exception_classes: Collection[type[Exception]] = RETRY_EXCEPTION_CLASSES,
    logger=None,
):
    """Repeat an asynchronous operation if a specific exception occurs.

    :param func: async callable
    :param args: function arguments
    :param kws: function keyword arguments
    :param exec_timeout: exec timeout (None for no timeout) for each function call
    :param retries: max number of tries (the function is called at most this many times),
        0 or a negative value for unlimited tries
    :param retry_timeout: initial wait time between consecutive tries
    :param max_retry_timeout: max wait time between consecutive tries
    :param multiplier: growth factor for the wait time, see the formula below
    :param exception_classes: exception classes that the retry function should catch and retry;
        instances of their subclasses are retried as well
    :param logger: you may pass a logger object to log tries
    :returns: function result
    :raises Exception: the last caught exception once all tries are used up; any exception
        that is not an instance of `exception_classes` is re-raised immediately

    The wait time grows at each iteration to prevent spamming:

    .. code-block:: python

        wait_time = min(max_retry_timeout, retry_timeout * (1 + multiplier)**n)

    Only ``Exception`` subclasses can be intercepted here; an error deriving directly from
    ``BaseException`` always propagates, even if it is listed in `exception_classes`.

    How to use the retry function:

    .. code-block:: python

        async def call_something_async(a, b, c):
            ...

        await retry(call_something_async, (1, 2, 3), retries=10)

    """
    if args is None:
        args = ()
    if kws is None:
        kws = {}
    # isinstance() requires a tuple of classes; entries that are not classes can never
    # match an exception, so they are ignored instead of breaking the check
    catchable = tuple(cls for cls in exception_classes if isinstance(cls, type))
    modifier = 1.0 + multiplier
    tries_left = retries if retries > 0 else float("inf")
    while True:
        try:
            if exec_timeout:
                async with timeout(exec_timeout):
                    return await func(*args, **kws)
            return await func(*args, **kws)
        except Exception as err:
            # isinstance (not an exact class match) so subclasses such as
            # ConnectionResetError or a custom RetryException child are retried too
            if not isinstance(err, catchable):
                raise
            tries_left -= 1
            if tries_left <= 0:
                # out of tries: re-raise right away, there is nothing left to wait for
                raise
            if logger:
                logger.info("Retrying: %s", err)
        await sleep(retry_timeout)
        retry_timeout = min(max_retry_timeout, retry_timeout * modifier)
def retry_(**retry_params):
    """Wrap a function in a retry function (decorator).

    :param retry_params: keyword args for :py:func:`~kaiju_tools.functions.retry`
    :returns: a decorator; the decorated function is a coroutine function that keeps
        the name and docstring of the original one

    Usage:

    .. code-block:: python

        @retry_(retries=1)
        async def call_something_async(a, b, c):
            ...

    """

    def wrapper(func):
        # wraps() keeps __name__/__doc__ of the original for logs, docs and introspection
        @wraps(func)
        async def retry_func(*args, **kws):
            return await retry(func, args=args, kws=kws, **retry_params)

        return retry_func

    return wrapper
async def async_run_in_thread(f, args: tuple = None, kws: dict = None, max_timeout: float = None):
    """Run a synchronous function in a separate thread as an async function.

    :param f: callable object
    :param args: function arguments
    :param kws: function keyword arguments
    :param max_timeout: max execution time in seconds (None for no limit)
    :return: function result
    :raises TimeoutError: on execution timeout
    """
    loop = asyncio.get_running_loop()
    if args is None:
        args = ()
    if kws is None:
        kws = {}
    call = partial(f, *args, **kws)
    # NOTE: leaving the ``with`` block joins the worker thread, so a worker that
    # cannot be interrupted keeps this coroutine from returning.
    with ThreadPoolExecutor(max_workers=1) as tp:
        future = loop.run_in_executor(tp, call)
        try:
            if max_timeout:
                async with timeout(max_timeout):
                    return await future
            return await future
        except (TimeoutError, ConcurrentTimeoutError, asyncio.TimeoutError):
            # ``timeout`` raises the builtin TimeoutError, which is a different class from
            # the concurrent.futures / asyncio ones before Python 3.11 — catch all of them
            tp.shutdown(wait=False)
            for t in list(tp._threads):  # noqa # pylint: disable=all
                terminate_thread(t)  # noqa: reasonable
            raise
def async_(__f):
    """Wrap a synchronous function so it runs in a thread when awaited (decorator).

    :param __f: synchronous callable
    :returns: a coroutine function with the same call signature; awaiting it runs `__f`
        through :py:func:`~kaiju_tools.functions.async_run_in_thread` and returns its result
    """

    @wraps(__f)
    async def _wrapper(*args, **kws):
        # the call arguments must travel as the `args` / `kws` parameters; unpacking them
        # here would spill them into `args`, `kws` and `max_timeout` of the runner itself
        return await async_run_in_thread(__f, args=args, kws=kws)

    return _wrapper
def terminate_thread(__thread):
    """Terminate a python thread from another thread.

    Asks the interpreter to raise ``SystemExit`` asynchronously inside the target thread.
    Does nothing if the thread is not alive.

    Found it on stack overflow as an only real way to stop a stuck python thread.
    https://code.activestate.com/recipes/496960-thread2-killable-threads/

    Use with caution.

    :param __thread: ``threading.Thread`` instance to terminate
    :raises ValueError: if the interpreter does not know the thread id
    :raises SystemError: if more than one thread state was affected (the request is reverted)
    """
    if not __thread.is_alive():
        return
    # the C API takes the thread id as ``unsigned long``; wrapping it explicitly keeps
    # ctypes from converting the Python int to a C ``int``
    thread_id = ctypes.c_ulong(__thread.ident)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit))
    if res == 0:
        raise ValueError("Nonexistent thread id.")
    if res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed.")
def secure_uuid() -> UUID:
    """Build a UUID out of 128 random bits taken from the ``secrets`` module."""
    random_value = randbits(128)
    return UUID(int=random_value)
def not_implemented(message: str = None, /):
    """Replace the decorated function or method with one that raises `NotImplementedError`.

    :param message: optional text for the `NotImplementedError`; a generic
        "Not implemented." is used when it is omitted or empty

    Usage:

    .. code-block:: python

        @not_implemented('This method is disabled.')
        async def call_something(self):
            ...

    """
    error_text = message or "Not implemented."

    def decorator(_func):
        # the decorated callable is discarded; any call ends in the error below
        def _wrap(*_args, **_kwargs):
            raise NotImplementedError(error_text)

        return _wrap

    return decorator
class _Timeout:
    """Async context manager that cancels the current task when a deadline passes.

    A timer is scheduled on entering the block. If it fires, the task running the block is
    cancelled and the resulting ``CancelledError`` is turned into ``TimeoutError`` on exit.
    A cancellation that did not come from the timer is left untouched.
    """

    __slots__ = ("_timeout", "_loop", "_task", "_handler", "_expired")

    def __init__(self, _timeout: float, loop=None):
        self._timeout = max(0.0, _timeout)  # negative values behave like "expire immediately"
        self._loop = loop
        self._task = None
        self._handler = None
        self._expired = False

    async def __aenter__(self):
        loop = self._loop if self._loop is not None else asyncio.get_running_loop()
        self._task = asyncio.current_task()
        self._expired = False
        self._handler = loop.call_at(loop.time() + self._timeout, self._on_timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # always disarm the timer, otherwise a stale timer could cancel the task
        # long after the block has been left
        if self._handler is not None:
            self._handler.cancel()
            self._handler = None
        task, self._task = self._task, None
        if not self._expired:
            # the timer did not fire: whatever happened (including an external
            # cancellation) is not a timeout and must propagate unchanged
            return None
        # Task.uncancel() exists since Python 3.11; it withdraws our own cancel request
        uncancel = getattr(task, "uncancel", None)
        if uncancel is not None:
            uncancel()
        if exc_type is not None and issubclass(exc_type, asyncio.CancelledError):
            raise TimeoutError from exc_val
        return None

    def _on_timeout(self):
        # remember that the cancellation below is ours, so __aexit__ can tell it apart
        self._expired = True
        self._cancel_task(self._task)

    @staticmethod
    def _cancel_task(task: asyncio.Task):
        task.cancel()


def timeout(t: float, /):
    """Run asynchronous tasks with a timeout.

    :param t: timeout in seconds
    :returns: an async context manager
    :raises TimeoutError: (on leaving the block) if the block did not finish in time

    It creates an async context block, so async calls inside this block must finish before the specified time.

    .. code-block:: python

        async with timeout(1000):
            await do_something_asynchronous()

    """
    return _Timeout(t)