Lei Mao

Introduction

Python asyncio is a library for writing efficient single-threaded concurrent applications. In my last blog post “Python AsyncIO Event Loop”, we learned what an event loop is in Python asyncio by reading the Python source code. Reading the source turned out to be an effective way to understand how Python asyncio works.


In this blog post, I would like to go one step further and discuss the mechanisms of the three key asyncio awaitables, Coroutine, Future, and Task, again by looking at the Python source code.

Coroutine

Starting from Python 3.5, coroutine functions are defined using async def, and Coroutine objects are created by calling coroutine functions. The abstract base class Coroutine is shown below. It only declares the interface and contains no real implementation, because the concrete coroutine type for functions defined using async def is provided by the Python interpreter. The key method of the Coroutine class is send: whoever drives the coroutine calls it repeatedly, in a way that resembles a trampoline.

class Coroutine(Awaitable):

    __slots__ = ()

    @abstractmethod
    def send(self, value):
        """Send a value into the coroutine.
        Return next yielded value or raise StopIteration.
        """
        raise StopIteration

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        """Raise an exception in the coroutine.
        Return next yielded value or raise StopIteration.
        """
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val

    def close(self):
        """Raise GeneratorExit inside coroutine.
        """
        try:
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass
        else:
            raise RuntimeError("coroutine ignored GeneratorExit")

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Coroutine:
            return _check_methods(C, '__await__', 'send', 'throw', 'close')
        return NotImplemented
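
To make the role of send more concrete, here is a minimal sketch that drives a native coroutine by hand, without any event loop. The names pause, add_two, and drive are hypothetical helpers invented for this illustration; they are not part of asyncio.

```python
import types


@types.coroutine
def pause(label):
    # Generator-based awaitable: hands `label` to whoever drives the
    # coroutine and resumes with the value passed to send().
    received = yield label
    return received


async def add_two():
    a = await pause("first")
    b = await pause("second")
    return a + b


def drive(coro, values):
    """Drive `coro` with send(); return (yielded values, final result)."""
    yielded = []
    try:
        # The first send() must be send(None); it starts the coroutine.
        yielded.append(coro.send(None))
        for value in values:
            yielded.append(coro.send(value))
    except StopIteration as stop:
        # The return value of the coroutine travels in StopIteration.value.
        return yielded, stop.value
    coro.close()
    raise RuntimeError("coroutine did not finish with the given values")


print(drive(add_two(), [1, 2]))
```

Each send call resumes the coroutine until its next yield point, and the final return value arrives wrapped in a StopIteration exception, exactly as the docstring of send describes.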

“Fortunately”, in Python 3.4 an asyncio coroutine was implemented by applying the @asyncio.coroutine decorator to a Python generator. Presumably a coroutine in Python 3.5+ follows logic similar to that of a Python 3.4 coroutine, which yields from its sub-coroutines when it is resumed.


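A typical coroutine could be implemented using a Python generator as follows. Note that the @asyncio.coroutine decorator was deprecated in Python 3.8 and removed in Python 3.11, so this example only runs on older Python versions.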

import asyncio
import datetime

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
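
For comparison, the same logic can be sketched with async def and await, which is the style that runs on current Python versions. The duration and interval parameters and the returned count are additions made for this illustration so that the example finishes quickly; the original example uses 5.0 and 1.0 seconds.

```python
import asyncio
import datetime


async def display_date(duration: float = 5.0, interval: float = 1.0) -> int:
    """Print the current time every `interval` seconds for about `duration` seconds."""
    loop = asyncio.get_running_loop()
    end_time = loop.time() + duration
    printed = 0
    while True:
        print(datetime.datetime.now())
        printed += 1
        if (loop.time() + interval) >= end_time:
            break
        # `await` replaces `yield from` in the generator-based version.
        await asyncio.sleep(interval)
    return printed


# asyncio.run() creates the event loop, runs the coroutine, and closes the loop.
asyncio.run(display_date(duration=0.5, interval=0.1))
```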

The @asyncio.coroutine decorator implementation is as follows.

def coroutine(func):
    """Decorator to mark coroutines.
    If the coroutine is not yielded from before it is destroyed,
    an error message is logged.
    """
    warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead',
                  DeprecationWarning,
                  stacklevel=2)
    if inspect.iscoroutinefunction(func):
        # In Python 3.5 that's all we need to do for coroutines
        # defined with "async def".
        return func

    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            if (base_futures.isfuture(res) or inspect.isgenerator(res) or
                    isinstance(res, CoroWrapper)):
                res = yield from res
            else:
                # If 'res' is an awaitable, run it.
                try:
                    await_meth = res.__await__
                except AttributeError:
                    pass
                else:
                    if isinstance(res, collections.abc.Awaitable):
                        res = yield from await_meth()
            return res

    coro = types.coroutine(coro)
    if not _DEBUG:
        wrapper = coro
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func=func)
            if w._source_traceback:
                del w._source_traceback[-1]
            # Python < 3.5 does not implement __qualname__
            # on generator objects, so we set it manually.
            # We use getattr as some callables (such as
            # functools.partial may lack __qualname__).
            w.__name__ = getattr(func, '__name__', None)
            w.__qualname__ = getattr(func, '__qualname__', None)
            return w

    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper

Without going into the details, the @asyncio.coroutine decorator hardly changes the generator at all: when func is already a generator function and debug mode is off, wrapper is simply types.coroutine(func), so wrapper $\approx$ coro.
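
The part of the decorator that matters most is the call to types.coroutine, which marks a generator function so that its generators can be awaited. The following sketch shows the difference; plain_answer, wrapped_answer, and compare_awaitability are hypothetical names used only for this illustration.

```python
import asyncio
import inspect
import types


def plain_answer():
    # A plain generator function: one bare yield, then a return value.
    yield
    return 42


@types.coroutine
def wrapped_answer():
    # Same body, but marked as a generator-based coroutine.
    yield
    return 42


def compare_awaitability():
    plain = plain_answer()
    wrapped = wrapped_answer()
    result = (inspect.isawaitable(plain), inspect.isawaitable(wrapped))
    plain.close()
    wrapped.close()
    return result


async def main():
    # Awaiting works only for the generator marked by types.coroutine.
    return await wrapped_answer()


print(compare_awaitability())
print(asyncio.run(main()))
```

Both objects are ordinary generators; only the flag set by types.coroutine makes one of them acceptable to await.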


When we run a coroutine with loop.run_until_complete, the comment in its source tells us that a coroutine argument is first converted to a Task, so loop.run_until_complete is actually scheduling Tasks. We will look into Task shortly.

Future

Future has a close relationship with Task, so let’s look at Future first.


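Each Future is bound to an event loop. If no loop is passed to the constructor, the Future uses the one returned by events.get_event_loop(), which is typically the event loop of the main thread.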

class Future:
    """This class is *almost* compatible with concurrent.futures.Future.
    Differences:
    - This class is not thread-safe.
    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.
    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().
    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.
    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None

    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __iter__() below so that Task._step() can tell
    #   the difference between
    #   `await Future()` or`yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False

    __log_traceback = False

    def __init__(self, *, loop=None):
        """Initialize the future.
        The optional event_loop argument allows explicitly setting the event
        loop object used by the future. If it's not provided, the future uses
        the default event loop.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    _repr_info = base_futures._future_repr_info
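
As a small sketch of the behavior described in the class docstring, the snippet below creates a pending Future on an explicit loop and checks how it behaves before any result is set. The function name inspect_pending_future is hypothetical and exists only for this illustration.

```python
import asyncio


def inspect_pending_future():
    loop = asyncio.new_event_loop()
    try:
        # loop.create_future() builds a Future bound to this loop.
        fut = loop.create_future()
        bound_to_loop = fut.get_loop() is loop
        is_pending = not fut.done()
        try:
            # Unlike concurrent.futures.Future, result() never blocks:
            # on a pending Future it raises immediately.
            fut.result()
        except asyncio.InvalidStateError:
            raised = True
        else:
            raised = False
        return bound_to_loop, is_pending, raised
    finally:
        loop.close()


print(inspect_pending_future())
```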

The key method of Future is future.set_result. Let’s check what will happen if we call future.set_result.

    def set_result(self, result):
        """Mark the future done and set its result.
        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

    def __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.
        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)

Once future.set_result is called, it triggers self.__schedule_callbacks, which asks the event loop to call all the callbacks registered on the Future as soon as possible. These callbacks are added and removed with future.add_done_callback and future.remove_done_callback. If no callbacks are registered on the Future, nothing more is scheduled in the event loop.
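
The following sketch illustrates that the callbacks are scheduled with call_soon rather than invoked inside set_result itself. The events list and its labels are made up for this illustration.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    events = []

    fut.add_done_callback(lambda f: events.append(("callback", f.result())))
    fut.set_result(42)
    # The Future is already done, but the callback has only been scheduled.
    events.append(("after set_result", fut.done()))

    # Yield to the event loop once so that it can run the scheduled callback.
    await asyncio.sleep(0)
    return events


print(asyncio.run(main()))
```

The "after set_result" entry is recorded before the "callback" entry, which shows that set_result returns first and the event loop runs the callback on a later iteration.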


So now we know what happens after the Future gets its result. But what happens when the Future is scheduled in the event loop?


From the last blog post “Python AsyncIO Event Loop”, we have seen that loop.run_until_complete passes the Future through ensure_future before running the loop. “If the argument is a Future, it is returned directly.” So when the Future is scheduled in the event loop, almost no callback is scheduled until future.set_result is called. (I say almost because, as we saw in the last blog post, a default callback _run_until_complete_cb is added.)

def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.
    If the argument is a Future, it is returned directly.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')
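
The two main branches of ensure_future can be observed through the public asyncio.ensure_future function. This is only a sketch; the coroutine answer and the returned tuple layout are hypothetical choices for the illustration.

```python
import asyncio


async def answer():
    return 42


async def main():
    loop = asyncio.get_running_loop()

    # Branch 1: a Future is returned unchanged.
    fut = loop.create_future()
    same_object = asyncio.ensure_future(fut) is fut

    # Branch 2: a coroutine is wrapped in a Task via loop.create_task().
    task = asyncio.ensure_future(answer())
    is_task = isinstance(task, asyncio.Task)

    fut.set_result("done")
    task_result = await task
    fut_result = await fut
    return same_object, is_task, task_result, fut_result


print(asyncio.run(main()))
```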

Task

Because _PyFuture = Future, Task is just a subclass of Future. The job of a Task is to wrap a coroutine in a Future.

class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

In the constructor, we see that the Task schedules the callback self.__step in the event loop. task.__step is a long method, but we only need to pay attention to the try block and the else block, since these two are the ones most likely to be executed.
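
Because the constructor only schedules the first step with call_soon, creating a Task does not run any of the coroutine's code right away. A minimal sketch of this, assuming the default (non-eager) task factory, with hypothetical names worker and log:

```python
import asyncio


async def worker(log):
    log.append("worker started")
    return "result"


async def main():
    log = []
    task = asyncio.create_task(worker(log))
    # The first step has been scheduled but has not run yet.
    log.append("task created")

    # Yield to the event loop once so that the scheduled step can run.
    await asyncio.sleep(0)
    finished = task.done()
    return log, finished, await task


print(asyncio.run(main()))
```

The log shows "task created" before "worker started": the coroutine body only begins once control returns to the event loop.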

    def __step(self, exc=None):
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

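Here we see the coroutine.send method again. Each time we call coroutine.send in the try block, we get a result. In the else block, the Task either schedules self.__step again with self._loop.call_soon, or, when the coroutine yielded a pending Future, registers self.__wakeup as a done callback on that Future so that the next step runs once the Future completes. This continues in a trampoline fashion until the coroutine raises StopIteration, at which point the result of the Task is set.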
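
To isolate this stepping logic, here is a heavily simplified sketch of a Task-like class. MiniTask, compute, and run_demo are hypothetical names written for this post; the class ignores cancellation, context variables, and all the error checks of the real Task, so it is an illustration of the idea rather than a replacement.

```python
import asyncio


class MiniTask:
    """Simplified stand-in for asyncio.Task (illustration only)."""

    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self.future = loop.create_future()  # holds the final result
        self.steps = 0
        loop.call_soon(self._step)  # schedule the first step

    def _step(self, exc=None):
        self.steps += 1
        try:
            if exc is None:
                result = self._coro.send(None)
            else:
                result = self._coro.throw(exc)
        except StopIteration as stop:
            # The coroutine returned: publish its return value.
            self.future.set_result(stop.value)
        except Exception as err:
            self.future.set_exception(err)
        else:
            if asyncio.isfuture(result):
                # The coroutine awaits a Future: resume when it is done.
                result._asyncio_future_blocking = False
                result.add_done_callback(self._wakeup)
            else:
                # Bare yield: run the next step on the next loop iteration.
                self._loop.call_soon(self._step)

    def _wakeup(self, fut):
        try:
            fut.result()
        except BaseException as err:
            self._step(err)  # throw the error into the coroutine
        else:
            self._step()


async def compute():
    await asyncio.sleep(0)     # bare yield
    await asyncio.sleep(0.01)  # yields a pending Future
    return 42


def run_demo():
    loop = asyncio.new_event_loop()
    try:
        mini = MiniTask(compute(), loop)
        value = loop.run_until_complete(mini.future)
        return value, mini.steps
    finally:
        loop.close()


print(run_demo())
```

Every pass through _step either reschedules itself or hands the rescheduling over to a Future callback, which is the same bounce-back pattern as in Task.__step.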

Trampoline Function

import asyncio
import time

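def trampoline(loop: asyncio.BaseEventLoop, name: str = "") -> float:
    current_time = time.time()
    print(current_time)
    # Re-schedule this same function, so the event loop keeps bouncing on it.
    loop.call_later(0.5, trampoline, loop, name)
    return current_time

# Create the loop explicitly; get_event_loop() without a running loop
# is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
loop.call_soon(trampoline, loop, "")
# Stop the loop after 5 seconds; otherwise the trampoline would run forever.
loop.call_later(5, loop.stop)

loop.run_forever()
loop.close()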

The way a Task wraps a Coroutine is somewhat similar to a trampoline: every time coroutine.send is called, we get a yielded value back and schedule another callback.

Conclusion

The implementation of asyncio is complicated, and I don’t expect to know all of its details. But understanding more about the low-level design should be useful for implementing low-level asyncio libraries and for avoiding silly mistakes in high-level asyncio applications.


The key to scheduling the three asyncio awaitables, Coroutine, Future, and Task, is that under the hood of the asyncio interface they are all wrapped into a Future in some way.