Lei Mao

Introduction

It is natural to want progress bars in our programs to show the progress of long-running tasks. tqdm is one of my favorite progress bar tools in Python. It can easily be incorporated into Python code by using trange in place of range, or by wrapping any iterator with tqdm.tqdm, to show a progress bar for a for loop.


Multiprocessing tasks should also have progress bars. However, the combination of tqdm and multiprocessing is not well documented in Python. In this blog post, I would like to present several ways of using multiprocessing with tqdm.

Python Multiprocessing

Let’s first take a look at some of the basic methods in the Python multiprocessing library. The commonly used multiprocessing.Pool methods can be broadly categorized as apply and map. apply calls a function once with a given set of arguments. map is a higher-level abstraction over apply: it calls the same function on each element of an iterable.
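As a quick illustration of the apply-versus-map distinction, here is a minimal sketch (the toy square function is my own, not from the post):

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # apply calls the function once with the given arguments (blocking)
        single = pool.apply(square, args=(3,))
        # map calls the same function on every element of an iterable
        many = pool.map(square, [1, 2, 3])
    print(single, many)  # 9 [1, 4, 9]
```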


More specifically, the commonly used multiprocessing.Pool methods are:

  • apply_async
  • map
  • map_async
  • imap
  • imap_unordered

apply_async and map_async return “future results” (AsyncResult objects) immediately; we need to collect the actual results using get. map, although it runs the function calls in parallel, blocks until all results are ready. imap is a lazier version of map that returns an iterator over the results. imap_unordered is similar to imap, but the order in which it returns results does not necessarily follow the order of the arguments provided.
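To make the difference concrete, here is a small sketch (the toy double function is my own, not from the post) showing apply_async returning an AsyncResult and imap returning a lazy iterator:

```python
from multiprocessing import Pool

def double(x):
    return 2 * x

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # apply_async returns an AsyncResult immediately; get() blocks for the value
        future = pool.apply_async(double, args=(21,))
        print(future.get())  # 42
        # imap yields results lazily, in the order of the input iterable
        print(list(pool.imap(double, [1, 2, 3])))  # [2, 4, 6]
```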


Given these behaviors, apply_async, imap, and imap_unordered are naturally compatible with tqdm for showing progress bars.

Python Multiprocessing tqdm Examples

Many Small Processes

Sometimes, the entire task consists of many small jobs, each of which does not take much time to finish. The number of jobs is much larger than the number of worker processes we can assign to the multiprocessing.Pool. We would like to monitor the progress of the entire task using a single progress bar.


The goal is to avoid modifying the functions we want to run when adding multiprocessing and tqdm. The following examples use apply_async, imap, and imap_unordered with tqdm, for functions that take a single argument or multiple arguments.

# multiprocess_examples_1.py

from tqdm import tqdm
from multiprocessing import Pool
from functools import partial
import time
import random

def func_single_argument(n):
    # Simulate some work
    time.sleep(0.5)
    return n

def func_multiple_argument(n, m, *args, **kwargs):
    # Simulate some work
    time.sleep(0.5)
    return n, m

def run_imap_multiprocessing(func, argument_list, num_processes):

    pool = Pool(processes=num_processes)

    result_list_tqdm = []
    # imap returns results lazily and in input order, so tqdm can wrap the iterator directly
    for result in tqdm(pool.imap(func=func, iterable=argument_list), total=len(argument_list)):
        result_list_tqdm.append(result)
    pool.close()
    pool.join()

    return result_list_tqdm

def run_imap_unordered_multiprocessing(func, argument_list, num_processes):

    pool = Pool(processes=num_processes)

    result_list_tqdm = []
    # imap_unordered yields results as they complete, not necessarily in input order
    for result in tqdm(pool.imap_unordered(func=func, iterable=argument_list), total=len(argument_list)):
        result_list_tqdm.append(result)
    pool.close()
    pool.join()

    return result_list_tqdm

def run_apply_async_multiprocessing(func, argument_list, num_processes):

    pool = Pool(processes=num_processes)

    jobs = []
    for argument in argument_list:
        # Unpack tuples as multiple positional arguments
        args = argument if isinstance(argument, tuple) else (argument,)
        jobs.append(pool.apply_async(func=func, args=args))
    pool.close()

    result_list_tqdm = []
    # get() blocks until each job finishes; the bar advances as jobs complete in submission order
    for job in tqdm(jobs):
        result_list_tqdm.append(job.get())

    return result_list_tqdm

def main():

    num_processes = 10
    num_jobs = 100
    random_seed = 0
    random.seed(random_seed) 

    # imap, imap_unordered
    # They only support functions with a single dynamic argument
    func = func_single_argument
    argument_list = [random.randint(0, 100) for _ in range(num_jobs)]
    print("Running imap multiprocessing for single-argument functions ...")
    result_list = run_imap_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)
    assert result_list == argument_list
    print("Running imap_unordered multiprocessing for single-argument functions ...")
    result_list = run_imap_unordered_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)
    # partial functions (one dynamic argument, one or more than one fixed arguments)
    partial_func = partial(func_multiple_argument, m=10)
    print("Running imap multiprocessing for single-argument partial functions ...")
    result_list = run_imap_multiprocessing(func=partial_func, argument_list=argument_list, num_processes=num_processes)
    print("Running imap_unordered multiprocessing for single-argument partial functions ...")
    result_list = run_imap_unordered_multiprocessing(func=partial_func, argument_list=argument_list, num_processes=num_processes)
    # Since it is unordered, this assertion might not be valid
    # assert result_list == argument_list

    # apply_async
    # One dynamic argument
    func = func_single_argument
    argument_list = [random.randint(0, 100) for _ in range(num_jobs)]
    print("Running apply_async multiprocessing for single-argument functions ...")
    result_list = run_apply_async_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)
    assert result_list == argument_list
    # More than one dynamic arguments
    func = func_multiple_argument
    argument_list = [(random.randint(0, 100), random.randint(0, 100)) for _ in range(num_jobs)]
    print("Running apply_async multiprocessing for multi-argument functions ...")
    result_list = run_apply_async_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)
    assert result_list == argument_list
    # partial functions (multiple dynamic arguments, one or more than one fixed arguments)
    partial_func = partial(func_multiple_argument, x=1, y=2, z=3) # Giving some arguments for kwargs
    print("Running apply_async multiprocessing for multi-argument partial functions ...")
    result_list = run_apply_async_multiprocessing(func=partial_func, argument_list=argument_list, num_processes=num_processes)
    assert result_list == argument_list

if __name__ == "__main__":

    main()

During the execution of the multiprocessing program, we can see that the 100 jobs ran in 10 batches of 10 jobs each. Each batch takes roughly 0.5 seconds, so the total execution time is roughly 5 seconds.

$ python multiprocess_examples_1.py 
Running imap multiprocessing for single-argument functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.97it/s]
Running imap_unordered multiprocessing for single-argument functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.96it/s]
Running imap multiprocessing for single-argument partial functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.97it/s]
Running imap_unordered multiprocessing for single-argument partial functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.97it/s]
Running apply_async multiprocessing for single-argument functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.97it/s]
Running apply_async multiprocessing for multi-argument functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.97it/s]
Running apply_async multiprocessing for multi-argument partial functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.96it/s]

Few Large Processes

Sometimes, the entire task consists of a few large jobs, each of which takes a long time to finish. We would like to monitor the progress of each job using its own progress bar.


The following example uses apply_async with tqdm; imap and imap_unordered would work similarly.

# multiprocess_examples_2.py

import time
import random
from tqdm import tqdm
from multiprocessing import Pool, freeze_support, RLock

def func(pid, n):

    tqdm_text = "#" + "{}".format(pid).zfill(3)

    current_sum = 0
    with tqdm(total=n, desc=tqdm_text, position=pid+1) as pbar:
        for i in range(1, n+1):
            current_sum += i
            time.sleep(0.05)
            pbar.update(1)
    
    return current_sum

def main():

    freeze_support() # Required when the program is frozen into a Windows executable

    num_processes = 10
    num_jobs = 30
    random_seed = 0
    random.seed(random_seed) 

    pool = Pool(processes=num_processes, initargs=(RLock(),), initializer=tqdm.set_lock)

    argument_list = [random.randint(0, 100) for _ in range(num_jobs)]

    jobs = [pool.apply_async(func, args=(i,n,)) for i, n in enumerate(argument_list)]
    pool.close()
    result_list = [job.get() for job in jobs]

    # Print blank lines to move the cursor below the progress bars
    print("\n" * (len(argument_list) + 1))

if __name__ == "__main__":

    main()

During execution, up to 10 progress bars, one per worker process, update simultaneously; 30 bars are printed in total, one for each job.

$ python multiprocess_examples_2.py 

#000: 100%|█████████████████████████████████████| 49/49 [00:02<00:00, 19.95it/s]
#001: 100%|█████████████████████████████████████| 97/97 [00:04<00:00, 19.92it/s]
#002: 100%|█████████████████████████████████████| 53/53 [00:02<00:00, 19.94it/s]
#003: 100%|███████████████████████████████████████| 5/5 [00:00<00:00, 19.94it/s]
#004: 100%|█████████████████████████████████████| 33/33 [00:01<00:00, 19.95it/s]
#005: 100%|█████████████████████████████████████| 65/65 [00:03<00:00, 19.93it/s]
#006: 100%|█████████████████████████████████████| 62/62 [00:03<00:00, 19.91it/s]
#007: 100%|█████████████████████████████████████| 51/51 [00:02<00:00, 19.94it/s]
#008: 100%|███████████████████████████████████| 100/100 [00:05<00:00, 19.90it/s]
#009: 100%|█████████████████████████████████████| 38/38 [00:01<00:00, 19.95it/s]
#010: 100%|█████████████████████████████████████| 61/61 [00:03<00:00, 19.93it/s]
#011: 100%|█████████████████████████████████████| 45/45 [00:02<00:00, 19.91it/s]
#012: 100%|█████████████████████████████████████| 74/74 [00:03<00:00, 19.91it/s]
#013: 100%|█████████████████████████████████████| 27/27 [00:01<00:00, 19.89it/s]
#014: 100%|█████████████████████████████████████| 64/64 [00:03<00:00, 19.90it/s]
#015: 100%|█████████████████████████████████████| 17/17 [00:00<00:00, 19.87it/s]
#016: 100%|█████████████████████████████████████| 36/36 [00:01<00:00, 19.90it/s]
#017: 100%|█████████████████████████████████████| 17/17 [00:00<00:00, 19.91it/s]
#018: 100%|█████████████████████████████████████| 96/96 [00:04<00:00, 19.92it/s]
#019: 100%|█████████████████████████████████████| 12/12 [00:00<00:00, 19.92it/s]
#020: 100%|█████████████████████████████████████| 79/79 [00:03<00:00, 19.91it/s]
#021: 100%|█████████████████████████████████████| 32/32 [00:01<00:00, 19.89it/s]
#022: 100%|█████████████████████████████████████| 68/68 [00:03<00:00, 19.93it/s]
#023: 100%|█████████████████████████████████████| 90/90 [00:04<00:00, 19.93it/s]
#024: 100%|█████████████████████████████████████| 77/77 [00:03<00:00, 19.91it/s]
#025: 100%|█████████████████████████████████████| 18/18 [00:00<00:00, 19.90it/s]
#026: 100%|█████████████████████████████████████| 39/39 [00:01<00:00, 19.90it/s]
#027: 100%|█████████████████████████████████████| 12/12 [00:00<00:00, 19.90it/s]
#028: 100%|█████████████████████████████████████| 93/93 [00:04<00:00, 19.92it/s]
#029: 100%|███████████████████████████████████████| 9/9 [00:00<00:00, 19.89it/s]

Conclusions

imap and imap_unordered can be used with tqdm for simple multiprocessing tasks where a single function takes a single dynamic argument. For one or more functions that take multiple dynamic arguments, use apply_async with tqdm.
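As a side note, recent versions of tqdm also ship tqdm.contrib.concurrent.process_map, which wraps concurrent.futures.ProcessPoolExecutor and draws the progress bar for you. A minimal sketch, assuming tqdm >= 4.42:

```python
from tqdm.contrib.concurrent import process_map

def square(x):
    return x * x

if __name__ == "__main__":
    # process_map maps the function over the iterable with a built-in progress bar
    results = process_map(square, range(100), max_workers=4, chunksize=1)
    print(results[:5])  # [0, 1, 4, 9, 16]
```

Like imap, process_map only supports functions with a single dynamic argument, so functools.partial is still needed for fixed extra arguments.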
