Introduction
Because breaking tasks into pieces and scheduling them to run asynchronously introduces runtime overhead, if an asynchronous program does not have much I/O time for asyncio to save, executing its tasks asynchronously will be slower than executing them synchronously. This makes asynchronous I/O the most important part of a single-threaded asynchronous application.
The question is: how does asyncio save I/O time? A quick metaphor goes as follows. I have a candy factory with three production lines producing candies one by one. A candy drops to the ground at the end of its production line, but dropping to the ground contaminates the candy. Unfortunately, I have only one worker to collect candies into boxes. What is his best strategy to collect as many candies as possible? Typically, he could have two strategies.
- Collecting candies from one production line at a time.
- Placing a box at the end of each production line, rotating between the production lines from time to time, and collecting the candies from the boxes.
Obviously, smart people will choose the second strategy. Essentially, the first strategy is the single-threaded synchronous application, and the second strategy is the single-threaded asynchronous application.
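To make the metaphor concrete, here is a minimal sketch of the second strategy (my own example, not from the asyncio documentation), using asyncio.sleep as a stand-in for waiting at a production line. Waiting on all three lines concurrently takes about as long as the slowest line, not the sum of all three.
import asyncio
import time

async def collect_from_line(line_id: int) -> None:
    # Stand-in for waiting on I/O, e.g., waiting for candies to arrive.
    await asyncio.sleep(1)
    print(f"Collected candies from line {line_id}.")

async def main() -> None:
    start = time.time()
    # Strategy two: rotate between all three lines instead of
    # finishing one line before starting the next.
    await asyncio.gather(*(collect_from_line(i) for i in range(3)))
    print(f"Elapsed: {time.time() - start:.2f}s")  # ~1s instead of ~3s

asyncio.run(main())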
In this blog post, I would like to discuss how asynchronous I/O works in Python asynchronous programming using asyncio.
Asynchronous Sleep
A common I/O-bound task is the sleep function. Using the synchronous and blocking time.sleep, we have to wait, blocked, for however many seconds we specified before it returns.
In asyncio, we also have an asynchronous version of sleep, the coroutine function asyncio.sleep. By scheduling the coroutine on the event loop, we are not blocked from doing something else during the sleep duration. Here is how asyncio.sleep is implemented.
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()
Surprisingly, it is very simple. We create a Future and use a callback to set the value of this Future after the sleep duration delay. Once the result of the Future is set, the sleep is over. During the sleep duration delay, the event loop can schedule other callbacks to do other tasks. It should be noted that such a sleep duration is not precise in practice, due to the nature of event loop scheduling. For instance, if the event loop happens to schedule a callback that takes extremely long to finish during the sleep duration, the actual sleep time will be much longer than what we were expecting. For example,
import asyncio
import time

async def block_func(block_duration: float = 5) -> None:
    # Block the event loop for block_duration seconds.
    time.sleep(block_duration)

async def sleep_func(sleep_duration: float = 1) -> None:
    start_time = time.time()
    await asyncio.sleep(delay=sleep_duration)
    end_time = time.time()
    elapsed_time = end_time - start_time
    print("Expected asynchronous sleep duration: {:.2f}s.".format(sleep_duration))
    print("Actual asynchronous sleep duration: {:.2f}s.".format(elapsed_time))

block_duration = 5
sleep_duration = 1

loop = asyncio.get_event_loop()

print("="*50)
print("Scheduled blocking first...")
print("-"*50)
loop.run_until_complete(
    asyncio.gather(
        block_func(block_duration),
        sleep_func(sleep_duration),
    )
)
print("="*50)
print("Scheduled sleeping first...")
print("-"*50)
loop.run_until_complete(
    asyncio.gather(
        sleep_func(sleep_duration),
        block_func(block_duration),
    )
)
print("="*50)
$ python long_sleep.py
==================================================
Scheduled blocking first...
--------------------------------------------------
Expected asynchronous sleep duration: 1.00s.
Actual asynchronous sleep duration: 1.00s.
==================================================
Scheduled sleeping first...
--------------------------------------------------
Expected asynchronous sleep duration: 1.00s.
Actual asynchronous sleep duration: 5.00s.
==================================================
In this particular example, if we first schedule the asynchronous sleep and then schedule a blocking execution, the sleep cannot finish until the blocking execution finishes. This is because the event loop's callback scheduling is cooperative rather than preemptive: once a callback starts running, nothing else can run until it returns. I am not sure whether it is theoretically possible to have a smarter event loop implementation.
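One common remedy, not used in the example above, is to offload blocking calls to a thread pool so that the event loop stays responsive. A minimal sketch using loop.run_in_executor:
import asyncio
import time

async def non_blocking_func(block_duration: float = 5) -> None:
    loop = asyncio.get_running_loop()
    # Run the blocking time.sleep in the default thread pool executor,
    # so the event loop remains free to run other callbacks meanwhile.
    await loop.run_in_executor(None, time.sleep, block_duration)
With this version, the actual asynchronous sleep duration would stay close to one second regardless of the scheduling order.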
Asynchronous Read
Although we have seen how sleep is scheduled asynchronously in the event loop, this does not help us understand very much how other I/O-bound tasks are scheduled. For example, if a server receives messages from multiple clients, how does it schedule the callbacks asynchronously?
Here is a simple server that asynchronously reads messages from and sends messages to multiple clients.
import asyncio

async def handle_connection(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    addr = writer.get_extra_info("peername")
    while message := await reader.read(100):
        text = message.decode()
        print(f"Received {text!r} from {addr!r}")
        print(f"Sending {text!r}")
        writer.write(message)
        await writer.drain()
        if text == "quit\n":
            break
    print("Closing the connection")
    writer.close()

async def main() -> None:
    server = await asyncio.start_server(handle_connection, "127.0.0.1", 8888)
    addr = server.sockets[0].getsockname() if server.sockets else "unknown"
    print(f"Serving on {addr}")
    async with server:
        await server.serve_forever()

if __name__ == "__main__":
    asyncio.run(main())
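To try the server out, a minimal client might look like the following. This is a sketch of my own, not part of the original server code; it assumes the server above is running on 127.0.0.1:8888.
import asyncio

async def client() -> None:
    # Connect to the echo server started above.
    reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
    for text in ("hello\n", "quit\n"):
        writer.write(text.encode())
        await writer.drain()
        echoed = await reader.read(100)
        print(f"Server echoed {echoed.decode()!r}")
    writer.close()
    await writer.wait_closed()

asyncio.run(client())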
The documentation of asyncio.start_server says: “The client_connected_cb callback is called whenever a new client connection is established. It receives a (reader, writer) pair as two arguments, instances of the StreamReader and StreamWriter classes.” Therefore, connecting to the server is not blocking, allowing multiple clients to connect to the server. If the connection were synchronous and blocking, only one connection could be established at a time.
The data reading and writing on the streams have to be asynchronous as well. Different clients use different sockets on the same port. Each file a client sends to the server socket ends with an EOF (end-of-file), which indicates the end of the transfer. If the data reading and writing were synchronous and the network connection between the server and one client were poor, it might take a long time to receive the EOF even if the file being transferred is small, thus blocking the entire thread.
Using the asynchronous StreamReader.read method, we can switch between the multiple connections to the clients. Let's check how this is achieved.
When we start a server, a StreamReader is created and wrapped by a StreamReaderProtocol.
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  This
    parameter can either be a plain callback function or a coroutine;
    if it is a coroutine, it will be automatically converted into a
    Task.

    The rest of the arguments are all the usual arguments to
    loop.create_server() except protocol_factory; most common are
    positional host and port, with various optional keyword arguments
    following.  The return value is the same as loop.create_server().

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    def factory():
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)
        return protocol

    return await loop.create_server(factory, host, port, **kwds)
The StreamReader.read method is implemented as follows. What it basically does is wait, via self._wait_for_data, until some data has been received, and then return at most n bytes of the data.
async def read(self, n=-1):
    """Read up to `n` bytes from the stream.

    If n is not provided, or set to -1, read until EOF and return all read
    bytes. If the EOF was received and the internal buffer is empty, return
    an empty bytes object.

    If n is zero, return empty bytes object immediately.

    If n is positive, this function try to read `n` bytes, and may return
    less or equal bytes than requested, but at least one byte. If EOF was
    received before any byte is read, this function returns empty byte
    object.

    Returned value is not limited with limit, configured at stream
    creation.

    If stream was paused, this function will automatically resume it if
    needed.
    """
    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    if n < 0:
        # This used to just loop creating a new waiter hoping to
        # collect everything in self._buffer, but that would
        # deadlock if the subprocess sends more than self.limit
        # bytes.  So just call self.read(self._limit) until EOF.
        blocks = []
        while True:
            block = await self.read(self._limit)
            if not block:
                break
            blocks.append(block)
        return b''.join(blocks)

    if not self._buffer and not self._eof:
        await self._wait_for_data('read')

    # This will work right even if buffer is less than n bytes
    data = bytes(self._buffer[:n])
    del self._buffer[:n]

    self._maybe_resume_transport()
    return data
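As a usage note (my own example, not from the asyncio source), read(n) may return fewer than n bytes, while readexactly(n) keeps waiting until exactly n bytes have arrived. Here is a connection handler that could be passed to asyncio.start_server:
import asyncio

async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # read(100) returns as soon as some data is buffered: at most 100
    # bytes, possibly fewer, or b"" once EOF has been received.
    chunk = await reader.read(100)
    # readexactly(4) waits until exactly 4 bytes are available, raising
    # asyncio.IncompleteReadError if EOF arrives first.
    header = await reader.readexactly(4)
    writer.close()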
The StreamReader._wait_for_data coroutine basically waits for a Future, self._waiter, to be set.
async def _wait_for_data(self, func_name):
    """Wait until feed_data() or feed_eof() is called.

    If stream was paused, automatically resume it.
    """
    # StreamReader uses a future to link the protocol feed_data() method
    # to a read coroutine. Running two read coroutines at the same time
    # would have an unexpected behaviour. It would not possible to know
    # which coroutine would get the next data.
    if self._waiter is not None:
        raise RuntimeError(
            f'{func_name}() called while another coroutine is '
            f'already waiting for incoming data')

    assert not self._eof, '_wait_for_data after EOF'

    # Waiting for data while paused will make deadlock, so prevent it.
    # This is essential for readexactly(n) for case when n > self._limit.
    if self._paused:
        self._paused = False
        self._transport.resume_reading()

    self._waiter = self._loop.create_future()
    try:
        await self._waiter
    finally:
        self._waiter = None
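To see this waiter pattern in isolation, here is a minimal, self-contained sketch (mine, not from the asyncio source) in which a callback wakes up a suspended coroutine by setting a Future, just as feed_data() will do below:
import asyncio

async def main() -> None:
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()

    # This callback plays the role of feed_data()/_wakeup_waiter():
    # it sets the result of the future one second from now.
    loop.call_later(1, waiter.set_result, None)

    # The coroutine suspends here until the callback fires, just like
    # StreamReader._wait_for_data suspends on self._waiter.
    await waiter
    print("Woken up by the callback.")

asyncio.run(main())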
So who is going to set the value of self._waiter? It is StreamReader.feed_data, which calls StreamReader._wakeup_waiter to set the value of self._waiter.
def feed_data(self, data):
    assert not self._eof, 'feed_data after feed_eof'

    if not data:
        return

    self._buffer.extend(data)
    self._wakeup_waiter()

    if (self._transport is not None and
            not self._paused and
            len(self._buffer) > 2 * self._limit):
        try:
            self._transport.pause_reading()
        except NotImplementedError:
            # The transport can't be paused.
            # We'll just have to buffer all data.
            # Forget the transport so we don't keep trying.
            self._transport = None
        else:
            self._paused = True

def _wakeup_waiter(self):
    """Wakeup read*() functions waiting for data or EOF."""
    waiter = self._waiter
    if waiter is not None:
        self._waiter = None
        if not waiter.cancelled():
            waiter.set_result(None)
Before the value of self._waiter is set, the data has been extended into self._buffer, so that StreamReader.read can safely read the buffer.
So the final question is: who calls StreamReader.feed_data? Remember that the StreamReader is wrapped in a StreamReaderProtocol.
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        if stream_reader is not None:
            self._stream_reader_wr = weakref.ref(stream_reader)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._stream_writer = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        self._closed = self._loop.create_future()
When StreamReaderProtocol.data_received is called, it in turn calls StreamReader.feed_data.
def data_received(self, data):
    reader = self._stream_reader
    if reader is not None:
        reader.feed_data(data)
But who calls StreamReaderProtocol.data_received? The answer is in the docstring of protocols.Protocol, the base class of StreamReaderProtocol.
class Protocol(BaseProtocol):
    """Interface for stream protocol.

    The user should implement this interface.  They can inherit from
    this class but don't need to.  The implementations here do
    nothing (they don't raise exceptions).

    When the user wants to requests a transport, they pass a protocol
    factory to a utility function (e.g., EventLoop.create_connection()).

    When the connection is made successfully, connection_made() is
    called with a suitable transport object.  Then data_received()
    will be called 0 or more times with data (bytes) received from the
    transport; finally, connection_lost() will be called exactly once
    with either an exception object or None as an argument.

    State machine of calls:

      start -> CM [-> DR*] [-> ER?] -> CL -> end

    * CM: connection_made()
    * DR: data_received()
    * ER: eof_received()
    * CL: connection_lost()
    """
Essentially, the state machine is

start -> connection_made
      [-> data_received]*
      [-> eof_received]?
      -> connection_lost -> end
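As a concrete illustration of this state machine (a sketch of my own, not part of the streams implementation), here is a minimal raw protocol whose methods the event loop calls in exactly this order:
import asyncio

class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        # CM: called exactly once when a client connects.
        self.transport = transport

    def data_received(self, data):
        # DR: called zero or more times as data arrives on the socket.
        self.transport.write(data)

    def connection_lost(self, exc):
        # CL: called exactly once when the connection ends.
        print("Connection closed:", exc)

async def main() -> None:
    loop = asyncio.get_running_loop()
    # The event loop watches the socket and drives the protocol callbacks.
    server = await loop.create_server(EchoProtocol, "127.0.0.1", 8889)
    async with server:
        await server.serve_forever()

asyncio.run(main())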
Data may be received one or more times, and each chunk could be smaller than, equal to, or larger than the number of bytes the user wants. When the user waits for data via StreamReader.read, most likely the most recent data collected by StreamReaderProtocol.data_received will be the data returned. StreamReaderProtocol.data_received is called by the event loop whenever data arrives on the transport, and the buffer is extended accordingly. This is how asynchronous read becomes possible.
Conclusions
The fundamentals of asynchronous I/O in asynchronous programming can seem almost trivial once we understand the system we are working on very well. However, they can still be complicated to implement at the low level.