Breaking tasks into pieces and scheduling them to run asynchronously introduces runtime overhead. If a program does not have enough I/O time for asyncio to save, executing its tasks asynchronously will perform worse than executing them synchronously. This makes asynchronous I/O the most important part of a single-threaded asynchronous application.
The question is: how does asyncio save I/O time? A quick metaphor goes as follows. I own a candy factory with three production lines, each producing candies one by one. At the end of each line the candies drop to the ground, and a candy that touches the ground is contaminated. Unfortunately, I have only one worker to collect candies into boxes. What is his best strategy to collect as many candies as possible? Typically, he has two options.
Collecting candies at one production line at one time.
Placing a box at the end of each production line. Rotating between the production lines from time to time and collecting the candies from the boxes.
Obviously, smart people will choose the second strategy. The first strategy corresponds to a single-threaded synchronous application, and the second to a single-threaded asynchronous application.
In this blog post, I would like to discuss the asynchrony of I/O in Python asynchronous programming using asyncio.
Asynchronous Sleep
A common I/O-bound task is sleeping. With the synchronous and blocking time.sleep, we have to wait, blocked, for however many seconds we specified before it returns.
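To see the difference this makes, here is a minimal sketch (function names `sync_waits` and `async_waits` are mine, for illustration): three one-second waits run back to back with time.sleep take about three seconds, while three overlapping asyncio.sleep calls finish in about one.

```python
import asyncio
import time


def sync_waits() -> float:
    """Three blocking 1-second sleeps run one after another (~3s total)."""
    start = time.monotonic()
    for _ in range(3):
        time.sleep(1)  # blocks the thread; nothing else can run
    return time.monotonic() - start


async def async_waits() -> float:
    """Three asynchronous 1-second sleeps overlap on one event loop (~1s total)."""
    start = time.monotonic()
    # While each sleep is waiting, the event loop switches to the others.
    await asyncio.gather(*(asyncio.sleep(1) for _ in range(3)))
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"synchronous:  {sync_waits():.1f}s")                 # ~3.0s
    print(f"asynchronous: {asyncio.run(async_waits()):.1f}s")   # ~1.0s
```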
In asyncio, there is an asynchronous counterpart, the coroutine function asyncio.sleep. By scheduling the coroutine on the event loop, we are not blocked from doing something else during the sleep. Here is how asyncio.sleep is implemented (in CPython 3.8):
```python
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()
```
Surprisingly, it is very simple. We create a Future and register a callback that sets the Future's result after the sleep duration delay. Once the result of the Future is set, the sleep is over. During the sleep, the event loop can run other callbacks to do other tasks. It should be noted that the sleep duration is not precise in practice, due to the nature of event loop scheduling. For instance, if the event loop runs a callback that takes extremely long to finish during the sleep, the actual sleep time will be much longer than what we were expecting.
For example, if we first schedule an asynchronous sleep and then a long blocking call on the same thread, the sleep cannot finish until the blocking call returns. This is because the callback scheduling is simple rather than smart; I am not sure whether a smarter event loop implementation is theoretically possible.
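A minimal sketch of this failure mode, using nothing beyond the standard library (the `blocker` coroutine is my own illustrative name): a 1-second asynchronous sleep is scheduled first, then a 2-second synchronous call monopolizes the loop, so the sleep takes about 2 seconds to complete.

```python
import asyncio
import time


async def blocker() -> None:
    # A synchronous call inside a coroutine blocks the whole event loop.
    time.sleep(2)


async def main() -> float:
    start = time.monotonic()
    # asyncio.sleep(1) starts first and arms a 1-second timer, but blocker()
    # then holds the loop for 2 seconds, so the timer fires late.
    await asyncio.gather(asyncio.sleep(1), blocker())
    return time.monotonic() - start


if __name__ == "__main__":
    elapsed = asyncio.run(main())
    print(f"requested 1s of sleep, elapsed {elapsed:.1f}s")  # ~2.0s
```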
Asynchronous Read
Although we have seen how sleep is scheduled asynchronously in the event loop, this does not help us much in understanding how other I/O-bound tasks are scheduled. For example, if a server receives messages from multiple clients, how does it schedule the callbacks asynchronously?
Here is a simple server that asynchronously reads messages from and sends messages to multiple clients.
```python
import asyncio


async def handle_connection(reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter) -> None:
    addr = writer.get_extra_info("peername")
    while message := await reader.read(100):
        text = message.decode()
        print(f"Received {text!r} from {addr!r}")
        print(f"Sending {text!r}")
        writer.write(message)
        await writer.drain()
        if text == "quit\n":
            break
    print("Closing the connection")
    writer.close()


async def main() -> None:
    server = await asyncio.start_server(handle_connection, "127.0.0.1", 8888)
    addr = server.sockets[0].getsockname() if server.sockets else "unknown"
    print(f"Serving on {addr}")
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())
```
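To exercise such a server, a client can use asyncio.open_connection. The following self-contained sketch (the `echo_handler` and `client` names are mine, not from the server above) starts the same kind of echo server on an ephemeral port, connects to it, and checks the echo:

```python
import asyncio


async def echo_handler(reader: asyncio.StreamReader,
                       writer: asyncio.StreamWriter) -> None:
    # Echo every message back until the client says quit.
    while message := await reader.read(100):
        writer.write(message)
        await writer.drain()
        if message.decode() == "quit\n":
            break
    writer.close()


async def client(port: int) -> bytes:
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"hello\n")
    await writer.drain()
    reply = await reader.read(100)   # the echoed greeting
    writer.write(b"quit\n")
    await writer.drain()
    await reader.read(100)           # consume the echoed quit
    writer.close()
    return reply


async def main() -> bytes:
    # Port 0 asks the OS for a free ephemeral port.
    server = await asyncio.start_server(echo_handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reply = await client(port)
    server.close()
    await server.wait_closed()
    return reply


if __name__ == "__main__":
    print(asyncio.run(main()))  # b'hello\n'
```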
The documentation of asyncio.start_server says: "The client_connected_cb callback is called whenever a new client connection is established. It receives a (reader, writer) pair as two arguments, instances of the StreamReader and StreamWriter classes." Therefore, the connection to the server is not blocking, allowing multiple clients to connect to the server. If the connection were synchronous and blocking, only one connection could be established at a time.
Reading from and writing to the streams have to be asynchronous as well. Different clients use different sockets on the same port. Each transfer from a client to the server socket ends with an EOF (end-of-file), which indicates the end of the transfer. If reading and writing were synchronous and the network connection between the server and one client were poor, then even a small file might take a long time before the EOF arrives, blocking the entire thread.
Using the asynchronous StreamReader.read method, we can switch between multiple client connections. Let's check how this is achieved.
When we start a server, a StreamReader is created and wrapped in a StreamReaderProtocol.
```python
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  This
    parameter can either be a plain callback function or a coroutine;
    if it is a coroutine, it will be automatically converted into a
    Task.

    The rest of the arguments are all the usual arguments to
    loop.create_server() except protocol_factory; most common are
    positional host and port, with various optional keyword arguments
    following.  The return value is the same as loop.create_server().

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    def factory():
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)
        return protocol

    return await loop.create_server(factory, host, port, **kwds)
```
The StreamReader.read method is implemented as follows. Essentially, it waits via self._wait_for_data until some data have been received, and then returns at most n bytes of the data.
```python
async def read(self, n=-1):
    """Read up to `n` bytes from the stream.

    If n is not provided, or set to -1, read until EOF and return all read
    bytes. If the EOF was received and the internal buffer is empty, return
    an empty bytes object.

    If n is zero, return empty bytes object immediately.

    If n is positive, this function try to read `n` bytes, and may return
    less or equal bytes than requested, but at least one byte. If EOF was
    received before any byte is read, this function returns empty byte
    object.

    Returned value is not limited with limit, configured at stream
    creation.

    If stream was paused, this function will automatically resume it if
    needed.
    """
    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    if n < 0:
        # This used to just loop creating a new waiter hoping to
        # collect everything in self._buffer, but that would
        # deadlock if the subprocess sends more than self.limit
        # bytes.  So just call self.read(self._limit) until EOF.
        blocks = []
        while True:
            block = await self.read(self._limit)
            if not block:
                break
            blocks.append(block)
        return b''.join(blocks)

    if not self._buffer and not self._eof:
        await self._wait_for_data('read')

    # This will work right even if buffer is less than n bytes
    data = bytes(self._buffer[:n])
    del self._buffer[:n]

    self._maybe_resume_transport()
    return data
```
```python
async def _wait_for_data(self, func_name):
    """Wait until feed_data() or feed_eof() is called.

    If stream was paused, automatically resume it.
    """
    # StreamReader uses a future to link the protocol feed_data() method
    # to a read coroutine. Running two read coroutines at the same time
    # would have an unexpected behaviour. It would not possible to know
    # which coroutine would get the next data.
    if self._waiter is not None:
        raise RuntimeError(
            f'{func_name}() called while another coroutine is '
            f'already waiting for incoming data')

    assert not self._eof, '_wait_for_data after EOF'

    # Waiting for data while paused will make deadlock, so prevent it.
    # This is essential for readexactly(n) for case when n > self._limit.
    if self._paused:
        self._paused = False
        self._transport.resume_reading()

    self._waiter = self._loop.create_future()
    try:
        await self._waiter
    finally:
        self._waiter = None
```
```python
def feed_data(self, data):
    assert not self._eof, 'feed_data after feed_eof'

    if not data:
        return

    self._buffer.extend(data)
    self._wakeup_waiter()

    if (self._transport is not None and
            not self._paused and
            len(self._buffer) > 2 * self._limit):
        try:
            self._transport.pause_reading()
        except NotImplementedError:
            # The transport can't be paused.
            # We'll just have to buffer all data.
            # Forget the transport so we don't keep trying.
            self._transport = None
        else:
            self._paused = True
```
```python
def _wakeup_waiter(self):
    """Wakeup read*() functions waiting for data or EOF."""
    waiter = self._waiter
    if waiter is not None:
        self._waiter = None
        if not waiter.cancelled():
            waiter.set_result(None)
```
By the time the waiter's result is set, the data has already been appended to self._buffer, so StreamReader.read can safely read from the buffer.
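This handshake between feed_data and read can be observed directly on a bare StreamReader, without any socket. In the sketch below (the timings are arbitrary), we play the role of the transport by feeding data and EOF after the read coroutine has parked on its internal future:

```python
import asyncio


async def main() -> bytes:
    reader = asyncio.StreamReader()
    loop = asyncio.get_running_loop()
    # Simulate a transport: feed data and then EOF shortly after read()
    # starts waiting in _wait_for_data.
    loop.call_later(0.01, reader.feed_data, b"hello")
    loop.call_later(0.02, reader.feed_eof)
    data = await reader.read()  # waits until feed_data wakes the waiter
    return data


if __name__ == "__main__":
    print(asyncio.run(main()))  # b'hello'
```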
So the final question is who will call StreamReader.feed_data? Remember the StreamReader is wrapped in a StreamReaderProtocol.
```python
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        if stream_reader is not None:
            self._stream_reader_wr = weakref.ref(stream_reader)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._stream_writer = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        self._closed = self._loop.create_future()

    def data_received(self, data):
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)
```
But who calls StreamReaderProtocol.data_received? The answer is in the docstring of its base class, protocols.Protocol:
```python
class Protocol(BaseProtocol):
    """Interface for stream protocol.

    The user should implement this interface.  They can inherit from
    this class but don't need to.  The implementations here do
    nothing (they don't raise exceptions).

    When the user wants to requests a transport, they pass a protocol
    factory to a utility function (e.g., EventLoop.create_connection()).

    When the connection is made successfully, connection_made() is
    called with a suitable transport object.  Then data_received()
    will be called 0 or more times with data (bytes) received from the
    transport; finally, connection_lost() will be called exactly once
    with either an exception object or None as an argument.

    State machine of calls:

      start -> CM [-> DR*] [-> ER?] -> CL -> end

    * CM: connection_made()
    * DR: data_received()
    * ER: eof_received()
    * CL: connection_lost()
    """
```
So data_received may be called one or more times, and each call may carry fewer, exactly, or more bytes than the user asked for. When the user waits for data via StreamReader.read, what gets returned is typically the data most recently collected by StreamReaderProtocol.data_received. The event loop invokes StreamReaderProtocol.data_received whenever the underlying transport has data, extending the buffer accordingly; this is how asynchronous read becomes possible.
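We can see the event loop driving data_received by implementing the Protocol interface ourselves. The following self-contained sketch (the `EchoProtocol` name is mine) registers a protocol with loop.create_server; the loop then calls connection_made once and data_received whenever the socket delivers bytes:

```python
import asyncio


class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        # Called once by the event loop when a client connects (CM).
        self.transport = transport

    def data_received(self, data):
        # Called by the event loop whenever the socket has data (DR*).
        self.transport.write(data)
        self.transport.close()  # triggers EOF / connection_lost (CL)


async def main() -> bytes:
    loop = asyncio.get_running_loop()
    # Port 0 asks the OS for a free ephemeral port.
    server = await loop.create_server(EchoProtocol, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"ping")
    await writer.drain()
    reply = await reader.read()  # read until EOF from transport.close()
    writer.close()

    server.close()
    await server.wait_closed()
    return reply


if __name__ == "__main__":
    print(asyncio.run(main()))  # b'ping'
```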
Conclusions
The fundamentals of asynchronous I/O in asynchronous programming can seem almost trivial once we understand the system we are working on well. Implementing them correctly at the low level, however, can still be complicated.