Asyncio

This module contains various useful functions and classes for handling common and awkward tasks in Asyncio.

async aiuti.asyncio.gather_excs(aws: Iterable[Awaitable[Any]], only: Type[E] = BaseException) AsyncGenerator[E, None]

Gather the given awaitables and yield any exceptions they raise.

This is useful when spawning many tasks where you only need to know whether any of them raised an exception.

>>> async def x():
...     await aio.sleep(0.01)
>>> async def e():
...     await aio.sleep(0.01)
...     raise RuntimeError("This failed")
>>> async def show_errors(excs: AYields[BaseException]):
...     async for exc in excs:
...         print(type(exc).__name__ + ':', *exc.args)
>>> run = aio.get_event_loop().run_until_complete
>>> run(show_errors(gather_excs([x(), e()])))
RuntimeError: This failed

A filter can be applied to look for a specific kind of exception. Here, no ValueError was raised, so there is no output:

>>> run(show_errors(gather_excs([x(), e()], only=ValueError)))

Switch it to RuntimeError or a parent class and it will show again:

>>> run(show_errors(gather_excs([x(), e()], only=RuntimeError)))
RuntimeError: This failed
Parameters:
  • aws – Awaitables to gather

  • only – Optional specific type of exceptions to filter on and yield

async aiuti.asyncio.raise_first_exc(aws: Iterable[Awaitable[Any]], only: Type[BaseException] = BaseException) None

Gather the given awaitables using gather_excs() and raise the first exception encountered.

This is useful when you don’t need the results of any tasks but need to know if there was at least one exception.

>>> async def x():
...     await aio.sleep(0.01)
>>> async def e():
...     await aio.sleep(0.01)
...     raise RuntimeError("This failed")
>>> run = aio.get_event_loop().run_until_complete
>>> run(raise_first_exc([x(), e()]))
Traceback (most recent call last):
...
RuntimeError: This failed

A filter can be applied to look for a specific kind of exception. Here, no ValueError was raised, so there is no output:

>>> run(raise_first_exc([x(), e()], only=ValueError))

Switch it to RuntimeError or a parent class and it will raise again:

>>> run(raise_first_exc([x(), e()], only=RuntimeError))
Traceback (most recent call last):
...
RuntimeError: This failed
Parameters:
  • aws – Awaitables to gather

  • only – Optional specific type of exceptions to filter on and raise

async aiuti.asyncio.to_async_iter(iterable: Iterable[T]) AsyncGenerator[T, None]

Convert the given iterable from synchronous to asynchronous by iterating it in a background thread.

This can be useful for interacting with libraries or APIs that do not have an asynchronous version yet.

To demonstrate this, let’s create a function using Requests which makes synchronous calls to the free JSON Placeholder API and then yields the results:

>>> import requests
>>> def user_names() -> Yields[str]:
...     for uid in range(3):
...         url = f'https://jsonplaceholder.typicode.com/users/{uid + 1}'
...         yield requests.get(url).json()['name']

Now, we can convert this to an async iterator which won’t block the event loop:

>>> async def print_user_names():
...     async for name in to_async_iter(user_names()):
...         print(name)
>>> aio.get_event_loop().run_until_complete(print_user_names())
Leanne Graham
Ervin Howell
Clementine Bauch
aiuti.asyncio.to_sync_iter(iterable: AsyncIterable[T], *, loop: Optional[AbstractEventLoop] = None) Generator[T, None, None]

Convert the given iterable from asynchronous to synchronous by using a background thread running a new event loop to iterate it.

This can be useful for providing a synchronous interface to a new async library for backwards compatibility.

To demonstrate this, let’s create a function using HTTPX which makes asynchronous calls to the free JSON Placeholder API and then yields the results:

>>> import httpx
>>> async def user_name(uid: int) -> str:
...     url = f'https://jsonplaceholder.typicode.com/users/{uid + 1}'
...     async with httpx.AsyncClient() as client:
...         resp = await client.get(url)
...         return resp.json()['name']
>>> async def user_names() -> AYields[str]:
...     for name in await aio.gather(*map(user_name, range(3))):
...         yield name

Now, we can convert this to a synchronous iterator and show the results:

>>> for name in to_sync_iter(user_names()):
...     print(name)
Leanne Graham
Ervin Howell
Clementine Bauch
Parameters:
  • iterable – Asynchronous iterable to process

  • loop – Optional specific loop to use to process the iterable, as sketched below
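
For illustration, a minimal sketch of passing an explicit loop; this assumes, per the description above, that the provided loop is driven in the background thread:

>>> my_loop = aio.new_event_loop()
>>> async def agen() -> AYields[int]:
...     yield 1
...     yield 2
>>> list(to_sync_iter(agen(), loop=my_loop))
[1, 2]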

aiuti.asyncio.threadsafe_async_cache(func: None = None, *, cache: Optional[MutableMapping[Tuple[Any, ...], Any]] = None) Callable[[_AsyncFunc], _AsyncFunc]
aiuti.asyncio.threadsafe_async_cache(func: _AsyncFunc, *, cache: Optional[MutableMapping[Tuple[Any, ...], Any]] = None) _AsyncFunc

Simple thread-safe asynchronous cache decorator which ensures that the decorated/wrapped function is only ever called once for a given set of inputs even across different event loops in multiple threads.

To demonstrate this, we’ll define an async function which sleeps to simulate work and then returns the square of the input:

>>> import asyncio as aio
>>> @threadsafe_async_cache
... async def square(x: int):
...     await aio.sleep(0.1)
...     print("Squaring:", x)
...     return x * x

Then we’ll define a function that will use a new event loop to call the function 10 times in parallel:

>>> def call_func(x: int):
...     loop = aio.new_event_loop()
...     aio.set_event_loop(loop)
...     return loop.run_until_complete(aio.gather(*(
...         square(x) for _ in range(10)
...     )))

And finally we’ll call that function 10 times in parallel across 10 threads:

>>> with ThreadPoolExecutor(max_workers=10) as pool:
...     results = list(pool.map(call_func, [2] * 10))
Squaring: 2

As can be seen above, the function was only called once despite being invoked 100 times across 10 threads and 10 event loops. This can be confirmed by counting the total number of results:

>>> len([x for row in results for x in row])
100

Additionally, a custom mutable mapping can be provided for the cache to further customize the implementation. For example, we can use the lru-dict library to limit the size of the cache:

>>> from lru import LRU
>>> cache = LRU(size=3)
>>> @threadsafe_async_cache(cache=cache)
... async def double(x: int) -> int:
...     await aio.sleep(0.1)
...     print("Doubling:", x)
...     return x * 2
>>> run = aio.get_event_loop().run_until_complete
>>> assert run(double(1)) == 2
Doubling: 1
>>> assert run(double(1)) == 2  # Cached
>>> assert run(double(2)) == 4
Doubling: 2
>>> assert run(double(2)) == 4  # Cached
>>> assert run(double(3)) == 6
Doubling: 3
>>> assert run(double(3)) == 6  # Cached
>>> assert run(double(4)) == 8  # Pushes 1 out of the cache
Doubling: 4
>>> assert run(double(4)) == 8  # Cached
>>> assert run(double(1)) == 2  # No longer cached!
Doubling: 1

Warning

The default cache is a simple dictionary and as such has no max size, so it can very easily be the source of memory leaks. I typically use this to wrap object methods on a per-instance basis during initialization so that the cache only lives as long as the object, as sketched below.
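
For illustration, a minimal sketch of that per-instance pattern (the Client class and its _get method are hypothetical, and this assumes the decorator can wrap bound coroutine methods):

>>> class Client:
...     def __init__(self):
...         # Cache lives on this instance and dies with it
...         self.get = threadsafe_async_cache(self._get)
...     async def _get(self, key: str) -> str:
...         await aio.sleep(0.01)  # Simulate I/O
...         return key.upper()
>>> client = Client()
>>> run = aio.get_event_loop().run_until_complete
>>> run(client.get('a'))  # First call does the work
'A'
>>> run(client.get('a'))  # Second call hits this instance's cache
'A'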

aiuti.asyncio.buffer_until_timeout(*, timeout: float = 1) Callable[[_BufferFunc[T]], BufferAsyncCalls[T]]
aiuti.asyncio.buffer_until_timeout(func: Callable[[Set[T]], Awaitable[None]], *, timeout: float = 1) BufferAsyncCalls[T]

Async function decorator/wrapper which buffers the arg passed in each call. After the given timeout has passed since the most recent call, the function is invoked, on the event loop in which it was wrapped, with the set of buffered arguments.

>>> @buffer_until_timeout(timeout=0.1)
... async def buffer(args: Set[int]):
...     print("Buffered:", args)
>>> for i in range(5):
...     buffer(i)
>>> aio.get_event_loop().run_until_complete(aio.sleep(0.5))
Buffered: {0, 1, 2, 3, 4}

Another function can also wait for all elements to be processed before continuing:

>>> async def after_buffered():
...     await buffer.wait()
...     print("All buffered!")
>>> for i in range(5):
...     buffer(i)
>>> aio.get_event_loop().run_until_complete(after_buffered())
Buffered: {0, 1, 2, 3, 4}
All buffered!
Parameters:
  • func – Function to wrap

  • timeout – Number of seconds to wait after the most recent call before executing the function.

Returns:

Non-async function which will buffer the given argument

class aiuti.asyncio.BufferAsyncCalls(func: Callable[[Set[T]], Awaitable[None]], *, timeout: float = 1)

Wrapper around an async callable to buffer inputs before calling it after a given timeout. This class should not be initialized directly; use buffer_until_timeout() instead.

Warning

While this object is thread-safe and arguments can be added from anywhere, the loop this object was created in must be running for it to work.

func

Wrapped function which should take a set and will be called once inputs are buffered and timeout is reached

timeout

Timeout in seconds to wait after the last element from the queue before calling the function

loop

Event loop that will be used to get args from the queue and execute the function

q: aio.Queue[AsyncIterable[T]]

Asyncio queue which new arguments will be placed in. Arguments are passed as async iterables (the most complex way of providing arguments) to ensure all elements are processed on time while avoiding deadlocks.

event

Asyncio event that is set when all current arguments have been processed and cleared whenever a new one is added.

await_(_arg: Awaitable[T]) None

Schedule the given awaitable to be put onto the queue after it has been awaited.

>>> @buffer_until_timeout(timeout=0.1)
... async def buffer(args: Set[int]):
...     print("Buffered:", args)
>>> async def delay(x: T) -> T:
...     await aio.sleep(0)
...     return x
>>> for i in range(5):
...     buffer.await_(delay(i))
>>> aio.get_event_loop().run_until_complete(buffer.wait())
Buffered: {0, 1, 2, 3, 4}
map(_args: Iterable[T]) None

Place an iterable of args onto the queue to be processed.

>>> @buffer_until_timeout(timeout=0.1)
... async def buffer(args: Set[int]):
...     print("Buffered:", args)
>>> buffer.map(range(5))
>>> aio.get_event_loop().run_until_complete(buffer.wait())
Buffered: {0, 1, 2, 3, 4}
amap(_args: AsyncIterable[T]) None

Schedule an async iterable of args to be put onto the queue.

>>> @buffer_until_timeout(timeout=0.1)
... async def buffer(args: Set[int]):
...     print("Buffered:", args)
>>> buffer.amap(to_async_iter(range(5)))
>>> aio.get_event_loop().run_until_complete(buffer.wait())
Buffered: {0, 1, 2, 3, 4}
async wait(*, cancel: bool = True) None

Wait for the event to be set indicating that all arguments currently in the queue have been processed.

Parameters:

cancel – If True (the default) and there is a task currently waiting for a new item from the queue, cancel it so that the queued arguments are processed immediately instead of after the timeout. Setting this to False forces the full timeout to elapse before the function is called, if it hasn’t been already (as sketched below).
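
For example, a small sketch of the cancel=False path, where the full timeout elapses before the buffered arguments are processed:

>>> @buffer_until_timeout(timeout=0.1)
... async def buffer(args: Set[int]):
...     print("Buffered:", args)
>>> buffer.map(range(3))
>>> aio.get_event_loop().run_until_complete(buffer.wait(cancel=False))
Buffered: {0, 1, 2}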

async wait_from_anywhere(*, cancel: bool = True) None

Wrapper around wait() which uses ensure_aw() to handle waiting from a possibly different event loop.
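
A hedged sketch of waiting from a different loop, assuming the buffer binds to the loop it was created in and that loop is kept running in a background thread via loop_in_thread() (documented below):

>>> home = aio.get_event_loop()
>>> @buffer_until_timeout(timeout=0.1)
... async def buffer(args: Set[int]):
...     print("Buffered:", args)
>>> stop = loop_in_thread(home)
>>> buffer.map(range(3))
>>> other = aio.new_event_loop()
>>> other.run_until_complete(buffer.wait_from_anywhere())
Buffered: {0, 1, 2}
>>> stop()  # Clean up the background loop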

aiuti.asyncio.async_background_batcher(func: None = None, *, max_batch_size: int = 256, max_concurrent_batches: int = 5, batch_timeout: float = 0.05) Callable[[Callable[[Iterable[Tuple[str, A_contra]]], AsyncIterable[Tuple[str, Union[R_co, Exception]]]]], _AsyncBackgroundBatcherProto[A_contra, R_co]]
aiuti.asyncio.async_background_batcher(func: Callable[[Iterable[Tuple[str, A_contra]]], AsyncIterable[Tuple[str, Union[R_co, Exception]]]], *, max_batch_size: int = 256, max_concurrent_batches: int = 5, batch_timeout: float = 0.05) _AsyncBackgroundBatcherProto[A_contra, R_co]

Decorator version of AsyncBackgroundBatcher which also handles automatically creating a new batcher for each event loop, allowing declaration outside of a running event loop.

Here is the example from AsyncBackgroundBatcher rewritten using the decorator syntax:

>>> @async_background_batcher(max_batch_size=2)
... async def add_1(
...     batch: Iterable[Tuple[str, int]],
... ) -> AsyncIterable[Tuple[str, int]]:
...     print("Starting batch:", batch)
...     for key, value in batch:
...         await aio.sleep(0.05)
...         yield key, value + 1
...     print("Finished batch:", batch)

Now we can directly call the add_1 function before the event loop has started, which can be more convenient:

>>> loop = aio.new_event_loop()
>>> aio.set_event_loop(loop)
>>> loop.run_until_complete(
...     aio.gather(
...         add_1(1),
...         add_1(2),
...         add_1(3),
...         add_1(4, key='fourth'),
...     )
... )
Starting batch: [('1', 1), ('2', 2)]
Starting batch: [('3', 3), ('fourth', 4)]
Finished batch: [('1', 1), ('2', 2)]
Finished batch: [('3', 3), ('fourth', 4)]
[2, 3, 4, 5]

Note

This decorator has a slight amount of extra overhead due to the per-call lookup of the batcher for the current event loop. Initializing an AsyncBackgroundBatcher directly inside a running event loop can avoid this, but most applications will not notice.

class aiuti.asyncio.AsyncBackgroundBatcher(func: Callable[[Iterable[Tuple[str, A_contra]]], AsyncIterable[Tuple[str, Union[R_co, Exception]]]], *, max_batch_size: int = 256, max_concurrent_batches: int = 5, batch_timeout: float = 0.05)

Allow single async executions of a function to be batched in the background and submitted together, with individual results properly returned to the original callers/waiters.

A good use case for this is single-row database lookups/mutations that can be submitted in bulk to reduce the number of round-trips to the server but are often easier to write as separate, per-row tasks. Using this class for background batching allows the best of both worlds.

Here is a simple example which adds one to the given input.

First, we’ll define the batch function which is asynchronous and takes each batch as an iterable of tuples containing each task key and argument. The batch function should then yield the results as tuples containing the task key and the result. Because the key is returned, results can be yielded in any order. To indicate an error occurred, simply yield an Exception object instead of a normal result.

>>> async def add_1(
...     batch: Iterable[Tuple[str, int]],
... ) -> AsyncIterable[Tuple[str, int]]:
...     print("Starting batch:", batch)
...     for key, value in batch:
...         await aio.sleep(0.05)
...         yield key, value + 1
...     print("Finished batch:", batch)

To utilize the batched execution, first create an instance of AsyncBackgroundBatcher with the batch function and any additional configuration kwargs. Then, the returned object can be called for a single execution. To pass a custom key, utilize the key kwarg:

>>> async def main():
...     batched = AsyncBackgroundBatcher(add_1, max_batch_size=2)
...     return await aio.gather(
...         batched(1),
...         batched(2),
...         batched(3),
...         batched(4, key='fourth'),
...     )
>>> aio.run(main())
Starting batch: [('1', 1), ('2', 2)]
Starting batch: [('3', 3), ('fourth', 4)]
Finished batch: [('1', 1), ('2', 2)]
Finished batch: [('3', 3), ('fourth', 4)]
[2, 3, 4, 5]

As can be seen above, the batches were started in order and then ran concurrently, with the individual results returned to the original callers.
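
To sketch the error path described above, assuming a yielded Exception is re-raised to the awaiting caller as the description implies, the batch function below is illustrative:

>>> async def add_1_strict(
...     batch: Iterable[Tuple[str, int]],
... ) -> AsyncIterable[Tuple[str, Union[int, Exception]]]:
...     for key, value in batch:
...         if value < 0:
...             yield key, ValueError(f"negative input: {value}")
...         else:
...             yield key, value + 1
>>> async def main():
...     batched = AsyncBackgroundBatcher(add_1_strict)
...     return await batched(-1)
>>> aio.run(main())
Traceback (most recent call last):
...
ValueError: negative input: -1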

Warning

Awaits after the final yield may not execute as the processing of each batch is spun off as a background task and will not prevent the event loop from closing.

Parameters:
  • func – Batch execution function

  • max_batch_size – Maximum size for each batch passed to func

  • max_concurrent_batches – Maximum number of concurrent executions of func

  • batch_timeout – Number of seconds to wait for more elements before an incomplete batch is processed

func: Callable[[Iterable[Tuple[str, A_contra]]], AsyncIterable[Tuple[str, Union[R_co, Exception]]]]

Async batch execution function which takes the task argument tuples and yields task result tuples

max_batch_size: int

Maximum number of elements in each batch. Can be safely mutated after initialization.

batch_timeout: float

Number of seconds to wait for more elements before an incomplete batch is processed

async aiuti.asyncio.ensure_aw(aw: Awaitable[T], loop: AbstractEventLoop) T

Return an awaitable for the running loop which will ensure the given awaitable is evaluated in the given loop. This is useful for waiting on events or tasks defined in another thread’s event loop.

Note

The situation in which this function is necessary should be avoided when possible, but that is not always possible.

>>> e = aio.Event()
>>> e.set()
>>> e_loop = aio.get_event_loop()
>>> new_loop = aio.new_event_loop()
>>> e_loop is not new_loop
True
>>> def task(): return e_loop.create_task(e.wait())
>>> run = new_loop.run_until_complete
>>> run(task())  # ValueError: Belongs to a different loop
Traceback (most recent call last):
...
ValueError: ...

If the target loop isn’t running, it will be run directly using another thread:

>>> run(ensure_aw(task(), e_loop))
True

If the target loop is running, it will be used as-is via run_aw_threadsafe() internally:

>>> stop = loop_in_thread(e_loop)
>>> run(ensure_aw(task(), e_loop))
True
>>> stop()  # Cleanup the loop
aiuti.asyncio.loop_in_thread(loop: AbstractEventLoop) Callable[[], None]

Start the given loop in a thread, let it run indefinitely, and return a function which can be called to signal the loop to stop.

>>> from time import sleep
>>> loop = aio.get_event_loop()
>>> stop = loop_in_thread(loop)
>>> loop.is_running()  # Running in background thread
True
>>> loop.call_soon_threadsafe(print, "called")  
<Handle print('called')>
>>> sleep(0.3)  # Give the loop a chance to process
called
>>> stop()  # Signal loop to stop
>>> loop.is_running()  # No longer running
False
async aiuti.asyncio.run_aw_threadsafe(aw: Awaitable[T], loop: AbstractEventLoop) T

Thin wrapper around asyncio.run_coroutine_threadsafe() to handle any awaitable.

Warning

This does not do any extra handling of event loop conflicts. Use ensure_aw() for that.
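
A minimal sketch of direct usage, assuming the target loop is already running in another thread (reusing loop_in_thread() from above):

>>> target = aio.get_event_loop()
>>> stop = loop_in_thread(target)
>>> async def answer() -> int:
...     return 42
>>> other = aio.new_event_loop()
>>> other.run_until_complete(run_aw_threadsafe(answer(), target))
42
>>> stop()  # Clean up the background loop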

class aiuti.asyncio.DaemonTask(coro, *, loop=None, name=None)

Custom asyncio.Task which is meant to run forever and therefore doesn’t warn when it is still pending at loop shutdown.
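
For illustration, a hedged sketch of a forever-running coroutine wrapped in a DaemonTask; a regular asyncio.Task left pending like this would normally log "Task was destroyed but it is pending!" at shutdown:

>>> async def forever():
...     while True:
...         await aio.sleep(60)
>>> loop = aio.new_event_loop()
>>> task = DaemonTask(forever(), loop=loop, name='daemon')
>>> loop.run_until_complete(aio.sleep(0))  # Let the task start
>>> task.done()  # Still pending, but won't warn at shutdown
False
>>> loop.close()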

add_done_callback(fn, /, *, context=None)

Add a callback to be run when the future becomes done.

The callback is called with a single argument - the future object. If the future is already done when this is called, the callback is scheduled with call_soon.

all_tasks()

Return a set of all tasks for an event loop.

By default all tasks for the current event loop are returned.

cancel()

Request that this task cancel itself.

This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop. The coroutine then has a chance to clean up or even deny the request using try/except/finally.

Unlike Future.cancel, this does not guarantee that the task will be cancelled: the exception might be caught and acted upon, delaying cancellation of the task or preventing cancellation completely. The task may also return a value or raise a different exception.

Immediately after this method is called, Task.cancelled() will not return True (unless the task was already cancelled). A task will be marked as cancelled when the wrapped coroutine terminates with a CancelledError exception (even if cancel() was not called).

cancelled()

Return True if the future was cancelled.

current_task()

Return the currently running task in an event loop or None.

By default the current task for the current event loop is returned.

None is returned when called not in the context of a Task.

done()

Return True if the future is done.

Done means either that a result / exception are available, or that the future was cancelled.

exception()

Return the exception that was set on this future.

The exception (or None if no exception was set) is returned only if the future is done. If the future has been cancelled, raises CancelledError. If the future isn’t done yet, raises InvalidStateError.

get_loop()

Return the event loop the Future is bound to.

get_stack(*, limit=None)

Return the list of stack frames for this task’s coroutine.

If the coroutine is not done, this returns the stack where it is suspended. If the coroutine has completed successfully or was cancelled, this returns an empty list. If the coroutine was terminated by an exception, this returns the list of traceback frames.

The frames are always ordered from oldest to newest.

The optional limit gives the maximum number of frames to return; by default all available frames are returned. Its meaning differs depending on whether a stack or a traceback is returned: the newest frames of a stack are returned, but the oldest frames of a traceback are returned. (This matches the behavior of the traceback module.)

For reasons beyond our control, only one stack frame is returned for a suspended coroutine.

print_stack(*, limit=None, file=None)

Print the stack or traceback for this task’s coroutine.

This produces output similar to that of the traceback module, for the frames retrieved by get_stack(). The limit argument is passed to get_stack(). The file argument is an I/O stream to which the output is written; by default output is written to sys.stderr.

remove_done_callback(fn, /)

Remove all instances of a callback from the “call when done” list.

Returns the number of callbacks removed.

result()

Return the result this future represents.

If the future has been cancelled, raises CancelledError. If the future’s result isn’t yet available, raises InvalidStateError. If the future is done and has an exception set, this exception is raised.

set_exception(exception, /)

Mark the future done and set an exception.

If the future is already done when this method is called, raises InvalidStateError.

set_result(result, /)

Mark the future done and set its result.

If the future is already done when this method is called, raises InvalidStateError.