
Lei Mao

Machine Learning, Artificial Intelligence, Computer Science.


Introduction

In my earlier blog post, we discussed how to use the progress bar tool tqdm to show the progress of multiprocessing tasks. Since Python asyncio is another way to run things concurrently, in addition to Python multiprocessing, it is natural to extend the usage of tqdm to showing the progress of single-threaded asyncio asynchronous tasks.


In this blog post, I would like to briefly discuss how to use tqdm for asyncio concurrent tasks.

Examples

Async Sleep

As the simplest example, we will run multiple sleeps asynchronously and show the progress bar using tqdm.

# asyncio_sleep.py

from typing import List
import random
import asyncio
import time
import tqdm
import tqdm.asyncio


async def sleep_func(sleep_duration: float = 1) -> float:

    start_time = time.time()
    await asyncio.sleep(delay=sleep_duration)
    end_time = time.time()
    elapsed_time = end_time - start_time

    return elapsed_time


async def run_multiple_sleeps(sleep_durations: List[float]) -> List[float]:

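    # Schedule all the sleeps on the event loop up front so that they run concurrently.
    tasks = []
    for sleep_duration in sleep_durations:
        task = asyncio.create_task(sleep_func(sleep_duration=sleep_duration))
        tasks.append(task)

    # asyncio.as_completed yields one awaitable per task as the tasks finish,
    # and tqdm advances the bar once per finished task. total is needed because
    # the iterator returned by as_completed has no length.
    actual_sleep_durations = [
        await f
        for f in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks))
    ]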

    # Alternatively, using tqdm asyncio.as_completed wrapper.
    # total=len(tasks) now becomes optional.
    # actual_sleep_durations = [
    #     await f
    #     for f in tqdm.asyncio.tqdm.as_completed(tasks)
    # ]

    return actual_sleep_durations


if __name__ == "__main__":

    n = 10**5
    sleep_durations = [random.uniform(0, 5.0) for _ in range(n)]

    actual_sleep_durations = asyncio.run(
        run_multiple_sleeps(sleep_durations=sleep_durations))

We can see that all 100,000 sleep tasks finished in about 5 seconds, roughly the duration of the longest single sleep, because the sleeps ran concurrently.

$ python3 asyncio_sleep.py 
100%|████████████████████████████████| 100000/100000 [00:05<00:00, 17338.87it/s]
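One detail of the example above: asyncio.as_completed yields results in the order the tasks finish, so actual_sleep_durations is not aligned with sleep_durations. If the input order matters, one possible sketch (not taken from the original post) is to advance a plain tqdm bar from each task's done callback and collect the results with asyncio.gather, which returns them in the order the tasks were passed in. The function name run_multiple_sleeps_ordered is hypothetical.

```python
import asyncio
import time
from typing import List

import tqdm


async def sleep_func(sleep_duration: float = 1) -> float:
    # Same helper as above: sleep, then report how long the sleep actually took.
    start_time = time.time()
    await asyncio.sleep(delay=sleep_duration)
    return time.time() - start_time


async def run_multiple_sleeps_ordered(sleep_durations: List[float]) -> List[float]:
    # A single progress bar shared by all tasks.
    with tqdm.tqdm(total=len(sleep_durations)) as pbar:
        tasks = []
        for sleep_duration in sleep_durations:
            task = asyncio.create_task(sleep_func(sleep_duration=sleep_duration))
            # The callback runs when the task finishes and advances the bar by one.
            task.add_done_callback(lambda _task: pbar.update(1))
            tasks.append(task)
        # asyncio.gather returns results in the order the tasks were passed in,
        # regardless of which task finished first.
        return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(run_multiple_sleeps_ordered([0.2, 0.0, 0.1])))
```

The callbacks are registered before asyncio.gather attaches its own, so every bar update has run by the time the gathered results are returned.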

Async Download

We could use aiohttp to download files asynchronously. Here I have an example of downloading PDBx files from the RCSB Protein Data Bank (PDB) asynchronously using aiohttp and tqdm.


The tqdm usage is almost the same as in the async sleep example above.
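The aiohttp example itself is not reproduced in this section. As a rough sketch of the same structure, the code below uses a hypothetical fake_download coroutine in place of a real HTTP request (it only simulates latency with asyncio.sleep), and it adds an asyncio.Semaphore to cap how many downloads are in flight at once, a common precaution when requesting many files from one server. The names fake_download, download_one, download_all, and max_concurrency, as well as the IDs used below, are illustrative assumptions, not part of aiohttp, tqdm, or the RCSB PDB.

```python
import asyncio
import random
from typing import Dict, List, Tuple

import tqdm.asyncio


async def fake_download(pdb_id: str) -> bytes:
    # Hypothetical stand-in for a real HTTP request (for example made with aiohttp);
    # it only simulates network latency and returns placeholder bytes.
    await asyncio.sleep(random.uniform(0, 0.01))
    return f"data for {pdb_id}".encode()


async def download_one(pdb_id: str, semaphore: asyncio.Semaphore) -> Tuple[str, bytes]:
    # The semaphore limits how many downloads run at the same time.
    async with semaphore:
        data = await fake_download(pdb_id)
    return pdb_id, data


async def download_all(pdb_ids: List[str], max_concurrency: int = 8) -> Dict[str, bytes]:
    semaphore = asyncio.Semaphore(max_concurrency)
    tasks = [asyncio.create_task(download_one(pdb_id, semaphore)) for pdb_id in pdb_ids]
    results: Dict[str, bytes] = {}
    # Same pattern as the sleep example: the bar advances as each task completes.
    for f in tqdm.asyncio.tqdm.as_completed(tasks):
        pdb_id, data = await f
        results[pdb_id] = data
    return results


if __name__ == "__main__":
    ids = [f"ID{i}" for i in range(100)]
    print(len(asyncio.run(download_all(ids, max_concurrency=4))))
```

Because results arrive in completion order, each task returns its ID together with the data so they can be matched afterwards. With aiohttp, fake_download would typically be replaced by a request made through a shared aiohttp.ClientSession; the tqdm part stays unchanged.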

Async For

tqdm's documentation shows that its progress bar can also be used with async for.
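A minimal sketch of that usage, assuming a toy asynchronous generator named numbers (hypothetical, not from the tqdm documentation): tqdm.asyncio.tqdm wraps the asynchronous iterable and advances the bar once per item. Because an asynchronous generator has no len(), total has to be passed explicitly to get a percentage and an ETA; the async generator program later in this section omits it, so its bar only shows a running count.

```python
import asyncio
from typing import AsyncGenerator, List

import tqdm.asyncio


async def numbers(n: int) -> AsyncGenerator[int, None]:
    # Toy asynchronous generator; sleep(0) briefly hands control to the event loop.
    for i in range(n):
        await asyncio.sleep(0)
        yield i


async def collect(n: int) -> List[int]:
    # An async generator has no len(), so total is passed for a percentage and ETA.
    return [i async for i in tqdm.asyncio.tqdm(numbers(n), total=n)]


if __name__ == "__main__":
    print(asyncio.run(collect(5)))
```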


However, there is a common misunderstanding that async for runs the iterations of a for loop concurrently. Actually, async for is just the asyncio counterpart of the synchronous for: it iterates over an asynchronous iterable, and while it waits for the next item, the loop can give up control to the event loop so that some other coroutine can run.


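This means that async for by itself does not run things concurrently and will not speed up your application. For example, neither of the following programs runs the sleeps concurrently. Each iteration waits for its sleep to finish before the next one starts, so each program takes roughly the sum of all the sleep durations: for 10^5 durations drawn uniformly from [0, 5] seconds, that is about 2.5 × 10^5 seconds (roughly 69 hours) on average, instead of about 5 seconds.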

from typing import List
import random
import asyncio
import time
import tqdm
import tqdm.asyncio


async def sleep_func(sleep_duration: float = 1) -> float:

    start_time = time.time()
    await asyncio.sleep(delay=sleep_duration)
    end_time = time.time()
    elapsed_time = end_time - start_time

    return elapsed_time


async def run_multiple_sleeps(sleep_durations: List[float]) -> List[float]:

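    # Each iteration awaits sleep_func before the next item is pulled,
    # so the sleeps run one after another.
    actual_sleep_durations = [
        await sleep_func(sleep_duration=sleep_duration)
        async for sleep_duration in tqdm.asyncio.tqdm(sleep_durations)
    ]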

    return actual_sleep_durations


if __name__ == "__main__":

    n = 10**5
    sleep_durations = [random.uniform(0, 5.0) for _ in range(n)]

    actual_sleep_durations = asyncio.run(
        run_multiple_sleeps(sleep_durations=sleep_durations))

from typing import AsyncGenerator, List
import random
import asyncio
import time
import tqdm
import tqdm.asyncio


async def fetch_sleep_duration(
        sleep_durations: List[float]) -> AsyncGenerator[float, None]:

    # An asynchronous generator: it awaits a tiny sleep before producing each item.
    for sleep_duration in sleep_durations:

        await asyncio.sleep(delay=random.uniform(0, 0.0001))

        yield sleep_duration


async def sleep_func(sleep_duration: float = 1) -> float:

    start_time = time.time()
    await asyncio.sleep(delay=sleep_duration)
    end_time = time.time()
    elapsed_time = end_time - start_time

    return elapsed_time


async def run_multiple_sleeps(
        sleep_duration_generator: AsyncGenerator[float, None]) -> List[float]:

    # Still sequential: each sleep finishes before the next duration is fetched.
    actual_sleep_durations = [
        await sleep_func(sleep_duration=sleep_duration)
        async for sleep_duration in tqdm.asyncio.tqdm(sleep_duration_generator)
    ]

    return actual_sleep_durations


if __name__ == "__main__":

    n = 10**5

    sleep_durations = [random.uniform(0, 5.0) for _ in range(n)]

    sleep_duration_generator = fetch_sleep_duration(
        sleep_durations=sleep_durations)

    actual_sleep_durations = asyncio.run(
        run_multiple_sleeps(sleep_duration_generator=sleep_duration_generator))
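To measure the difference directly, here is a small standard-library sketch; the function names are hypothetical, and it uses short sleeps so that it finishes quickly. run_sequential consumes an asynchronous generator with async for and awaits each sleep inside the loop, like the two programs above, while run_concurrent collects the same sleeps with an async comprehension and runs them together with asyncio.gather.

```python
import asyncio
import time
from typing import AsyncGenerator, List


async def durations(values: List[float]) -> AsyncGenerator[float, None]:
    # Asynchronous generator producing the sleep durations one by one.
    for value in values:
        await asyncio.sleep(0)
        yield value


async def run_sequential(values: List[float]) -> float:
    # async for with an await in the body: each sleep starts after the previous one ends.
    start = time.perf_counter()
    async for duration in durations(values):
        await asyncio.sleep(duration)
    return time.perf_counter() - start


async def run_concurrent(values: List[float]) -> float:
    # The async comprehension only creates the coroutines; gather runs them together.
    start = time.perf_counter()
    sleeps = [asyncio.sleep(duration) async for duration in durations(values)]
    await asyncio.gather(*sleeps)
    return time.perf_counter() - start


if __name__ == "__main__":
    values = [0.1] * 5
    print(f"sequential: {asyncio.run(run_sequential(values)):.2f} s")
    print(f"concurrent: {asyncio.run(run_concurrent(values)):.2f} s")
```

With five 0.1 second sleeps, run_sequential should take roughly 0.5 seconds and run_concurrent roughly 0.1 seconds. To show progress for the concurrent version, the as_completed wrapping from the async sleep example applies.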

References