Python graphlib Topological Sort

Introduction

Python graphlib was officially introduced in Python 3.9. The graphlib.TopologicalSorter provides functionality to topologically sort a graph of hashable nodes.

In this blog post, I would like to quickly discuss how to use graphlib.TopologicalSorter to topologically sort a graph and run parallel processing for the nodes being sorted.

Topological Sort Examples

Simple Examples

The following two simple examples are derived from the graphlib.TopologicalSorter examples and they should be easy to follow.

Prebuilt Graph

simple_topological_sort_1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from graphlib import TopologicalSorter


def main():

graph = {"D": {"B", "C"}, "C": {"A"}, "B": {"A"}}
ts = TopologicalSorter(graph)
sorted_nodes = tuple(ts.static_order())
print("Sorted Nodes: ")
print(sorted_nodes)


if __name__ == "__main__":

main()
1
2
3
$ python simple_topological_sort_1.py 
Sorted Nodes:
('A', 'C', 'B', 'D')

Building Graph to Topological Sorter

simple_topological_sort_2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from graphlib import TopologicalSorter


def main():

ts = TopologicalSorter()
ts.add(3, 2, 1)
ts.add(1, 0)
sorted_nodes = tuple(ts.static_order())
print("Sorted Nodes: ")
print(sorted_nodes)


if __name__ == "__main__":

main()
1
2
3
$ python simple_topological_sort_2.py 
Sorted Nodes:
(2, 0, 1, 3)

Topological Sort With Parallel Processing

The Python graphlib.TopologicalSorter also allows returning the nodes at different levels during sorting. Because the nodes at the same level could usually be processed in parallel, it is possible to seamlessly integrate parallel processing into topological sort.

With is_active, get_ready, and done, it is easy to traverse the nodes while the nodes are being topologically sorted. In fact, static_order is actually implemented using an implementation that is similar to the following one.

staging_topological_sort.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from graphlib import TopologicalSorter


def main():

graph = {"D": {"B", "C"}, "C": {"A"}, "B": {"A"}}
ts = TopologicalSorter(graph)
sorted_nodes = []
ts.prepare()
while ts.is_active():
for node in ts.get_ready():
sorted_nodes.append(node)
ts.done(node)
sorted_nodes = tuple(sorted_nodes)
print("Sorted Nodes: ")
print(sorted_nodes)


if __name__ == "__main__":

main()
1
2
3
$ python staging_topological_sort.py 
Sorted Nodes:
('A', 'C', 'B', 'D')

In the above implementation, we did not do too much to the nodes being returned by get_ready at each stage. However, in practice, it is possible that there are many nodes being returned get_ready at each stage and we have to do lots of heavy work for each node. Then we could take the advantage of multiprocessing or threading to process the nodes .

Topological Sort With Threading Parallel Processing

threading_processing_topological_sort.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from graphlib import TopologicalSorter
import threading, queue
import time


def worker(task_queue, finalized_tasks_queue, visited_map):
while True:
item = task_queue.get()
# Do some work.
time.sleep(0.01)
if item in visited_map:
visited_map[item] = True
finalized_tasks_queue.put(item)
task_queue.task_done()


def main():

num_threads = 4
graph = {"D": {"B", "C"}, "C": {"A"}, "B": {"A"}}
visited_map = {"A": False, "B": False, "C": False, "D": False}
task_queue = queue.Queue()
finalized_tasks_queue = queue.Queue()

for _ in range(num_threads):
thread = threading.Thread(target=worker,
args=(task_queue, finalized_tasks_queue,
visited_map),
daemon=True)
thread.start()

ts = TopologicalSorter(graph)
ts.prepare()
while ts.is_active():
print("Queued Nodes: ")
for node in ts.get_ready():
task_queue.put(node)
print(node)
node = finalized_tasks_queue.get()
print("Finalized Nodes: ")
print(node)
ts.done(node)
finalized_tasks_queue.task_done()

task_queue.join()
finalized_tasks_queue.join()

for node in visited_map:
assert visited_map[node] == True


if __name__ == "__main__":

main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ python threading_processing_topological_sort.py 
Queued Nodes:
A
Finalized Nodes:
A
Queued Nodes:
C
B
Finalized Nodes:
C
Queued Nodes:
Finalized Nodes:
B
Queued Nodes:
D
Finalized Nodes:
D

Topological Sort With Multiprocessing Parallel Processing

multiprocessing_processing_topological_sort.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from graphlib import TopologicalSorter
import multiprocessing
import time


def worker(task_queue, finalized_tasks_queue, visited_map):
while True:
item = task_queue.get()
# Do some work.
time.sleep(0.01)
if item in visited_map:
visited_map[item] = True
finalized_tasks_queue.put(item)
task_queue.task_done()


def main():

num_processes = 4
processes = []
graph = {"D": {"B", "C"}, "C": {"A"}, "B": {"A"}}
visited_map = {"A": False, "B": False, "C": False, "D": False}
# Multiprocess-safe dictionary.
manager = multiprocessing.Manager()
visited_map = manager.dict(visited_map)
task_queue = multiprocessing.JoinableQueue()
finalized_tasks_queue = multiprocessing.JoinableQueue()

for _ in range(num_processes):
process = multiprocessing.Process(target=worker,
args=(task_queue,
finalized_tasks_queue,
visited_map),
daemon=True)
processes.append(process)
process.start()

ts = TopologicalSorter(graph)
ts.prepare()
while ts.is_active():
print("Queued Nodes: ")
for node in ts.get_ready():
task_queue.put(node)
print(node)
node = finalized_tasks_queue.get()
print("Finalized Nodes: ")
print(node)
ts.done(node)
finalized_tasks_queue.task_done()

task_queue.join()
finalized_tasks_queue.join()

for node in visited_map:
assert visited_map[node] == True


if __name__ == "__main__":

main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ python multiprocessing_processing_topological_sort.py 
Queued Nodes:
A
Finalized Nodes:
A
Queued Nodes:
C
B
Finalized Nodes:
B
Queued Nodes:
Finalized Nodes:
C
Queued Nodes:
D
Finalized Nodes:
D

References

Author

Lei Mao

Posted on

09-21-2022

Updated on

09-21-2022

Licensed under


Comments