asyncio is a library for writing efficient single-threaded concurrent applications. Ever since I started using it, asyncio has been like a mysterious black box to me, unlike Python threading. Although I could still use asyncio for some simple high-level concurrent applications by taking advantage of open source asyncio libraries such as httpx, I had no idea how those asyncio libraries were implemented from scratch. To understand the asyncio mechanism, it is necessary to look at its low-level implementation details.
The event loop is the core of Python asyncio. A Task is scheduled as a callback and executed by an event loop. In this blog post, I would like to take a superficial look at the low-level implementation of the Python event loop.
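To make this concrete, here is a minimal warm-up sketch (names like `callback` and `results` are my own) showing an event loop executing plain callbacks scheduled with `loop.call_soon`, with no coroutine or Task involved:

```python
import asyncio

# Plain callbacks scheduled directly on the loop; no coroutine or Task.
results = []

def callback(arg):
    results.append(arg)
    if arg == 2:
        loop.stop()  # ends run_forever after the current iteration

loop = asyncio.new_event_loop()
loop.call_soon(callback, 1)
loop.call_soon(callback, 2)
loop.run_forever()  # runs iterations until stop() is called
loop.close()
# results is now [1, 2]
```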
Although asyncio widely uses Task, it is not necessary to use Tasks in order to run work on an event loop. The event loop ultimately runs scheduled callbacks. To see this, let's check the implementation of loop.run_forever from Python 3.8.
```python
def run_forever(self):
    """Run until stop() is called."""
    self._check_closed()
    self._check_running()
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()

    old_agen_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        while True:
            self._run_once()
            if self._stopping:
                break
    finally:
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*old_agen_hooks)
```
Without going into the details, our gut feeling tells us that the key function call in run_forever is self._run_once, which is described as follows.
```python
def _run_once(self):
    """Run one full iteration of the event loop.

    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.
    """
```
This information is somewhat reflected in the loop.run_forever documentation. An event loop must loop and run iteration by iteration; otherwise its name would not be event loop. But what exactly is an iteration of the event loop? Before checking the actual implementation, I imagined that an iteration of the event loop is a fixed, finite time frame in which callbacks could be executed. We could use a while loop in the iteration and determine the end of the iteration by measuring the UNIX time at the end of each while execution. But this raises a problem: if a callback takes a very long time to run in the while loop and keeps blocking the thread, the fixed length of the time frame cannot be guaranteed. It turns out that the design of an actual event loop iteration in Python is somewhat similar but more delicate.
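The naive fixed-time-frame design imagined above can be sketched as follows (this is my own toy code, not anything from CPython), which also exhibits the problem with it:

```python
import time

def naive_iteration(ready, budget=0.01):
    """Toy sketch: run ready callbacks until a wall-clock budget expires.

    A single slow callback still blocks past the deadline, because the
    deadline is only checked between callbacks -- the flaw noted above.
    """
    deadline = time.monotonic() + budget
    executed = 0
    while ready and time.monotonic() < deadline:
        callback = ready.pop(0)
        callback()  # a long-running callback here overshoots the budget
        executed += 1
    return executed
```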
All the callbacks scheduled for the current event loop iteration are placed in self._ready. Looking at the implementation superficially, we have a (heap/priority) queue of scheduled callbacks, some of which might have been delayed or cancelled. Although loop.run_forever runs forever, it does have a timeout for each event loop iteration. The "call later" callbacks that are scheduled to run after the current UNIX time are not ready yet, so they will not be put into self._ready.
```python
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
    self._timer_cancelled_count / sched_count >
        _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
    # Remove delayed calls that were cancelled if their number
    # is too high
    new_scheduled = []
    for handle in self._scheduled:
        if handle._cancelled:
            handle._scheduled = False
        else:
            new_scheduled.append(handle)

    heapq.heapify(new_scheduled)
    self._scheduled = new_scheduled
    self._timer_cancelled_count = 0
else:
    # Remove delayed calls that were cancelled from head of queue.
    while self._scheduled and self._scheduled[0]._cancelled:
        self._timer_cancelled_count -= 1
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False

timeout = None
if self._ready or self._stopping:
    timeout = 0
elif self._scheduled:
    # Compute the desired timeout.
    when = self._scheduled[0]._when
    timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

event_list = self._selector.select(timeout)
self._process_events(event_list)

# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
    handle = self._scheduled[0]
    if handle._when >= end_time:
        break
    handle = heapq.heappop(self._scheduled)
    handle._scheduled = False
    self._ready.append(handle)
```
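This "call later" path can be observed from user code (the names `fired` and `later` below are my own). A callback registered with loop.call_later sits in self._scheduled until its deadline, and the selector timeout is computed so the loop wakes up roughly in time to move it to self._ready:

```python
import asyncio
import time

fired = []

def later():
    # Record how long after scheduling the callback actually ran.
    fired.append(time.monotonic() - start)
    loop.stop()

loop = asyncio.new_event_loop()
start = time.monotonic()
loop.call_later(0.05, later)  # goes into _scheduled, not _ready
loop.run_forever()            # selector blocks ~0.05s, then runs later()
loop.close()
```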
The callbacks in self._ready will then be executed in order.
```python
# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
    handle = self._ready.popleft()
    if handle._cancelled:
        continue
    if self._debug:
        try:
            self._current_handle = handle
            t0 = self.time()
            handle._run()
            dt = self.time() - t0
            if dt >= self.slow_callback_duration:
                logger.warning('Executing %s took %.3f seconds',
                               _format_handle(handle), dt)
        finally:
            self._current_handle = None
    else:
        handle._run()
handle = None  # Needed to break cycles when an exception occurs.
```
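The comment about not running "callbacks scheduled by callbacks run this time around" is easy to verify from user code (the callback names below are my own): a callback scheduled by a running callback only executes in the next iteration, after another I/O poll.

```python
import asyncio

order = []

def first():
    order.append('first')
    loop.call_soon(second)  # lands in _ready for the *next* iteration

def second():
    order.append('second')
    loop.stop()

loop = asyncio.new_event_loop()
loop.call_soon(first)
loop.run_forever()  # iteration 1 runs first(), iteration 2 runs second()
loop.close()
# order is now ['first', 'second']
```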
This means that within an event loop iteration, the number of callbacks executed is determined dynamically. There is neither a fixed time frame nor a fixed number of callbacks to run. Everything is dynamically scheduled and thus very flexible.
Notice that this self._run_once is only called in the loop.run_forever method, and nowhere else. Let's further check the more commonly used method loop.run_until_complete, which is called by asyncio.run under the hood.
```python
def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.
    """
    self._check_closed()
    self._check_running()

    new_task = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except:
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()
```
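From the user's side, this whole machinery is hidden behind a simple call (the coroutine `add` below is my own example): run_until_complete wraps the coroutine in a Task, runs run_forever until the _run_until_complete_cb done-callback stops the loop, and returns the result.

```python
import asyncio

async def add(a, b):
    await asyncio.sleep(0)  # yield to the event loop once
    return a + b

loop = asyncio.new_event_loop()
result = loop.run_until_complete(add(1, 2))  # drives the loop until done
loop.close()
# result is now 3
```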
Surprisingly, the most prominent function call is self.run_forever(). But where is the Future scheduled as callbacks in the event loop? It is tasks.ensure_future, which takes both the future and the loop as inputs, that schedules the callbacks. tasks.ensure_future calls loop.create_task(coro_or_future) to set up the callback schedules in the event loop. Also note that an additional callback, _run_until_complete_cb, is added to the event loop so that self.run_forever() will not actually run forever.
```python
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')
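The two main branches above are easy to exercise directly (variable names below are my own): a Future passes through unchanged, while a coroutine is wrapped in a Task via loop.create_task.

```python
import asyncio

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Branch 1: a Future is returned as-is.
fut = loop.create_future()
same = asyncio.ensure_future(fut)
is_same = same is fut

# Branch 2: a coroutine is wrapped in a Task on the given loop.
async def coro():
    return 42

task = asyncio.ensure_future(coro(), loop=loop)
is_task = isinstance(task, asyncio.Task)
value = loop.run_until_complete(task)
loop.close()
```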
loop.create_task is a public interface, and its documentation can be found on the Python website.
```python
def create_task(self, coro, *, name=None):
    """Schedule a coroutine object.

    Return a task object.
    """
    self._check_closed()
    if self._task_factory is None:
        task = tasks.Task(coro, loop=self, name=name)
        if task._source_traceback:
            del task._source_traceback[-1]
    else:
        task = self._task_factory(self, coro)

    tasks._set_task_name(task, name)

    return task
```
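Putting it all together (the coroutine `greet` below is my own example): create_task constructs a Task, which schedules its first step as a callback on the loop; run_until_complete then drives the loop until the Task finishes.

```python
import asyncio

async def greet(name):
    await asyncio.sleep(0)
    return f'hello, {name}'

loop = asyncio.new_event_loop()
task = loop.create_task(greet('world'))   # first step scheduled as a callback
outcome = loop.run_until_complete(task)   # run iterations until done
loop.close()
# outcome is now 'hello, world'
```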
Although we did not go through all of the event loop code, we have become more knowledgeable about how a Python event loop executes callbacks.