It is natural to want progress bars in our programs to show the progress of long-running tasks. tqdm is one of my favorite progress bar tools in Python. It can easily be incorporated into Python code by using trange to replace range, or by wrapping iterators with tqdm.tqdm, to show a progress bar for a for loop.
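For example, single-process usage looks like this (the toy loops below are purely for illustration):

from tqdm import tqdm, trange

# trange(n) is a shorthand for tqdm(range(n))
for i in trange(100):
    pass

# tqdm can wrap any iterable to display a progress bar
for item in tqdm(["a", "b", "c"]):
    pass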
Multiprocessing tasks should have progress bars as well. However, combining tqdm with multiprocessing is not well documented. In this blog post, I would like to present several ways of using multiprocessing with tqdm.
Python Multiprocessing
Let’s first take a look at some of the basic methods in the Python multiprocessing library. The commonly used multiprocessing.Pool methods can be broadly categorized as apply and map. apply calls a function with a given set of arguments. map is a higher-level abstraction over apply that applies the same function to each element of an iterable.
More specifically, the commonly used multiprocessing.Pool methods are:
apply_async
map
map_async
imap
imap_unordered
apply_async and map_async return “future results” immediately, and we need to collect the actual results later using get. map runs the function calls in parallel but blocks until all results are ready. imap is a lazier version of map that returns an iterator over the results. imap_unordered is similar to imap, except that the order of execution and of the returned results does not necessarily follow the order of the provided arguments.
Based on the nature of these methods, apply_async, imap, and imap_unordered are naturally compatible with tqdm for showing progress bars.
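To make these differences concrete, here is a minimal sketch (the square worker and the argument values are made up purely for illustration):

from multiprocessing import Pool


def square(x):
    # Toy worker used only for this illustration
    return x * x


if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # map blocks until all results are ready and preserves the input order
        print(pool.map(square, range(5)))  # [0, 1, 4, 9, 16]
        # apply_async returns an AsyncResult immediately; collect it with get()
        async_result = pool.apply_async(square, args=(3,))
        print(async_result.get())  # 9
        # imap_unordered yields results lazily, in completion order
        print(sorted(pool.imap_unordered(square, range(5))))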
Python Multiprocessing tqdm Examples
Many Small Processes
Sometimes, the entire task consists of many small jobs, each of which does not take much time to finish. The number of jobs is much larger than the number of worker processes we can assign to the multiprocessing.Pool. We would like to monitor the progress of the entire task using a single progress bar.
The key requirement is that we do not have to modify the functions we would like to run when combining multiprocessing and tqdm. In the following examples, I have implemented helpers that use apply_async, imap, and imap_unordered with tqdm for functions that take a single argument or multiple arguments.
def run_imap_unordered_multiprocessing(func, argument_list, num_processes):
    pool = Pool(processes=num_processes)
    result_list_tqdm = []
    # Wrap the lazy imap_unordered iterator with tqdm; total must be given explicitly
    for result in tqdm(pool.imap_unordered(func=func, iterable=argument_list), total=len(argument_list)):
        result_list_tqdm.append(result)
    return result_list_tqdm
def run_apply_async_multiprocessing(func, argument_list, num_processes):
    pool = Pool(processes=num_processes)
    # apply_async submits the jobs immediately and returns AsyncResult objects
    jobs = [pool.apply_async(func=func, args=(*argument,)) if isinstance(argument, tuple)
            else pool.apply_async(func=func, args=(argument,))
            for argument in argument_list]
    pool.close()
    result_list_tqdm = []
    # Iterate over the jobs with tqdm; get() blocks until each result is ready
    for job in tqdm(jobs):
        result_list_tqdm.append(job.get())
    return result_list_tqdm
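The driver code below also relies on run_imap_multiprocessing, func_single_argument, and func_multiple_argument, whose definitions are not included in this excerpt, as well as a few imports. Here is a minimal sketch of what they could look like, consistent with the calls and assertions in the driver code; the function bodies and the 0.5-second sleep are my assumptions:

import time
import random
from functools import partial
from multiprocessing import Pool

from tqdm import tqdm


def run_imap_multiprocessing(func, argument_list, num_processes):
    # Same pattern as run_imap_unordered_multiprocessing above, but imap preserves the input order
    pool = Pool(processes=num_processes)
    result_list_tqdm = []
    for result in tqdm(pool.imap(func=func, iterable=argument_list), total=len(argument_list)):
        result_list_tqdm.append(result)
    return result_list_tqdm


# Assumed worker functions: each sleeps briefly and echoes its dynamic arguments,
# so that the assertions in the driver code below hold.
def func_single_argument(n):
    time.sleep(0.5)
    return n


def func_multiple_argument(n, m, *args, **kwargs):
    time.sleep(0.5)
    return n, m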
# imap, imap_unordered
# They only support functions with one dynamic argument
func = func_single_argument
argument_list = [random.randint(0, 100) for _ in range(num_jobs)]

print("Running imap multiprocessing for single-argument functions ...")
result_list = run_imap_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)
assert result_list == argument_list

print("Running imap_unordered multiprocessing for single-argument functions ...")
result_list = run_imap_unordered_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)

# partial functions (one dynamic argument, one or more fixed arguments)
partial_func = partial(func_multiple_argument, m=10)

print("Running imap multiprocessing for single-argument partial functions ...")
result_list = run_imap_multiprocessing(func=partial_func, argument_list=argument_list, num_processes=num_processes)

print("Running imap_unordered multiprocessing for single-argument partial functions ...")
result_list = run_imap_unordered_multiprocessing(func=partial_func, argument_list=argument_list, num_processes=num_processes)
# Since it is unordered, this assertion might not be valid
# assert result_list == argument_list
# apply_async
# One dynamic argument
func = func_single_argument
argument_list = [random.randint(0, 100) for _ in range(num_jobs)]

print("Running apply_async multiprocessing for single-argument functions ...")
result_list = run_apply_async_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)
assert result_list == argument_list

# More than one dynamic argument
func = func_multiple_argument
argument_list = [(random.randint(0, 100), random.randint(0, 100)) for _ in range(num_jobs)]

print("Running apply_async multiprocessing for multi-argument functions ...")
result_list = run_apply_async_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)
assert result_list == argument_list

# partial functions (multiple dynamic arguments, one or more fixed arguments)
partial_func = partial(func_multiple_argument, x=1, y=2, z=3)  # Giving some arguments for kwargs

print("Running apply_async multiprocessing for multi-argument partial functions ...")
result_list = run_apply_async_multiprocessing(func=partial_func, argument_list=argument_list, num_processes=num_processes)
assert result_list == argument_list
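The two driver snippets above presumably sit inside a main() function, since the script ends by calling main(). A minimal skeleton, with num_jobs and num_processes chosen to match the timing discussed below (both values are assumptions):

def main():
    num_processes = 10
    num_jobs = 100
    random_seed = 0
    random.seed(random_seed)
    # ... followed by the imap/imap_unordered and apply_async driver code shown above ...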
if __name__ == "__main__":
    main()
During the execution of the multiprocessing program, we can see that the 100 jobs were run in 10 batches with a batch size of 10. Each batch takes roughly 0.5 seconds, and the total execution time is roughly 5 seconds.
Few Large Processes
Sometimes, the entire task consists of a few large processes, each of which takes a long time to finish. We would like to monitor the progress of each of them using multiple progress bars.
In the following example, I have implemented this using apply_async with tqdm. imap and imap_unordered should work similarly.
import time
import random

from tqdm import tqdm
from multiprocessing import Pool, freeze_support, RLock


def func(pid, n):
    # Each worker owns its own progress bar; position determines the bar's row on the screen
    tqdm_text = "#" + "{}".format(pid).zfill(3)
    current_sum = 0
    with tqdm(total=n, desc=tqdm_text, position=pid+1) as pbar:
        for i in range(1, n+1):
            current_sum += i
            time.sleep(0.05)
            pbar.update(1)
    return current_sum
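The listing above omits the driver code. Below is a minimal sketch of how such a per-process progress bar setup is typically driven, which also explains the freeze_support and RLock imports; the process count and job sizes are my assumptions:

def main():
    num_processes = 4
    num_jobs = 4
    random.seed(0)

    # Share a single lock across processes so that the tqdm bars
    # do not scramble each other's terminal output.
    pool = Pool(processes=num_processes, initargs=(RLock(),), initializer=tqdm.set_lock)
    jobs = [pool.apply_async(func, args=(pid, random.randint(50, 100))) for pid in range(num_jobs)]
    pool.close()
    result_list = [job.get() for job in jobs]

    # Move the cursor below the progress bars before printing anything else
    print("\n" * (num_processes + 1))
    print(result_list)


if __name__ == "__main__":
    freeze_support()  # Needed when the program is frozen for Windows
    main()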
Note that this trick does not work for tqdm >= 4.40.0. I am not sure whether it is a bug or not.
Conclusions
imap and imap_unordered can be used with tqdm for simple multiprocessing tasks where a single function takes a single dynamic argument. For one or more functions that take multiple dynamic arguments, we should use apply_async with tqdm.