API documentation

Decorators

function
batch(target_batch_size, max_waiting_time, max_processing_time=10.0, argument_type='list')

@batch is a decorator that accumulates function calls and processes them in batches.

Not thread-safe.

The function to wrap must have arguments of type list or numpy.array which can be aggregated. It must return just a single value of the same type. The type has to be specified with the argument_type parameter of the decorator.

Parameters
  • target_batch_size (int) As soon as the collected function arguments reach target_batch_size, the wrapped function is called and the results are returned. Note that the function may also be called with longer arguments than target_batch_size.
  • max_waiting_time (float) Maximum waiting time in seconds before calling the underlying function although the target_batch_size hasn't been reached.
  • max_processing_time (float, optional) Maximum time in seconds for the processing itself (without waiting) before an asyncio.TimeoutError is raised. Note: It is strongly advised to set a reasonably strict timeout here in order not to create starving tasks which never finish in case something is wrong with the backend call.
  • argument_type (str, optional) The type of function argument used for batching. One of "list" or "numpy". Per default "list" is used, i.e. it is assumed that the input arguments to the wrapped functions are lists which can be concatenated. If set to "numpy" the arguments are assumed to be numpy arrays which can be concatenated by numpy.concatenate() along axis 0.
Raises
  • ValueError If target_batch_size or max_waiting_time is negative, or if argument_type is invalid.
  • ValueError When calling the function with incorrect or inconsistent arguments.
  • asyncio.TimeoutError If calling the wrapped function takes longer than max_processing_time.
Returns (callable(callable(...: ): callable(...: )))

A coroutine function which executes the wrapped function with batches of input arguments.

The wrapped function is called with concatenated arguments of multiple calls.

Notes

  • The decorator is not thread-safe; it supports concurrent access only from async functions. The wrapped function may use multithreading internally, but only one thread at a time may call the decorated function, in order to avoid race conditions.
  • The return value of the wrapped function must be a single iterable.
  • All calls to the underlying function need to have the same number of positional arguments and the same keyword arguments. It also isn't possible to mix the two ways to pass an argument. The same argument always has to be passed either as keyword argument or as positional argument.
Example
>>> import asyncio
>>> import dike
...
...
>>> @dike.batch(target_batch_size=3, max_waiting_time=10)
... async def f(arg1, arg2):
...     print(f"arg1: {arg1}")
...     print(f"arg2: {arg2}")
...     return [10, 11, 12]
...
...
>>> async def main():
...     result = await asyncio.gather(
...         f([0], ["a"]),
...         f([1], ["b"]),
...         f([2], ["c"]),
...     )
...
...     print(f"Result: {result}")
...
...
>>> asyncio.run(main())
arg1: [0, 1, 2]
arg2: ['a', 'b', 'c']
Result: [[10], [11], [12]]
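The concatenate-then-split behaviour shown above can be pictured with a small, synchronous sketch. This is not dike's implementation: run_batched is a hypothetical helper written only for illustration, and it assumes that each call contributes as many result items as its first argument has entries.

```python
from itertools import accumulate


def run_batched(func, calls):
    """Hypothetical helper: merge list arguments of several calls, call func once, split the result.

    `calls` is a list of argument tuples, one tuple per logical call.
    """
    # Number of items contributed by each call (length of its first argument).
    sizes = [len(args[0]) for args in calls]
    # Concatenate argument-wise: all first arguments together, all second ones, ...
    merged = [sum((list(a) for a in column), []) for column in zip(*calls)]
    # One call to the underlying function for the whole batch.
    batch_result = func(*merged)
    # Slice the single result back into one piece per original call.
    ends = list(accumulate(sizes))
    starts = [0] + ends[:-1]
    return [batch_result[s:e] for s, e in zip(starts, ends)]


def f(arg1, arg2):
    return [f"{a}{b}" for a, b in zip(arg1, arg2)]


results = run_batched(f, [([0], ["a"]), ([1, 2], ["b", "c"])])
print(results)
```

The second call passes two items, so it receives two result items back. This is also why the wrapped function may see arguments longer than target_batch_size.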
function
limit_jobs(limit)

Decorator to limit the number of concurrent calls to a coroutine function. Not thread-safe.

Parameters
  • limit (int) The maximum number of ongoing calls allowed at any time
Returns (callable(...: ))

The given coroutine function with added concurrency protection

Raises
  • TooManyCalls The decorated function raises a dike.TooManyCalls exception if it is called while already running limit times concurrently.
  • ValueError If the decorator is applied to something other than an async def function.

Note

The decorator is not thread-safe; it supports concurrent access only from async functions. The wrapped function may use multithreading internally, but only one thread at a time may call the decorated function, in order to avoid race conditions.

Examples
>>> import asyncio
>>> import httpx
>>> import dike
...
...
>>> @dike.limit_jobs(limit=2)
... async def web_request():
...     async with httpx.AsyncClient() as client:
...         response = await client.get("https://httpbin.org/status/200?sleep=100")
...     return response
...
...
>>> async def main():
...     responses = await asyncio.gather(
...         web_request(), web_request(), web_request(), return_exceptions=True
...     )
...     for r in responses:
...         if isinstance(r, dike.TooManyCalls):
...             print("too many calls")
...         else:
...             print(r)
...
...
>>> asyncio.run(main())
<Response [200 OK]>
<Response [200 OK]>
too many calls
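The rejection behaviour in the example can be approximated without a network call. The following is a minimal sketch using only asyncio, not dike's actual code: limit_jobs_sketch and TooManyCallsSketch are hypothetical stand-ins for the real decorator and exception, and the counter-based approach is an assumption about one plausible way to implement such a limit.

```python
import asyncio
import functools


class TooManyCallsSketch(Exception):
    """Hypothetical stand-in for dike.TooManyCalls in this illustration."""


def limit_jobs_sketch(limit):
    """Hypothetical decorator: reject calls beyond `limit` concurrent runs."""

    def decorator(func):
        running = 0  # number of calls currently in progress

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal running
            if running >= limit:
                raise TooManyCallsSketch(f"more than {limit} concurrent calls")
            running += 1
            try:
                return await func(*args, **kwargs)
            finally:
                # Always release the slot, even if the call fails.
                running -= 1

        return wrapper

    return decorator


@limit_jobs_sketch(limit=2)
async def slow_job(n):
    await asyncio.sleep(0.01)
    return n


async def main():
    return await asyncio.gather(
        slow_job(1), slow_job(2), slow_job(3), return_exceptions=True
    )


outcomes = asyncio.run(main())
print(outcomes)
```

Excess calls fail immediately instead of queuing, so the caller decides whether to back off, retry, or report an overload.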
function
retry(attempts=None, exception_types=<class 'Exception'>, delay=None, backoff=1, log_exception_info=True)

Decorator to retry a coroutine function when it raises an exception. Not thread-safe.

Parameters
  • attempts (int, optional) The maximum number of tries before re-raising the last exception. Per default there is no limit.
  • exception_types (BaseException subclass or tuple of BaseException subclasses, optional) The exception types which allow a retry. In case of other exceptions there is no retry.
  • delay (timedelta, optional) Delay between attempts. Per default there is no delay.
  • backoff (int, optional) Multiplier applied to the delay between attempts. Per default this is 1, so that the delay is constant and does not grow. E.g. use 2 to double the delay with each attempt.
  • log_exception_info (bool, optional) Whether to include the exception stacktrace when logging any failed attempts. Default: True
Returns (callable(callable(...: ): callable(...: )))

The given coroutine function with added exception handling and retry logic.

Raises
  • ValueError For invalid configuration arguments.
Examples
>>> import asyncio
>>> import httpx
>>> import datetime
>>> import dike
...
>>> @dike.retry(attempts=2, delay=datetime.timedelta(milliseconds=10))
... async def web_request():
...     async with httpx.AsyncClient() as client:
...         response = await client.get("https://httpbin.org/status/400")
...         if response.status_code != httpx.codes.OK:
...             raise RuntimeError("Request failed!")
...     return response
...
...
>>> async def main():
...     response = await web_request()
...     print(response)
...
...
>>> asyncio.run(main())
... # Log messages from two attempts:
... # WARNING:dike:Caught exception RuntimeError('Request failed!'). Retrying in 0.01s ...
... # Then:
Traceback (most recent call last):
    ...
RuntimeError: Request failed!
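To see how delay and backoff interact, the waiting times can be tabulated by hand. The sketch below is an illustration only: delay_schedule is a hypothetical helper, and it assumes that the first wait equals delay and that each later wait is multiplied by backoff, which is one reading of the parameter descriptions above.

```python
import datetime


def delay_schedule(attempts, delay, backoff=1):
    """Hypothetical helper: waits between successive attempts.

    Assumes `attempts` tries in total, hence `attempts - 1` waits,
    starting at `delay` and multiplied by `backoff` each time.
    """
    return [delay * backoff**i for i in range(attempts - 1)]


schedule = delay_schedule(
    attempts=4, delay=datetime.timedelta(milliseconds=10), backoff=2
)
print([d.total_seconds() for d in schedule])
```

With backoff left at 1, every entry equals delay, which matches the constant-delay default described in the parameter list.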

Exceptions

class
TooManyCalls()
Bases
Exception BaseException

Error raised by @limit_jobs when a call exceeds the preset limit.