Python graphlib was officially introduced in Python 3.9. The graphlib.TopologicalSorter provides functionality to topologically sort a graph of hashable nodes.
In this blog post, I would like to quickly discuss how to use graphlib.TopologicalSorter to topologically sort a graph and run parallel processing for the nodes being sorted.
Topological Sort Examples
Simple Examples
The following two simple examples are derived from the examples in the graphlib.TopologicalSorter documentation, and they should be easy to follow.
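As a minimal sketch of the simplest case, the snippet below sorts a small graph with static_order. The four-node graph is an assumption chosen for illustration; each key maps a node to the set of nodes that must come before it.

```python
from graphlib import TopologicalSorter

# Illustrative graph (an assumption): each key depends on the nodes in its set.
# D depends on B and C, which both depend on A.
graph = {"D": {"B", "C"}, "C": {"A"}, "B": {"A"}}

ts = TopologicalSorter(graph)

# static_order() returns an iterator over the nodes in one valid
# topological order, so every node appears after its predecessors.
order = tuple(ts.static_order())
print(order)
```

Any order that places A first and D last is valid for this graph, since B and C do not depend on each other.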
graphlib.TopologicalSorter can also return the nodes level by level during sorting. Because the nodes at the same level do not depend on one another, they can usually be processed in parallel, which makes it possible to integrate parallel processing into topological sort seamlessly.
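To make the idea of levels concrete, here is a small sketch that collects the nodes returned at each stage. The helper name topological_levels and the example graph are assumptions for illustration, not part of graphlib.

```python
from graphlib import TopologicalSorter


def topological_levels(graph):
    """Return a list of levels; the nodes in one level are mutually independent."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    levels = []
    while ts.is_active():
        # get_ready() returns every node whose predecessors are all done.
        ready_nodes = ts.get_ready()
        levels.append(list(ready_nodes))
        # Mark the whole level as done so the next level becomes ready.
        ts.done(*ready_nodes)
    return levels


# Illustrative graph (an assumption): D depends on B and C, which depend on A.
example_graph = {"D": {"B", "C"}, "C": {"A"}, "B": {"A"}}
example_levels = topological_levels(example_graph)
print(example_levels)
```

For this graph there are three levels: A alone, then B and C together, then D.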
With is_active, get_ready, and done, it is easy to traverse the nodes while they are being topologically sorted. In fact, static_order is itself implemented with a similar loop over these three methods.
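The loop described here can be sketched as a generator. This is a sketch modeled on that description rather than a copy of the standard library source; the name static_order_sketch and the example graph are assumptions.

```python
from graphlib import TopologicalSorter


def static_order_sketch(graph):
    """Yield nodes in a topological order, one ready batch at a time."""
    ts = TopologicalSorter(graph)
    ts.prepare()  # raises graphlib.CycleError if the graph has a cycle
    while ts.is_active():
        ready_nodes = ts.get_ready()
        # No real work is done per node here; the nodes are simply emitted.
        yield from ready_nodes
        ts.done(*ready_nodes)


# Illustrative graph (an assumption): D depends on B and C, which depend on A.
sketch_graph = {"D": {"B", "C"}, "C": {"A"}, "B": {"A"}}
sketch_order = list(static_order_sketch(sketch_graph))
print(sketch_order)
```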
In a simple traversal like this, little is done with the nodes returned by get_ready at each stage. In practice, however, get_ready may return many nodes at each stage, and each node may require heavy work. In that case, we can take advantage of multiprocessing or threading to process the nodes in parallel.
Topological Sort With Threading Parallel Processing
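    ts = TopologicalSorter(graph)
    ts.prepare()
    while ts.is_active():
        # Hand every node that is currently ready to the workers.
        print("Queued Nodes: ")
        for node in ts.get_ready():
            task_queue.put(node)
            print(node)
        # Block until a worker reports a finished node, then mark it done
        # so that its successors can become ready.
        node = finalized_tasks_queue.get()
        print("Finalized Nodes: ")
        print(node)
        ts.done(node)
        finalized_tasks_queue.task_done()

    task_queue.join()
    finalized_tasks_queue.join()

    # Every node must have been visited by a worker.
    for node in visited_map:
        assert visited_map[node] == True

if __name__ == "__main__":

    main()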
$ python threading_processing_topological_sort.py
Queued Nodes:
A
Finalized Nodes:
A
Queued Nodes:
C
B
Finalized Nodes:
C
Queued Nodes:
Finalized Nodes:
B
Queued Nodes:
D
Finalized Nodes:
D
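For readers who want to run the threaded version end to end, the following is a minimal self-contained sketch. The worker body, the helper name run_threaded_sort, the example graph, and the default of two threads are assumptions for illustration; only the queue-driven loop follows the listing above.

```python
import queue
import threading
from graphlib import TopologicalSorter


def worker(task_queue, finalized_tasks_queue, visited_map):
    # Hypothetical worker: take a node, "process" it, then report it finished.
    while True:
        node = task_queue.get()
        visited_map[node] = True  # stand-in for real, heavy work
        finalized_tasks_queue.put(node)
        task_queue.task_done()


def run_threaded_sort(graph, num_threads=2):
    task_queue = queue.Queue()
    finalized_tasks_queue = queue.Queue()
    visited_map = {}

    # Daemon threads exit automatically when the main thread finishes.
    for _ in range(num_threads):
        threading.Thread(
            target=worker,
            args=(task_queue, finalized_tasks_queue, visited_map),
            daemon=True,
        ).start()

    finalized_order = []
    ts = TopologicalSorter(graph)
    ts.prepare()
    while ts.is_active():
        # Queue every node whose predecessors have all been finalized.
        for node in ts.get_ready():
            task_queue.put(node)
        # Wait for any one node to finish, then unlock its successors.
        node = finalized_tasks_queue.get()
        ts.done(node)
        finalized_tasks_queue.task_done()
        finalized_order.append(node)

    task_queue.join()
    finalized_tasks_queue.join()
    return finalized_order, visited_map


if __name__ == "__main__":
    # Illustrative graph (an assumption): D depends on B and C, which depend on A.
    order, visited = run_threaded_sort({"D": {"B", "C"}, "C": {"A"}, "B": {"A"}})
    print(order)
```

The relative order of B and C in the result depends on thread scheduling, which matches the interleaving visible in the output above.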
Topological Sort With Multiprocessing Parallel Processing
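    # Start the worker processes; daemon processes are terminated when the
    # main process exits.
    for _ in range(num_processes):
        process = multiprocessing.Process(
            target=worker,
            args=(task_queue, finalized_tasks_queue, visited_map),
            daemon=True,
        )
        processes.append(process)
        process.start()

    ts = TopologicalSorter(graph)
    ts.prepare()
    while ts.is_active():
        # Hand every node that is currently ready to the workers.
        print("Queued Nodes: ")
        for node in ts.get_ready():
            task_queue.put(node)
            print(node)
        # Block until a worker reports a finished node, then mark it done
        # so that its successors can become ready.
        node = finalized_tasks_queue.get()
        print("Finalized Nodes: ")
        print(node)
        ts.done(node)
        finalized_tasks_queue.task_done()

    task_queue.join()
    finalized_tasks_queue.join()

    # Every node must have been visited by a worker.
    for node in visited_map:
        assert visited_map[node] == True

if __name__ == "__main__":

    main()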
$ python multiprocessing_processing_topological_sort.py
Queued Nodes:
A
Finalized Nodes:
A
Queued Nodes:
C
B
Finalized Nodes:
B
Queued Nodes:
Finalized Nodes:
C
Queued Nodes:
D
Finalized Nodes:
D