Python AsyncIO: Asynchronous IO

Introduction

Breaking tasks into pieces and scheduling them to run asynchronously introduces runtime overhead. If an asynchronous program has little I/O waiting time for asyncio to save, running its tasks asynchronously will be slower than running them synchronously. This means asynchronous I/O is the most important part of a single-threaded asynchronous application.

The question is: how does asyncio save I/O time? A quick metaphor is as follows. I have a candy factory with three production lines, each producing candies one by one. Candies drop to the ground at the end of each production line, and a candy that touches the ground is contaminated. Unfortunately, I have only one worker to collect candies into boxes. What is his best strategy for collecting as many candies as possible? Typically, he has two options.

  • Collecting candies at one production line at one time.
  • Placing a box at the end of each production line. Rotating between the production lines from time to time and collecting the candies from the boxes.

Obviously, smart people will choose the second strategy. The first strategy corresponds to a single-threaded synchronous application, and the second to a single-threaded asynchronous application.
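The two strategies can be mimicked in a few lines of asyncio. The sketch below is only an illustration: asyncio.sleep stands in for waiting on a production line, and the names collect, one_line_at_a_time, and rotate_between_lines are made up for this example.

```python
import asyncio
import time


async def collect(line: int, wait: float) -> str:
    # asyncio.sleep stands in for waiting on I/O from one production line.
    await asyncio.sleep(wait)
    return f"candy from line {line}"


async def one_line_at_a_time() -> float:
    # First strategy: finish waiting on one line before moving to the next.
    start = time.perf_counter()
    for line in range(3):
        await collect(line, 0.1)
    return time.perf_counter() - start


async def rotate_between_lines() -> float:
    # Second strategy: wait on all three lines concurrently.
    start = time.perf_counter()
    await asyncio.gather(*(collect(line, 0.1) for line in range(3)))
    return time.perf_counter() - start


sequential = asyncio.run(one_line_at_a_time())
concurrent = asyncio.run(rotate_between_lines())
print(f"one line at a time: {sequential:.2f}s, rotating: {concurrent:.2f}s")
```

With three waits of 0.1 seconds each, the first strategy should take roughly the sum of the waits, while the second should take roughly the longest single wait.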

In this blog post, I would like to discuss how asynchronous I/O works in Python asynchronous programming using asyncio.

Asynchronous Sleep

A common stand-in for an I/O-bound task is the sleep function. With the synchronous, blocking time.sleep, we are blocked for however many seconds we specified until it returns.

asyncio also provides an asynchronous version, the coroutine function asyncio.sleep. When the coroutine is scheduled on the event loop, we are not blocked from doing something else during the sleep duration. Here is how asyncio.sleep is implemented.

    async def sleep(delay, result=None, *, loop=None):
        """Coroutine that completes after a given time (in seconds)."""
        if delay <= 0:
            await __sleep0()
            return result

        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

        future = loop.create_future()
        h = loop.call_later(delay,
                            futures._set_result_unless_cancelled,
                            future, result)
        try:
            return await future
        finally:
            h.cancel()
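The core of this implementation is a Future that a timer callback resolves later. A simplified sketch of the same idea, leaving out the zero-delay fast path and the deprecated loop argument, could look like this. The names simple_sleep, wake_up, and simple_sleep_demo are hypothetical and not part of asyncio.

```python
import asyncio
import time


async def simple_sleep(delay: float, result=None):
    """Hypothetical, simplified re-implementation of asyncio.sleep."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def wake_up() -> None:
        # Resolve the Future unless the sleeper was cancelled meanwhile.
        if not future.cancelled():
            future.set_result(result)

    handle = loop.call_later(delay, wake_up)
    try:
        # Suspends this coroutine; the event loop is free to run other work.
        return await future
    finally:
        # Cancel the timer if we leave early (for example on cancellation).
        handle.cancel()


async def simple_sleep_demo():
    start = time.perf_counter()
    value = await simple_sleep(0.1, result="done")
    return value, time.perf_counter() - start


value, waited = asyncio.run(simple_sleep_demo())
print(value, f"{waited:.2f}s")
```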

Surprisingly, it is very simple. We create a Future and register a callback that sets the result of this Future after the sleep duration delay. Once the result of the Future is set, the sleep is over. During the sleep, the event loop can run other callbacks to do other tasks. Note that the sleep duration is not precise in practice, due to the nature of event loop scheduling. If the event loop runs a callback that takes extremely long to finish during the sleep, the actual sleep time will be much longer than expected. For example,

    import asyncio
    import time


    async def block_func(block_duration: float = 5) -> None:
        # time.sleep is synchronous, so it blocks the event loop
        # for block_duration seconds.
        time.sleep(block_duration)


    async def sleep_func(sleep_duration: float = 1) -> None:
        start_time = time.time()
        await asyncio.sleep(delay=sleep_duration)
        end_time = time.time()
        elapsed_time = end_time - start_time

        print("Expected asynchronous sleep duration: {:.2f}s.".format(sleep_duration))
        print("Actual asynchronous sleep duration: {:.2f}s.".format(elapsed_time))


    async def main() -> None:
        block_duration = 5
        sleep_duration = 1

        print("=" * 50)
        print("Scheduled blocking first...")
        print("-" * 50)
        await asyncio.gather(
            block_func(block_duration),
            sleep_func(sleep_duration),
        )
        print("=" * 50)
        print("Scheduled sleeping first...")
        print("-" * 50)
        await asyncio.gather(
            sleep_func(sleep_duration),
            block_func(block_duration),
        )
        print("=" * 50)


    if __name__ == "__main__":
        # asyncio.run creates the event loop, runs main, and closes the loop.
        asyncio.run(main())

    $ python long_sleep.py
    ==================================================
    Scheduled blocking first...
    --------------------------------------------------
    Expected asynchronous sleep duration: 1.00s.
    Actual asynchronous sleep duration: 1.00s.
    ==================================================
    Scheduled sleeping first...
    --------------------------------------------------
    Expected asynchronous sleep duration: 1.00s.
    Actual asynchronous sleep duration: 5.00s.
    ==================================================

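In this particular example, if we schedule the asynchronous sleep first and the blocking call second, the sleep cannot finish until the blocking call does. When the blocking call is scheduled first, the sleep does not even start until the blocking call has returned, so its measured duration stays at 1 second. This is because the event loop's scheduling is cooperative rather than sophisticated: it runs one callback at a time, cannot interrupt a callback that is already running, and only fires the sleep timer once it regains control. I am not sure whether it is theoretically possible to have a smarter event loop implementation.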
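One common way to keep the event loop responsive is to move the blocking call off the loop's thread. The sketch below assumes the blocking work can safely run in a worker thread and uses loop.run_in_executor with the default thread pool. The names offloaded_block, timed_sleep, and compare are made up for this example, and the durations are shortened to keep it quick.

```python
import asyncio
import time


async def offloaded_block(block_duration: float) -> None:
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default thread pool so the event
    # loop stays free to fire timers and run other callbacks.
    await loop.run_in_executor(None, time.sleep, block_duration)


async def timed_sleep(sleep_duration: float) -> float:
    start_time = time.perf_counter()
    await asyncio.sleep(sleep_duration)
    return time.perf_counter() - start_time


async def compare() -> float:
    # The sleep is scheduled first, as in the slow case above.
    elapsed, _ = await asyncio.gather(timed_sleep(0.1), offloaded_block(0.5))
    return elapsed


elapsed = asyncio.run(compare())
print(f"Actual asynchronous sleep duration: {elapsed:.2f}s.")
```

Because the blocking call no longer occupies the event loop, the measured sleep should stay close to the requested 0.1 seconds instead of stretching to the length of the blocking call.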

Asynchronous Read

Although we have seen how sleep is scheduled asynchronously in the event loop, this does not help us much in understanding how other I/O-bound tasks are scheduled. For example, if a server receives messages from multiple clients, how does it schedule the callbacks asynchronously?

This is a simple server that asynchronously reads messages from, and sends messages to, multiple clients.

    import asyncio


    async def handle_connection(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        addr = writer.get_extra_info("peername")
        # read() returns b"" on EOF, which ends the loop.
        while message := await reader.read(100):
            text = message.decode()
            print(f"Received {text!r} from {addr!r}")
            print(f"Sending {text!r}")
            writer.write(message)
            await writer.drain()
            if text == "quit\n":
                break

        print("Closing the connection")
        writer.close()
        # Wait until the underlying connection is actually closed.
        await writer.wait_closed()


    async def main() -> None:
        server = await asyncio.start_server(handle_connection, "127.0.0.1", 8888)
        addr = server.sockets[0].getsockname() if server.sockets else "unknown"
        print(f"Serving on {addr}")
        async with server:
            await server.serve_forever()


    if __name__ == "__main__":
        asyncio.run(main())

The documentation of asyncio.start_server says “The client_connected_cb callback is called whenever a new client connection is established. It receives a (reader, writer) pair as two arguments, instances of the StreamReader and StreamWriter classes.” Therefore, accepting a connection does not block the server, which allows multiple clients to connect to it at the same time. If connection handling were synchronous and blocking, only one connection could be served at a time.
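To see several clients being served at the same time, a small self-contained harness might look like the following. It is a sketch under a few assumptions: it uses its own minimal echo_handler rather than the handler above, binds to port 0 so the operating system picks a free port, and the names client and run_clients are hypothetical.

```python
import asyncio


async def echo_handler(reader, writer):
    # Echo every chunk back until the client closes its side (EOF).
    while data := await reader.read(100):
        writer.write(data)
        await writer.drain()
    writer.close()
    await writer.wait_closed()


async def client(port: int, message: str) -> str:
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    payload = message.encode()
    writer.write(payload)
    await writer.drain()
    # Wait for exactly as many bytes as were sent.
    reply = await reader.readexactly(len(payload))
    writer.close()
    await writer.wait_closed()
    return reply.decode()


async def run_clients():
    # Port 0 asks the operating system for any free port.
    server = await asyncio.start_server(echo_handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        # Three clients talk to the same server concurrently.
        return await asyncio.gather(
            *(client(port, f"hello {i}") for i in range(3))
        )


replies = asyncio.run(run_clients())
print(replies)  # ['hello 0', 'hello 1', 'hello 2']
```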

Reading from and writing to the streams have to be asynchronous as well. Each client connection gets its own socket on the server, all on the same port. Each stream sent from a client to the server ends with an EOF (end-of-file), which indicates the end of the transfer. If reading and writing were synchronous and the network connection between the server and one client were poor, it might take a long time to receive the EOF even for a small transfer, blocking the entire thread in the meantime.

Using the asynchronous StreamReader.read method, we could switch between the multiple connections to the clients. Let’s check how it is achieved.

When we start a server, a factory is registered with the event loop. For each new connection, the factory creates a StreamReader and wraps it in a StreamReaderProtocol.

    async def start_server(client_connected_cb, host=None, port=None, *,
                           loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Start a socket server, call back for each client connected.

        The first parameter, `client_connected_cb`, takes two parameters:
        client_reader, client_writer. client_reader is a StreamReader
        object, while client_writer is a StreamWriter object. This
        parameter can either be a plain callback function or a coroutine;
        if it is a coroutine, it will be automatically converted into a
        Task.

        The rest of the arguments are all the usual arguments to
        loop.create_server() except protocol_factory; most common are
        positional host and port, with various optional keyword arguments
        following. The return value is the same as loop.create_server().

        Additional optional keyword arguments are loop (to set the event loop
        instance to use) and limit (to set the buffer limit passed to the
        StreamReader).

        The return value is the same as loop.create_server(), i.e. a
        Server object which can be used to stop the service.
        """
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

        def factory():
            reader = StreamReader(limit=limit, loop=loop)
            protocol = StreamReaderProtocol(reader, client_connected_cb,
                                            loop=loop)
            return protocol

        return await loop.create_server(factory, host, port, **kwds)

The StreamReader.read method is implemented as follows. Basically, it waits via self._wait_for_data until some data has been received, and then returns at most n bytes of that data.

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all read
        bytes. If the EOF was received and the internal buffer is empty, return
        an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function try to read `n` bytes, and may return
        less or equal bytes than requested, but at least one byte. If EOF was
        received before any byte is read, this function returns empty byte
        object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data
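The behaviour of read for a positive n can be observed by feeding a StreamReader by hand. In normal use the protocol calls feed_data and feed_eof; calling them directly here is only for illustration, and read_demo is a made-up name.

```python
import asyncio


async def read_demo():
    # Create the reader inside a running event loop.
    reader = asyncio.StreamReader()

    # Pretend that 11 bytes have arrived from the network.
    reader.feed_data(b"hello world")

    first = await reader.read(5)    # at most 5 bytes from the buffer
    rest = await reader.read(100)   # fewer bytes than requested are fine

    # After EOF with an empty buffer, read returns an empty bytes object.
    reader.feed_eof()
    end = await reader.read(100)
    return first, rest, end


first, rest, end = asyncio.run(read_demo())
print(first, rest, end)  # b'hello' b' world' b''
```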

The StreamReader._wait_for_data coroutine is basically waiting for a Future self._waiter to be set.

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

So who is going to set the value for self._waiter? It is StreamReader.feed_data which calls StreamReader._wakeup_waiter to set the value for self._waiter.

    def feed_data(self, data):
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

Before the result of self._waiter is set, the data has already been appended to self._buffer, so StreamReader.read can safely read from the buffer once it wakes up.
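The interplay between the buffer and the waiter can be condensed into a toy class. This is a sketch of the pattern only, not the asyncio implementation: TinyReader and tiny_reader_demo are hypothetical names, and flow control, EOF, and error handling are left out.

```python
import asyncio


class TinyReader:
    """Hypothetical, stripped-down reader: a buffer plus one waiter Future."""

    def __init__(self):
        self._buffer = bytearray()
        self._waiter = None

    def feed_data(self, data: bytes) -> None:
        # Extend the buffer first, then wake the waiting read (if any).
        self._buffer.extend(data)
        waiter, self._waiter = self._waiter, None
        if waiter is not None and not waiter.cancelled():
            waiter.set_result(None)

    async def read(self, n: int) -> bytes:
        if not self._buffer:
            # Park this coroutine until feed_data() resolves the Future.
            self._waiter = asyncio.get_running_loop().create_future()
            try:
                await self._waiter
            finally:
                self._waiter = None
        data = bytes(self._buffer[:n])
        del self._buffer[:n]
        return data


async def tiny_reader_demo():
    reader = TinyReader()
    loop = asyncio.get_running_loop()
    # Simulate data arriving from the network 50 ms from now.
    loop.call_later(0.05, reader.feed_data, b"late data")
    return await reader.read(4)


tiny_result = asyncio.run(tiny_reader_demo())
print(tiny_result)  # b'late'
```

While read is parked on the Future, the event loop is free to run other callbacks, which is the same reason asyncio.sleep does not block.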

So the final question is who will call StreamReader.feed_data? Remember the StreamReader is wrapped in a StreamReaderProtocol.

    class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
        """Helper class to adapt between Protocol and StreamReader.

        (This is a helper class instead of making StreamReader itself a
        Protocol subclass, because the StreamReader has other potential
        uses, and to prevent the user of the StreamReader to accidentally
        call inappropriate methods of the protocol.)
        """

        _source_traceback = None

        def __init__(self, stream_reader, client_connected_cb=None, loop=None):
            super().__init__(loop=loop)
            if stream_reader is not None:
                self._stream_reader_wr = weakref.ref(stream_reader)
                self._source_traceback = stream_reader._source_traceback
            else:
                self._stream_reader_wr = None
            if client_connected_cb is not None:
                # This is a stream created by the `create_server()` function.
                # Keep a strong reference to the reader until a connection
                # is established.
                self._strong_reader = stream_reader
            self._reject_connection = False
            self._stream_writer = None
            self._transport = None
            self._client_connected_cb = client_connected_cb
            self._over_ssl = False
            self._closed = self._loop.create_future()

When StreamReaderProtocol.data_received is called, it actually calls StreamReader.feed_data.

    def data_received(self, data):
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)

But who calls StreamReaderProtocol.data_received? The answer is in the docstring of protocols.Protocol, the base class of StreamReaderProtocol: it is called with the data received from the transport.

    class Protocol(BaseProtocol):
        """Interface for stream protocol.

        The user should implement this interface. They can inherit from
        this class but don't need to. The implementations here do
        nothing (they don't raise exceptions).

        When the user wants to requests a transport, they pass a protocol
        factory to a utility function (e.g., EventLoop.create_connection()).

        When the connection is made successfully, connection_made() is
        called with a suitable transport object. Then data_received()
        will be called 0 or more times with data (bytes) received from the
        transport; finally, connection_lost() will be called exactly once
        with either an exception object or None as an argument.

        State machine of calls:

          start -> CM [-> DR*] [-> ER?] -> CL -> end

        * CM: connection_made()
        * DR: data_received()
        * ER: eof_received()
        * CL: connection_lost()
        """

Essentially the state machine is

    start -> connection_made
        [-> data_received]*
        [-> eof_received]?
    -> connection_lost -> end

data_received is called zero or more times, and each chunk can be smaller than, equal to, or larger than the number of bytes the user asked for. When the user waits for data via StreamReader.read, the data returned is whatever data_received has fed into the buffer so far, up to n bytes; if the buffer is empty, read waits for the next data_received call. Whenever data arrives from the transport, StreamReaderProtocol.data_received is called, the buffer is extended, and the waiting read is woken up. This is how asynchronous read becomes possible.
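The order of these callbacks can be checked with a small protocol that records each call. The sketch below makes a few assumptions: RecordingProtocol and record_callbacks are hypothetical names, the server binds to port 0 so the operating system picks a free port, and a single short message is expected to arrive in one data_received call.

```python
import asyncio


class RecordingProtocol(asyncio.Protocol):
    """Hypothetical protocol that records the order of its callbacks."""

    def __init__(self, events, done):
        self.events = events
        self.done = done

    def connection_made(self, transport):
        self.events.append("connection_made")

    def data_received(self, data):
        self.events.append(f"data_received({data!r})")

    def eof_received(self):
        # Returning None lets the transport close itself after EOF.
        self.events.append("eof_received")

    def connection_lost(self, exc):
        self.events.append("connection_lost")
        self.done.set_result(None)


async def record_callbacks():
    loop = asyncio.get_running_loop()
    events, done = [], loop.create_future()
    # The factory is called once per accepted connection.
    server = await loop.create_server(
        lambda: RecordingProtocol(events, done), "127.0.0.1", 0
    )
    port = server.sockets[0].getsockname()[1]
    async with server:
        _, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"ping")
        await writer.drain()
        writer.close()
        await writer.wait_closed()
        # Wait until the server side has seen the connection go away.
        await done
    return events


events = asyncio.run(record_callbacks())
print(events)
```

The recorded list should start with connection_made and end with connection_lost, with the data_received and eof_received entries in between, matching the state machine above.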

Conclusions

The fundamentals of asynchronous I/O in asynchronous programming can look trivial once we understand the system we are working on very well. However, they can still be complicated to implement at the low level.

Author

Lei Mao

Posted on

08-30-2020

Updated on

08-30-2020
