
Lei Mao

Machine Learning, Artificial Intelligence, Computer Science.


Introduction

Python asyncio is a library for writing efficient single-threaded concurrent applications. Unlike Python multiprocessing and threading, asyncio has been a mysterious black box to me ever since I started using it. I could use asyncio for some simple high-level concurrent applications by relying on open-source asyncio libraries, such as asyncssh and httpx. However, I had no idea how those libraries were implemented from scratch. To understand the asyncio mechanism, it is necessary to look at its low-level implementation details.


The event loop is the core of Python asyncio. Every coroutine, Future, or Task is ultimately scheduled as a callback and executed by an event loop. In this blog post, I would like to take a brief look at the low-level implementation of the Python event loop.

Event Loop

Although asyncio makes heavy use of coroutines, Futures, and Tasks, they are not required to run work on an event loop. An event loop ultimately just runs scheduled callbacks. To see this, let’s check the implementation of loop.run_forever from Python 3.8.

    def run_forever(self):
        """Run until stop() is called."""
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)
        self._thread_id = threading.get_ident()

        old_agen_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
        try:
            events._set_running_loop(self)
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)
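
We can check that an event loop runs work without any coroutine, Future, or Task. The sketch below uses only the public loop.call_soon, loop.stop, and loop.run_forever methods on a fresh loop from asyncio.new_event_loop(). The helper name run_plain_callbacks is our own, made up for illustration.

```python
import asyncio

def run_plain_callbacks():
    log = []
    loop = asyncio.new_event_loop()
    try:
        # Schedule ordinary functions: no coroutine, Future, or Task involved.
        loop.call_soon(log.append, "first")
        loop.call_soon(log.append, "second")
        # loop.stop is itself just another callback; once it runs,
        # run_forever finishes the current iteration and returns.
        loop.call_soon(loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return log

print(run_plain_callbacks())  # ['first', 'second']
```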

Without going into the details, our gut feeling tells us that the key call in run_forever is self._run_once(). The docstring of self._run_once reads as follows.

    def _run_once(self):
        """Run one full iteration of the event loop.
        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """

This information is partly reflected in the loop.run_forever documentation. An event loop must run iteration by iteration; otherwise it would not be called an event loop. But what exactly is an iteration of the event loop? Before checking the actual implementation, I imagined an event loop iteration as a fixed, finite time frame in which callbacks could be executed. Within the iteration, we could run a for/while loop. The iteration would end when a check of the UNIX time after each pass of that loop showed the time frame had elapsed. But this raises a problem. If a callback takes a very long time to run and keeps blocking the thread, the fixed length of the time frame cannot be guaranteed. It turns out that the design of an actual Python event loop iteration is somewhat similar but more delicate.
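
We can observe the blocking problem on a real loop. The following sketch schedules a timer with loop.call_later and an ordinary callback that blocks the thread with time.sleep. The helper measure_timer_delay and its default durations are illustrative assumptions. The exact delay will vary by machine, but the timer cannot fire until the blocking callback returns.

```python
import asyncio
import time

def measure_timer_delay(block_seconds=0.2, timer_delay=0.05):
    """Return how long after scheduling a call_later timer actually fires
    when an ordinary callback blocks the thread in the meantime."""
    loop = asyncio.new_event_loop()
    fired_at = []

    def on_timer():
        fired_at.append(loop.time())
        loop.stop()

    try:
        start = loop.time()
        # The timer asks to run roughly timer_delay seconds from now ...
        loop.call_later(timer_delay, on_timer)
        # ... but this plain callback blocks the only thread for longer.
        loop.call_soon(time.sleep, block_seconds)
        loop.run_forever()
    finally:
        loop.close()
    return fired_at[0] - start

elapsed = measure_timer_delay()
print(f"timer fired after {elapsed:.3f} s (requested 0.05 s)")
```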


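All the callbacks to be executed in the current event loop iteration are placed in self._ready, a deque. By looking at the implementation superficially, it seems that the delayed callbacks are kept separately in self._scheduled. This is a heap (priority queue) ordered by scheduled time, and some of its entries might have been cancelled. Although loop.run_forever runs forever, each event loop iteration does have a timeout when it polls the selector for I/O events, as follows:

- It is zero if some callbacks are already ready or the loop is stopping.
- Otherwise, it is the time until the earliest scheduled callback is due, capped at MAXIMUM_SELECT_TIMEOUT.
- It is None, which means waiting indefinitely for I/O, if nothing is scheduled at all.

Some “call later” callbacks are scheduled to run after the current loop time. That time comes from loop.time(), which uses a monotonic clock rather than UNIX time. Such callbacks are not ready yet, so they are not moved into self._ready.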

        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        event_list = self._selector.select(timeout)
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)
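
The timer bookkeeping above can be mimicked in a few lines with heapq and collections.deque. This is a simplified, hypothetical model, not CPython’s code:

- Heap entries are plain (when, counter, callback) tuples instead of TimerHandle objects.
- Cancelled handles are ignored.
- The functions schedule, compute_timeout, and move_due_timers are names made up for this sketch.

```python
import heapq
import itertools
from collections import deque

# Hypothetical model, not CPython's TimerHandle/BaseEventLoop.
# Assumed to match CPython's cap on the selector timeout, in seconds.
MAXIMUM_SELECT_TIMEOUT = 24 * 3600

_counter = itertools.count()

def schedule(scheduled, when, callback):
    # The counter breaks ties so timers due at the same time keep FIFO order.
    heapq.heappush(scheduled, (when, next(_counter), callback))

def compute_timeout(ready, scheduled, now, stopping=False):
    # Don't block if work is ready; otherwise wait until the earliest
    # timer is due, or indefinitely (None) if no timer exists.
    if ready or stopping:
        return 0
    if scheduled:
        when = scheduled[0][0]
        return min(max(0, when - now), MAXIMUM_SELECT_TIMEOUT)
    return None

def move_due_timers(ready, scheduled, now, clock_resolution=1e-9):
    # Pop every timer due before now + clock_resolution into the ready deque.
    end_time = now + clock_resolution
    while scheduled and scheduled[0][0] < end_time:
        _, _, callback = heapq.heappop(scheduled)
        ready.append(callback)

ready, scheduled = deque(), []
schedule(scheduled, 5.0, "late")
schedule(scheduled, 1.0, "early")
print(compute_timeout(ready, scheduled, now=0.0))  # 1.0
move_due_timers(ready, scheduled, now=2.0)
print(list(ready), len(scheduled))                 # ['early'] 1
```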

Only the callbacks in self._ready are executed, in first-in, first-out order.

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

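This means that the number of callbacks executed in an event loop iteration is determined dynamically. It equals the number of callbacks in self._ready (ntodo) at the moment the loop starts running them. Callbacks scheduled by these callbacks are deferred to the next iteration, after another I/O poll. An iteration has neither a fixed time frame nor a fixed number of callbacks to run. Everything is scheduled dynamically and is therefore very flexible.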
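
We can observe that callbacks scheduled during an iteration wait for the next one by using loop.stop(). It only sets a flag, which run_forever checks after the current iteration. In this sketch, the function name demo_iteration_boundary is our own. Callback A schedules C and then calls loop.stop(). B was already in the batch, so it still runs, while C is held back until run_forever is called again.

```python
import asyncio

def demo_iteration_boundary():
    loop = asyncio.new_event_loop()
    order = []

    def a():
        order.append("A")
        # Queued while the current batch is running: deferred to the next iteration.
        loop.call_soon(order.append, "C")
        # stop() only sets a flag; run_forever checks it after this iteration.
        loop.stop()

    try:
        loop.call_soon(a)
        loop.call_soon(order.append, "B")  # already part of this iteration's batch
        loop.run_forever()
        after_first_run = list(order)      # C has not run yet
        loop.call_soon(loop.stop)
        loop.run_forever()                 # C runs now, then the loop stops again
    finally:
        loop.close()
    return after_first_run, order

print(demo_iteration_boundary())  # (['A', 'B'], ['A', 'B', 'C'])
```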


Notice that self._run_once is called only by the loop.run_forever method and by no other method. Let’s further check the more commonly used method loop.run_until_complete, which asyncio.run calls under the hood.

    def run_until_complete(self, future):
        """Run until the Future is done.
        If the argument is a coroutine, it is wrapped in a Task.
        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.
        Return the Future's result, or raise its exception.
        """
        self._check_closed()
        self._check_running()

        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()
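
Stripped of its error handling, run_until_complete comes down to three steps:

- Wrap the coroutine.
- Attach a done callback that stops the loop.
- Call run_forever.

The sketch below uses only public APIs: asyncio.ensure_future, Future.add_done_callback, loop.stop, and loop.run_forever. The name run_until_complete_sketch is hypothetical. It is not a drop-in replacement for the real method, which also performs the closed/running checks and handles exceptions.

```python
import asyncio

def run_until_complete_sketch(loop, coro):
    """Hypothetical, stripped-down version of loop.run_until_complete."""
    # Wrap the coroutine in a Task; creating the Task schedules its first step.
    future = asyncio.ensure_future(coro, loop=loop)

    def stop_loop(fut):
        # Plays the role of _run_until_complete_cb: stop once the future is done.
        loop.stop()

    future.add_done_callback(stop_loop)
    try:
        loop.run_forever()
    finally:
        future.remove_done_callback(stop_loop)
    if not future.done():
        raise RuntimeError("Event loop stopped before Future completed.")
    return future.result()

async def add(x, y):
    await asyncio.sleep(0.01)
    return x + y

loop = asyncio.new_event_loop()
try:
    print(run_until_complete_sketch(loop, add(1, 2)))  # 3
finally:
    loop.close()
```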

Surprisingly, the most prominent call is self.run_forever(). But where is the Future scheduled as a callback in the event loop? The scheduling happens in tasks.ensure_future, which takes the Future (or coroutine) and the loop as inputs. If the argument is a coroutine, tasks.ensure_future calls loop.create_task(coro_or_future) to wrap it in a Task. Creating the Task schedules its first step as a callback on the event loop. Also note that an additional callback, _run_until_complete_cb, is added as a done callback to the future so that self.run_forever() does not actually run forever. Once the future is done, this callback stops the loop.

def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.
    If the argument is a Future, it is returned directly.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')
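
The three branches of ensure_future can be checked directly:

- A Future comes back unchanged.
- A coroutine comes back wrapped in a Task.
- A non-awaitable object raises TypeError.

In this sketch, classify_ensure_future and answer are illustrative names. We pass loop explicitly so the example does not depend on a current event loop.

```python
import asyncio

async def answer():
    return 42

def classify_ensure_future():
    loop = asyncio.new_event_loop()
    try:
        fut = loop.create_future()
        # Branch 1: a Future is returned as-is.
        same = asyncio.ensure_future(fut, loop=loop) is fut
        # Branch 2: a coroutine is wrapped in a new Task via loop.create_task.
        task = asyncio.ensure_future(answer(), loop=loop)
        is_task = isinstance(task, asyncio.Task)
        # Branch 3: anything that is not awaitable is rejected.
        try:
            asyncio.ensure_future(123, loop=loop)
            rejected = False
        except TypeError:
            rejected = True
        fut.set_result(None)
        result = loop.run_until_complete(task)
    finally:
        loop.close()
    return same, is_task, result, rejected

print(classify_ensure_future())  # (True, True, 42, True)
```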

loop.create_task is a public interface, and its documentation can be found on the Python website.

    def create_task(self, coro, *, name=None):
        """Schedule a coroutine object.
        Return a task object.
        """
        self._check_closed()
        if self._task_factory is None:
            task = tasks.Task(coro, loop=self, name=name)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
            tasks._set_task_name(task, name)

        return task
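
Because create_task delegates to self._task_factory when one is set, we can observe every Task a loop creates by installing a custom factory with loop.set_task_factory. The counting factory below is a hypothetical illustration. It accepts **kwargs because, to my understanding, newer Python versions may pass extra keyword arguments (such as context) to the factory, and it forwards them to asyncio.Task. Running main() through run_until_complete should create four Tasks: one for main() itself, via ensure_future, and one per child.

```python
import asyncio

def make_counting_factory(created):
    """Hypothetical task factory that records every Task a loop creates."""
    def factory(loop, coro, **kwargs):
        # Assumption: forward any extra keyword arguments newer Pythons pass.
        task = asyncio.Task(coro, loop=loop, **kwargs)
        created.append(task)
        return task
    return factory

async def child(i):
    await asyncio.sleep(0)
    return i * i

async def main():
    loop = asyncio.get_running_loop()
    children = [loop.create_task(child(i)) for i in range(3)]
    # gather() receives Tasks, so it does not create any new ones.
    return await asyncio.gather(*children)

created = []
loop = asyncio.new_event_loop()
loop.set_task_factory(make_counting_factory(created))
try:
    print(loop.run_until_complete(main()))  # [0, 1, 4]
finally:
    loop.close()
print(len(created))  # 4: main() itself plus the three children
```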

Conclusions

Although we did not go through all of the event loop code, we now have a better understanding of how a Python event loop executes callbacks.