Python asyncio is a library for writing efficient single-threaded concurrent applications. In my last blog post “Python AsyncIO Event Loop”, we learned what an event loop is in Python asyncio by reading the Python source code. This turned out to be an effective way to understand how Python asyncio works.
In this blog post, I would like to go one step further and discuss the mechanisms of the three key asyncio awaitables, Coroutine, Future, and Task, by reading the Python source code again.
Coroutine
Starting from Python 3.5, coroutine functions are defined using async def, and Coroutine objects are created by calling coroutine functions. The abstract base class of Coroutine is shown below. Its methods are not overridden anywhere in the Python source, because the concrete coroutine type for functions defined using async def is implemented by the Python interpreter itself. The key method of the Coroutine class is send, which the caller uses to drive the coroutine in a trampoline-like fashion.
    @abstractmethod
    def send(self, value):
        """Send a value into the coroutine.
        Return next yielded value or raise StopIteration.
        """
        raise StopIteration

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        """Raise an exception in the coroutine.
        Return next yielded value or raise StopIteration.
        """
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val
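To make the role of send more concrete, here is a minimal sketch of driving a coroutine by hand, without any event loop. The names Pause, add_two_inputs, and drive are made up for this illustration and do not appear in asyncio; Pause is just a toy awaitable whose __await__ yields a tag to the driver.

```python
class Pause:
    """Hypothetical toy awaitable: yields a tag to whoever drives the coroutine."""

    def __init__(self, tag):
        self.tag = tag

    def __await__(self):
        # The value passed to the next send() becomes the result of `await`.
        received = yield self.tag
        return received


async def add_two_inputs():
    a = await Pause("need a")
    b = await Pause("need b")
    return a + b


def drive(coro, inputs):
    """Drive `coro` with send(); return (yielded values, final result)."""
    yielded = []
    inputs = iter(inputs)
    to_send = None  # the first send() into a fresh coroutine must be None
    while True:
        try:
            yielded.append(coro.send(to_send))
        except StopIteration as stop:
            # The coroutine's return value travels in StopIteration.value.
            return yielded, stop.value
        to_send = next(inputs)


yielded, result = drive(add_two_inputs(), [1, 2])
print(yielded, result)
```

Each send resumes the coroutine until its next yield point, and the final return value arrives as StopIteration.value. This is the same protocol that Task relies on later in this post.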
“Fortunately”, Python asyncio coroutines were once implemented using the @asyncio.coroutine decorator on a Python generator in Python 3.4 (the decorator has been deprecated since Python 3.8). Presumably the logic of a coroutine in Python 3.5+ is similar to that of a coroutine in Python 3.4, in that it yields from its sub-coroutines when it is driven.
A typical coroutine could be implemented using a Python generator as follows.
    loop = asyncio.get_event_loop()
    # Blocking call which returns when the display_date() coroutine is done
    loop.run_until_complete(display_date(loop))
    loop.close()
    def coroutine(func):
        """Decorator to mark coroutines.

        If the coroutine is not yielded from before it is destroyed,
        an error message is logged.
        """
        warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead',
                      DeprecationWarning,
                      stacklevel=2)
        if inspect.iscoroutinefunction(func):
            # In Python 3.5 that's all we need to do for coroutines
            # defined with "async def".
            return func

        if inspect.isgeneratorfunction(func):
            coro = func
        else:
            @functools.wraps(func)
            def coro(*args, **kw):
                res = func(*args, **kw)
                if (base_futures.isfuture(res) or inspect.isgenerator(res) or
                        isinstance(res, CoroWrapper)):
                    res = yield from res
                else:
                    # If 'res' is an awaitable, run it.
                    try:
                        await_meth = res.__await__
                    except AttributeError:
                        pass
                    else:
                        if isinstance(res, collections.abc.Awaitable):
                            res = yield from await_meth()
                return res

        coro = types.coroutine(coro)
        if not _DEBUG:
            wrapper = coro
        else:
            @functools.wraps(func)
            def wrapper(*args, **kwds):
                w = CoroWrapper(coro(*args, **kwds), func=func)
                if w._source_traceback:
                    del w._source_traceback[-1]
                # Python < 3.5 does not implement __qualname__
                # on generator objects, so we set it manually.
                # We use getattr as some callables (such as
                # functools.partial may lack __qualname__).
                w.__name__ = getattr(func, '__name__', None)
                w.__qualname__ = getattr(func, '__qualname__', None)
                return w

        wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
        return wrapper
Without going into the details, this @asyncio.coroutine decorator barely changes the generator at all, since in most cases wrapper $\approx$ coro.
When we run a coroutine with loop.run_until_complete, the comment in its source tells us that if the argument is a coroutine, it is first converted to a Task, so loop.run_until_complete is actually scheduling Tasks. We will look into Task shortly.
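Since the decorator essentially boils down to types.coroutine(coro), a small sketch can show what that call buys us: a plain generator function becomes awaitable from an async def coroutine. The function names old_style and new_style are my own, and I use types.coroutine directly rather than @asyncio.coroutine, which is an assumption made so the sketch does not depend on the deprecated decorator.

```python
import asyncio
import types


@types.coroutine
def old_style(value):
    # Generator-based coroutine: `yield from` delegates to another awaitable,
    # just like coroutines written for Python 3.4 did.
    yield from asyncio.sleep(0)
    return value


async def new_style():
    # A native coroutine can await the generator-based one.
    return await old_style(41) + 1


result = asyncio.run(new_style())
print(result)
```

The generator body is unchanged by the decorator. Only a flag is set so that the interpreter accepts it wherever an awaitable is expected.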
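A quick way to check that a coroutine passed to loop.run_until_complete really ends up wrapped in a Task is to ask, from inside the coroutine, which task is currently running. This is only a sketch: the coroutine name who_runs_me is made up, and I create the loop with asyncio.new_event_loop() rather than get_event_loop().

```python
import asyncio


async def who_runs_me():
    # Returns the Task object that is currently driving this coroutine.
    return asyncio.current_task()


loop = asyncio.new_event_loop()
try:
    task = loop.run_until_complete(who_runs_me())
finally:
    loop.close()

print(type(task).__name__, isinstance(task, asyncio.Future))
```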
Future
Future has a close relationship with Task, so let’s look at Future first.
Each Future is bound to an event loop. By default, it is the loop returned by events.get_event_loop(), which is typically the event loop of the main thread.
    class Future:
        """This class is *almost* compatible with concurrent.futures.Future.

        Differences:

        - This class is not thread-safe.

        - result() and exception() do not take a timeout argument and
          raise an exception when the future isn't done yet.

        - Callbacks registered with add_done_callback() are always called
          via the event loop's call_soon().

        - This class is not compatible with the wait() and as_completed()
          methods in the concurrent.futures package.

        (In Python 3.4 or later we may be able to unify the implementations.)
        """

        # Class variables serving as defaults for instance variables.
        _state = _PENDING
        _result = None
        _exception = None
        _loop = None
        _source_traceback = None

        # This field is used for a dual purpose:
        # - Its presence is a marker to declare that a class implements
        #   the Future protocol (i.e. is intended to be duck-type compatible).
        #   The value must also be not-None, to enable a subclass to declare
        #   that it is not compatible by setting this to None.
        # - It is set by __iter__() below so that Task._step() can tell
        #   the difference between
        #   `await Future()` or `yield from Future()` (correct) vs.
        #   `yield Future()` (incorrect).
        _asyncio_future_blocking = False

        __log_traceback = False

        def __init__(self, *, loop=None):
            """Initialize the future.

            The optional event_loop argument allows explicitly setting the event
            loop object used by the future. If it's not provided, the future uses
            the default event loop.
            """
            if loop is None:
                self._loop = events.get_event_loop()
            else:
                self._loop = loop
            self._callbacks = []
            if self._loop.get_debug():
                self._source_traceback = format_helpers.extract_stack(
                    sys._getframe(1))

        _repr_info = base_futures._future_repr_info
The key method of Future is future.set_result. Let’s check what will happen if we call future.set_result.
    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()
    def __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)
Once future.set_result is called, it triggers self.__schedule_callbacks, which asks the event loop to call all the callbacks registered on the Future as soon as possible. These callbacks are added by future.add_done_callback and removed by future.remove_done_callback. If the Future has no callbacks, nothing more is scheduled in the event loop.
So now we know what happens after the Future gets its result. But what happens when the Future is scheduled in the event loop?
In the last blog post “Python AsyncIO Event Loop”, we saw that loop.run_until_complete passes the Future through ensure_future. “If the argument is a Future, it is returned directly.” So when the Future is scheduled in the event loop, almost no callback is scheduled until future.set_result is called. (I say almost no callback because there is a default callback, _run_until_complete_cb, added as we saw in the last blog post.)
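The fact that callbacks go through call_soon, rather than being called inside set_result, can be observed with a small experiment. This is only a sketch using the public API; the events list and the main coroutine are names I made up for the illustration.

```python
import asyncio

events = []


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fut.add_done_callback(lambda f: events.append(("callback", f.result())))

    fut.set_result("ready")
    # The callback has only been scheduled at this point, not called.
    events.append(("after set_result", fut.done()))

    # Give control back to the event loop once so scheduled callbacks can run.
    await asyncio.sleep(0)
    events.append(("after one loop iteration", None))


asyncio.run(main())
print(events)
```

The "after set_result" entry is recorded before the "callback" entry, which matches the source: set_result only marks the Future as finished and hands the callbacks over to the event loop.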
    def ensure_future(coro_or_future, *, loop=None):
        """Wrap a coroutine or an awaitable in a future.

        If the argument is a Future, it is returned directly.
        """
        if coroutines.iscoroutine(coro_or_future):
            if loop is None:
                loop = events.get_event_loop()
            task = loop.create_task(coro_or_future)
            if task._source_traceback:
                del task._source_traceback[-1]
            return task
        elif futures.isfuture(coro_or_future):
            if loop is not None and loop is not futures._get_loop(coro_or_future):
                raise ValueError('The future belongs to a different loop than '
                                 'the one specified as the loop argument')
            return coro_or_future
        elif inspect.isawaitable(coro_or_future):
            return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
        else:
            raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                            'required')
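The branches of ensure_future can be exercised directly through the public asyncio.ensure_future function. The following is a small sketch; compute, main, and the returned tuple are names chosen only for this example.

```python
import asyncio


async def compute():
    return 7


async def main():
    loop = asyncio.get_running_loop()

    # Branch 1: a Future is returned as is.
    fut = loop.create_future()
    same = asyncio.ensure_future(fut)
    fut.set_result(None)

    # Branch 2: a coroutine is wrapped into a Task.
    task = asyncio.ensure_future(compute())
    value = await task

    # Last branch: anything that is not awaitable is rejected.
    try:
        asyncio.ensure_future(42)
    except TypeError:
        rejected = True
    else:
        rejected = False

    return same is fut, isinstance(task, asyncio.Task), value, rejected


returned_same, wrapped_in_task, value, rejected = asyncio.run(main())
print(returned_same, wrapped_in_task, value, rejected)
```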
Task
Because _PyFuture = Future, Task is just a derived class of Future. The task of a Task is to wrap a coroutine in a Future.
    class Task(futures._PyFuture):  # Inherit Python Task implementation
                                    # from a Python Future implementation.

        """A coroutine wrapped in a Future."""

        # An important invariant maintained while a Task not done:
        #
        # - Either _fut_waiter is None, and _step() is scheduled;
        # - or _fut_waiter is some Future, and _step() is *not* scheduled.
        #
        # The only transition from the latter to the former is through
        # _wakeup().  When _fut_waiter is not None, one of its callbacks
        # must be _wakeup().

        # If False, don't log a message if the task is destroyed whereas its
        # status is still pending
        _log_destroy_pending = True

        def __init__(self, coro, *, loop=None, name=None):
            super().__init__(loop=loop)
            if self._source_traceback:
                del self._source_traceback[-1]
            if not coroutines.iscoroutine(coro):
                # raise after Future.__init__(), attrs are required for __del__
                # prevent logging for pending task in __del__
                self._log_destroy_pending = False
                raise TypeError(f"a coroutine was expected, got {coro!r}")

            if name is None:
                self._name = f'Task-{_task_name_counter()}'
            else:
                self._name = str(name)
At the end of the constructor (beyond the excerpt above), the Task schedules the callback self.__step in the event loop via self._loop.call_soon. The task.__step method is long, but we should just pay attention to the try block and the else block, since these two are the ones most likely to be executed.
        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.
Here we see the coroutine.send method again. Each time we call coroutine.send in the try block, we get a result. In the else block, we then either schedule another self.__step with self._loop.call_soon or, when the result is a Future, register self.__wakeup as its done callback so that the Task is resumed once that Future completes. We do this in a trampoline fashion until the Coroutine raises StopIteration, at which point its return value becomes the result of the Task.
In this sense, the way a Task wraps a Coroutine is similar to a trampoline: every call to coroutine.send returns a value and leads to another callback being scheduled.
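To illustrate the trampoline idea in isolation, here is a toy sketch of a loop and a task. ToyLoop, ToyTask, yield_control, and worker are all hypothetical names of my own, and the sketch handles only the "bare yield" branch of the real __step (no Futures, no cancellation, no exceptions).

```python
from collections import deque
import types


@types.coroutine
def yield_control():
    # Bare yield: hands None to the driver, giving other tasks a turn.
    yield


class ToyLoop:
    """Hypothetical minimal loop: a FIFO queue of callbacks."""

    def __init__(self):
        self.ready = deque()

    def call_soon(self, callback, *args):
        self.ready.append((callback, args))

    def run(self):
        while self.ready:
            callback, args = self.ready.popleft()
            callback(*args)


class ToyTask:
    """Hypothetical minimal task: drives a coroutine one step per callback."""

    def __init__(self, coro, loop):
        self.coro = coro
        self.loop = loop
        self.done = False
        self.result = None
        loop.call_soon(self.step)  # like Task.__init__ scheduling __step

    def step(self):
        try:
            self.coro.send(None)
        except StopIteration as stop:
            # The coroutine finished: store its return value.
            self.done = True
            self.result = stop.value
        else:
            # The coroutine yielded: bounce back through the loop.
            self.loop.call_soon(self.step)


log = []


async def worker(name, rounds):
    for i in range(rounds):
        log.append(f"{name}{i}")
        await yield_control()
    return name * rounds


loop = ToyLoop()
tasks = [ToyTask(worker("a", 2), loop), ToyTask(worker("b", 2), loop)]
loop.run()
print(log, [t.result for t in tasks])
```

Because every step returns to the loop before the next one is taken, the two workers interleave even though neither knows about the other. The call stack also never grows with the number of steps, which is the point of a trampoline.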
Conclusion
The implementation of asyncio is complicated, and I don’t expect to know all the details. But understanding more about the low-level design might be useful for implementing low-level asyncio libraries and for preventing silly mistakes in high-level asyncio applications.
The key to scheduling the key asyncio awaitables, Coroutine, Future, and Task, is that they are all wrapped into a Future in some way under the hood of the asyncio interface.
Python AsyncIO Awaitables: Coroutine, Future, and Task