Lei Mao bio photo

Lei Mao

Machine Learning, Artificial Intelligence, Computer Science.

Twitter Facebook LinkedIn GitHub   G. Scholar E-Mail RSS

Introduction

Because breaking tasks into pieces and scheduling to run them asynchronously introduce runtime overheads, if the asynchronous program does not have too much I/O time to be saved from asyncio, the performance of executing those tasks asynchronously will be worse than executing those tasks synchronously. This means asynchronous I/O is the most important part of single-thread asynchronous application.


The question is how does asyncio save I/O time? A quick metaphor will be as follows. I have a candy factory and there are three production lines producing candies one by one. The candies from the production line will drop to the ground at the end of the production line, but dropping the candies to the ground will contaminate the candies. Unfortunately, I only got one worker to collect candies into the box. What’s his best strategy to collect as many candies as possible? Typically, he could have two strategies.

  • Collecting candies at one production line at one time.
  • Placing a box at the end of each production line. Rotating between the production lines from time to time and collecting the candies from the boxes.

Obviously, smart people will choose the second strategy. Basically, this first strategy is the single-thread synchronous application, and the second strategy is the single-thread asynchronous application.


In this blog post, I would like to discuss the asynchronous of IO for Python asynchronous programming using asyncio.

Asynchronous Sleep

A common I/O-bound task includes the sleep function. Using the synchronous and blocking time.sleep, we would have to wait and be blocked for whatever seconds we specified before it is returned.


In asyncio, we also have an asynchronous version of the sleep coroutine function, asyncio.sleep. By scheduling the coroutine to the event loop, during the sleep duration, we are not blocked from doing something else. Here is how asyncio.sleep is implemented.

async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()

Surprisingly, it is very simple. We create a Future and use a callback to set the value for this Future after some sleep duration delay. Once the result of Future got set, the sleep is over. During the the sleep duration delay, the event loop could schedule some other callbacks to do other tasks. It should be noted that such sleep duration is not precise in practice, due to the nature of event loop scheduling. For instance, if somehow the event loop scheduled a callback that takes extremely long to finish during the sleep duration, the actual sleep time would be much longer than what we were expecting. For example,

import asyncio
import time

async def block_func(block_duration: float = 5) -> None:

    # Blocking the event loop for 5 seconds
    time.sleep(block_duration)

async def sleep_func(sleep_duration: float = 1) -> None:

    start_time = time.time()
    await asyncio.sleep(delay=sleep_duration)
    end_time = time.time()
    elapsed_time = end_time - start_time

    print("Expected asynchronous sleep duration: {:.2f}s.".format(sleep_duration))
    print("Actual asynchronous sleep duration: {:.2f}s.".format(elapsed_time))

block_duration = 5
sleep_duration = 1

loop = asyncio.get_event_loop()

print("="*50)
print("Scheduled blocking first...")
print("-"*50)
loop.run_until_complete(
    asyncio.gather(
        block_func(block_duration),
        sleep_func(sleep_duration),
    )
)
print("="*50)
print("Scheduled sleeping first...")
print("-"*50)
loop.run_until_complete(
    asyncio.gather(
        sleep_func(sleep_duration),
        block_func(block_duration),
    )
)
print("="*50)
$ python long_sleep.py 
==================================================
Scheduled blocking first...
--------------------------------------------------
Expected asynchronous sleep duration: 1.00s.
Actual asynchronous sleep duration: 1.00s.
==================================================
Scheduled sleeping first...
--------------------------------------------------
Expected asynchronous sleep duration: 1.00s.
Actual asynchronous sleep duration: 5.00s.
==================================================

In this particular example, if we first schedule asynchronous sleep and then schedule a block execution, the finish of sleep has to wait until the the block execution finishes. This is also because the callback scheduling is sophisticated and not smart. Not sure if it is theoretically possible to have a smart event loop implementation.

Asynchronous Read

Although we have seen how sleep is scheduled asynchronously in the event loop, this does not help us understand how other IO-bounded tasks were scheduled very much. For example, if a server receives messages from multiple clients, how does it schedule the callbacks asynchronously?


This is a simple server that could asynchronously reading messages from and sending messages to multiple clients asynchronously.

import asyncio

async def handle_connection(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    addr = writer.get_extra_info("peername")
    while message := await reader.read(100):
        text = message.decode()
        print(f"Received {text!r} from {addr!r}")
        print(f"Sending {text!r}")
        writer.write(message)
        await writer.drain()
        if text == "quit\n":
            break
    
    print("Closing the connection")
    writer.close()

async def main() -> None:
    server = await asyncio.start_server(handle_connection, "127.0.0.1", 8888)
    addr = server.sockets[0].getsockname() if server.sockets else "unknown"
    print(f"Serving on {addr}")
    async with server:
        await server.serve_forever()

if __name__ == "__main__":
    
    asyncio.run(main())

The documentation of asyncio.start_server says “The client_connected_cb callback is called whenever a new client connection is established. It receives a (reader, writer) pair as two arguments, instances of the StreamReader and StreamWriter classes.” Therefore, the connection to the server is not blocking, allowing multiple clients connecting to the sever. If the connection were synchronous and blocking, only one connection could be established.


The data reading and writing to the streams have to be asynchronous as well. Different clients use different sockets in the same port. Each file from the client sent to the server socket will have an EOF (end-of-file), which is an indication of the end of transfer. If the data reading and writing is synchronous, the network connection between the server and one client is poor, even if the file being transferred is small, it might take a long time to receive the EOF, thus blocking the entire thread.


Using the asynchronous StreamReader.read method, we could switch between the multiple connections to the clients. Let’s check how it is achieved.


When we start an server, a StreamReader will be created and a StreamReaderProtocol will wrap the the StreamReader.

async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.
    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  This
    parameter can either be a plain callback function or a coroutine;
    if it is a coroutine, it will be automatically converted into a
    Task.
    The rest of the arguments are all the usual arguments to
    loop.create_server() except protocol_factory; most common are
    positional host and port, with various optional keyword arguments
    following.  The return value is the same as loop.create_server().
    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).
    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    def factory():
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)
        return protocol

    return await loop.create_server(factory, host, port, **kwds)

The StreamReader.read method is implemented as follows. What it does basically is wait until the some data have been received via self._wait_for_data and the return at most n bytes of the data.

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.
        If n is not provided, or set to -1, read until EOF and return all read
        bytes. If the EOF was received and the internal buffer is empty, return
        an empty bytes object.
        If n is zero, return empty bytes object immediately.
        If n is positive, this function try to read `n` bytes, and may return
        less or equal bytes than requested, but at least one byte. If EOF was
        received before any byte is read, this function returns empty byte
        object.
        Returned value is not limited with limit, configured at stream
        creation.
        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

The StreamReader._wait_for_data coroutine is basically waiting for a Future self._waiter to be set.

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.
        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

So who is going to set the value for self._waiter? It is StreamReader.feed_data which calls StreamReader._wakeup_waiter to set the value for self._waiter.

    def feed_data(self, data):
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True
    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

Before the value of self._waiter is set, the data has been extended to self._buffer so that StreamReader.read could safely read the buffer.


So the final question is who will call StreamReader.feed_data? Remember the StreamReader is wrapped in a StreamReaderProtocol.

class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.
    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        if stream_reader is not None:
            self._stream_reader_wr = weakref.ref(stream_reader)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._stream_writer = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        self._closed = self._loop.create_future()

When StreamReaderProtocol.data_received is called, it actually calls StreamReader.feed_data.

    def data_received(self, data):
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)

But who is calling StreamReaderProtocol.data_received? According to the base class protocols.Protocol for StreamReaderProtocol.

class Protocol(BaseProtocol):
    """Interface for stream protocol.
    The user should implement this interface.  They can inherit from
    this class but don't need to.  The implementations here do
    nothing (they don't raise exceptions).
    When the user wants to requests a transport, they pass a protocol
    factory to a utility function (e.g., EventLoop.create_connection()).
    When the connection is made successfully, connection_made() is
    called with a suitable transport object.  Then data_received()
    will be called 0 or more times with data (bytes) received from the
    transport; finally, connection_lost() will be called exactly once
    with either an exception object or None as an argument.
    State machine of calls:
      start -> CM [-> DR*] [-> ER?] -> CL -> end
    * CM: connection_made()
    * DR: data_received()
    * ER: eof_received()
    * CL: connection_lost()
    """

Essentially the state machine is

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

The data got received for one or more than one times, it could be less, equal, or larger than the number of bytes the user wanted. Once the user want to wait for the data via StreamReader.read, most likely the most recent data collected from StreamReaderProtocol.data_received will be the data returned. The StreamReaderProtocol.data_received is called at certain rate and the buffer got extended accordingly, it is how asynchronous read becomes possible.

Conclusions

The fundamentals of asynchronous IO in asynchronous programming are sometimes trivial if we understand the system we are working on very well. However, it could still be complicated to implement at the low-level.

References