Control Python AsyncIO Coroutine Interactively

Introduction

Python asyncio concurrency are very good for I/O-bound tasks in Python with less overhead compared to threading methods. In some rare cases, since Python is a scripting language, we would like to run asyncio concurrency interactively in REPL (read-eval-print loop), the Python interactive shell. Because Python asyncio concurrency are single-process and single-thread, while they are being executed, we could not do anything in REPL but wait.

In this blog post, I would like to discuss how to use threading to control asyncio concurrency interactively.

AsyncIO Concurrency

AsyncIO Base

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def some_func(number):
await asyncio.sleep(3)
return number

async def gather_func():
# Schedule three calls *concurrently*:
results = await asyncio.gather(
some_func(1),
some_func(2),
some_func(3),
)
return results

# Use asyncio high-level API
results = asyncio.run(gather_func())

print(results)

If running this script in the REPL, we would have to wait three seconds after executing results = asyncio.run(gather_func()) before we could execute print(results). In some use cases, if it is requesting something from a remote server, it might have a chance to take forever, and we have to press Ctrl + C to kill.

AsyncIO Timeout

If we don’t like waiting too long for the asyncio concurrency, we have options to timeout.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

async def some_func(number):
await asyncio.sleep(3)
return number

async def gather_func():
# Schedule three calls *concurrently*:
results = await asyncio.gather(
some_func(1),
some_func(2),
some_func(3),
)
return results

async def main():
# Schedule three calls *concurrently*:
try:
results = await asyncio.wait_for(gather_func(), timeout=None) # Try toggle timeout between 1 and None.
except asyncio.TimeoutError:
print("timeout!")
return results

# Use asyncio high-level API
results = asyncio.run(main())

print(results)

If running this script in the REPL, even if we used asyncio.wait_for with timeout, we could not do anything but wait during the asyncio concurrency. In addition, we have to set a fixed timeout value before we start the asyncio concurrency. We could not change our mind during waiting, unless we press Ctrl + C to kill.

AsyncIO + Thread

If we have two Python threads, one thread is responsible for the asyncio event loop which is take care of the concurrency schedules at the low-level, and the other one thread is responsible for the user interactive activities while still having control access to the asyncio concurrency.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import asyncio
from threading import Thread
import time

def endless_event_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

async def some_func(number):
await asyncio.sleep(3)
return number

async def gather_func():
# Schedule three calls *concurrently*:
results = await asyncio.gather(
some_func(1),
some_func(2),
some_func(3),
)
return results

loop = asyncio.new_event_loop()

t = Thread(target=endless_event_loop, args=(loop,))
# Start an endless event loop
t.start()

# Return concurrent.futures.Future instead of asycnio.Future
future = asyncio.run_coroutine_threadsafe(gather_func(), loop)

# Cancel asyncio concurrency anytime!
# future.cancel()

while not future.done():
time.sleep(1)
print("Concurrency is done: {}".format(future.done()))

results = future.result()

print(results)

# Ron Frederick has thought of a smart way to kill the event loop and the thread using callback from another thread.
# https://github.com/ronf/asyncssh/issues/295#issuecomment-659143796
# Without this, the thread running the event loop will run forever, and the program could not exit normally.
loop.call_soon_threadsafe(loop.stop)

Now, we could check and cancel asyncio concurrency anytime during its execution. However, there are a drawback of this method. The concurrent.futures.Future represents the status of coroutine running in the event loop. The coroutine might contain many tasks, but we will not be able to have access to any of them individually in another thread.

To overcome this, instead of merging multiple tasks into one single task, we could call run_coroutine_threadsafe multiple times for individual coroutines and get multiple futures to control them individually.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio
from threading import Thread
from concurrent.futures import CancelledError
import time

def endless_event_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

async def some_func(number):
await asyncio.sleep(3)
return number

loop = asyncio.new_event_loop()

t = Thread(target=endless_event_loop, args=(loop,))
# Start an endless event loop
t.start()

# Return concurrent.futures.Future instead of asycnio.Future
futures = [asyncio.run_coroutine_threadsafe(coroutine, loop) for coroutine in [some_func(1), some_func(2), some_func(3)]]

# Cancel any asyncio concurrency anytime!
futures[0].cancel()

for i, future in enumerate(futures):
try:
print("Task {} result: {}".format(i, future.result()))
except CancelledError:
print("Task {} cancelled".format(i))

# Ron Frederick has thought of a smart way to kill the event loop and the thread using callback from another thread.
# https://github.com/ronf/asyncssh/issues/295#issuecomment-659143796
# Without this, the thread running the event loop will run forever, and the program could not exit normally.
loop.call_soon_threadsafe(loop.stop)

Object Oriented Programming Version

Please see Ron Frederick’s implementation in our discussion on GitHub.

Acknowledgement

I would like to thank Ron Frederick for the active responses to my questions and the constructive feedbacks.

References

Author

Lei Mao

Posted on

07-15-2020

Updated on

07-15-2020

Licensed under


Comments