How to wait for all threads to finish in Python
For a side project, I needed to wait for multiple threads to complete before proceeding with the next step. In other words, I needed threads to behave like the Promise.all() functionality of JavaScript.
Wait for threads in Python
In the sections below, we’ll discuss different methods for waiting on threads to complete in Python.
Wait for all threads to complete using start / join
In simple terms, we can create the threads, append them to a list, then start and join them.
For example:
import time
from threading import Thread


def work(interval_seconds):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")


threads = [
    Thread(target=work, args=(1,)),
    Thread(target=work, args=(2,)),
    Thread(target=work, args=(1,)),
]

for t in threads:
    t.start()

for t in threads:
    t.join()
Output:
sleeping for 1 seconds
sleeping for 2 seconds
sleeping for 1 seconds
slept for 1 seconds
slept for 1 seconds
slept for 2 seconds
This works well and may be all you need; however, there are alternatives.
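One refinement worth knowing about: join() accepts an optional timeout, so you can avoid blocking forever on a stuck thread. A minimal sketch, where the 5-second timeout is an arbitrary value chosen for illustration:

```python
import time
from threading import Thread


def work(interval_seconds):
    time.sleep(interval_seconds)


threads = [Thread(target=work, args=(1,)), Thread(target=work, args=(2,))]
for t in threads:
    t.start()

# join() returns after the timeout even if the thread is still running,
# so check is_alive() afterwards to learn whether it actually finished.
for t in threads:
    t.join(timeout=5)

finished = all(not t.is_alive() for t in threads)
print(finished)  # True once every thread finished within the timeout
```

Note that join() itself gives no return value to distinguish a timeout from completion; is_alive() is the check.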
Wait for all threads to complete using a ThreadPoolExecutor
Python 3.2 introduced concurrent.futures, whose purpose was to provide a simple high-level API for asynchronously executing callables.
Two classes were released:
- ThreadPoolExecutor for performing asynchronous executions using threads
- ProcessPoolExecutor for performing asynchronous executions using processes
Both implement the same interface, the Executor class, so the examples provided below will work for either. For the purposes of this tutorial, I’ll use the ThreadPoolExecutor.
Here is the same example from above, but using the ThreadPoolExecutor:
import time
from concurrent.futures import ThreadPoolExecutor


def work(interval_seconds):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")


with ThreadPoolExecutor(max_workers=2) as e:
    e.submit(work, 1)
    e.submit(work, 2)
    e.submit(work, 1)
Output:
sleeping for 1 seconds
sleeping for 2 seconds
slept for 1 seconds
sleeping for 1 seconds
slept for 1 seconds
slept for 2 seconds
The executor provides a .map function for convenience as well:
import time
from concurrent.futures import ThreadPoolExecutor


def work(interval_seconds):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")


with ThreadPoolExecutor(max_workers=2) as e:
    e.map(work, [1, 2, 3])
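.map also collects return values: it hands back a lazy iterator whose results arrive in submission order, blocking as needed. A small sketch (the times-ten multiplier is just for illustration):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(interval_seconds):
    time.sleep(interval_seconds)
    return interval_seconds * 10


# map() yields results in submission order, regardless of which task
# finishes first; iterating the result blocks until each value is ready.
with ThreadPoolExecutor(max_workers=2) as e:
    results = list(e.map(work, [1, 2, 1]))

print(results)  # [10, 20, 10]
```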
Wait for all threads to complete and return their results
Firing and forgetting is useful, but returning results was my use case. Using ThreadPoolExecutor or ProcessPoolExecutor makes this easy.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(interval_seconds, order):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")
    return f"task {order}"


with ThreadPoolExecutor(max_workers=2) as e:
    futures = []
    for x in [(1, 1), (2, 2), (1, 3)]:
        futures.append(e.submit(work, x[0], x[1]))

for future in as_completed(futures):
    print(future.result())
In the example, I added a new parameter named order that represents when the item was queued.
- The first job takes 1 second and returns 1
- The second job takes 2 seconds and returns 2
- The third job takes 1 second and returns 3
Output:
sleeping for 1 seconds
sleeping for 2 seconds
slept for 1 seconds
sleeping for 1 seconds
slept for 2 seconds
slept for 1 seconds
task 2
task 1
task 3
Wait for all threads to complete and return their results in order
In the last example, the results were returned out of order. A small tweak, iterating the futures in submission order instead of using as_completed, returns the results in order:
import time
from concurrent.futures import ThreadPoolExecutor


def work(interval_seconds, order):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")
    return f"task {order}"


with ThreadPoolExecutor(max_workers=2) as e:
    futures = []
    for x in [(1, 1), (2, 2), (1, 3)]:
        futures.append(e.submit(work, x[0], x[1]))

for future in futures:
    print(future.result())
Output:
sleeping for 1 seconds
sleeping for 2 seconds
slept for 1 seconds
sleeping for 1 seconds
slept for 2 seconds
slept for 1 seconds
task 1
task 2
task 3
Promise.all and Promise.allSettled using threads
For the last topic, I want to replicate the interface that Node.js provides: Promise.all() and Promise.allSettled().
If you’re unfamiliar, await Promise.all() waits for all promises to be fulfilled; if any promise rejects, it throws an exception. Promise.allSettled() waits for all promises to be either fulfilled or rejected, without throwing an exception.
To replicate the functionality using concurrent.futures, I created a Job dataclass that has three properties:
- The Callable (i.e. function) that will be called for the job
- The Tuple of positional arguments that will be passed to the job
- The dict[str, Any] of keyword arguments that will be passed to the job
import concurrent.futures
import time

from dataclasses import dataclass
from typing import Callable, Any, Tuple, Optional


def work(interval_seconds, order):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")
    return f"task {order}"


@dataclass
class Job:
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    kwargs: Optional[dict[str, Any]] = None

    def execute(self):
        return self.func(*self.args, **(self.kwargs or {}))


def all(jobs):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(job.execute) for job in jobs]
        return [f.result() for f in futures]


jobs = [
    Job(func=work, args=(1, 1)),
    Job(func=work, args=(2, 2)),
    Job(func=work, args=(1, 3)),
]

results = all(jobs)
print(results)
The all(jobs) function accepts a list of jobs to complete. Note that this name shadows Python’s built-in all() function; in real code you may want to pick a different one.
Output:
sleeping for 1 seconds
sleeping for 2 seconds
sleeping for 1 seconds
slept for 1 seconds
slept for 1 seconds
slept for 2 seconds
['task 1', 'task 2', 'task 3']
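What makes this behave like Promise.all() is that future.result() re-raises any exception the job threw, so one failing job rejects the whole batch. A sketch with a deliberately failing job (run_all, boom, and message are hypothetical names used here, partly to avoid shadowing the built-in all):

```python
import concurrent.futures
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass
class Job:
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()

    def execute(self):
        return self.func(*self.args)


def run_all(jobs):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(job.execute) for job in jobs]
        # result() re-raises the job's exception, so one failing job
        # rejects the whole batch, just like Promise.all().
        return [f.result() for f in futures]


def boom():
    raise ValueError("job failed")


try:
    run_all([Job(func=boom)])
    message = "fulfilled"
except ValueError as exc:
    message = f"rejected: {exc}"

print(message)  # rejected: job failed
```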
We can extend the prior example by adding an all_settled function and a JobResult dataclass:
import concurrent.futures
import time

from dataclasses import dataclass
from typing import Callable, Any, Tuple, Optional


def work(interval_seconds, order):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")
    return f"task {order}"


@dataclass
class JobResult:
    status: str
    value: Optional[Any] = None
    reason: Optional[Exception] = None


@dataclass
class Job:
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    kwargs: Optional[dict[str, Any]] = None

    def execute(self):
        return self.func(*self.args, **(self.kwargs or {}))


def all(jobs):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(job.execute) for job in jobs]
        return [f.result() for f in futures]


def all_settled(jobs):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(job.execute) for job in jobs]
        concurrent.futures.wait(futures)

    results = []
    for future in futures:
        if future.exception():
            results.append(JobResult(status="rejected", reason=future.exception()))
        else:
            results.append(JobResult(status="fulfilled", value=future.result()))
    return results


jobs = [
    Job(func=work, args=(1, 1)),
    Job(func=work, args=(2, 2)),
    Job(func=work, args=(1, 3)),
]

results = all_settled(jobs)
for r in results:
    print(r)
Output:
sleeping for 1 seconds
sleeping for 2 seconds
sleeping for 1 seconds
slept for 1 seconds
slept for 1 seconds
slept for 2 seconds
JobResult(status='fulfilled', value='task 1', reason=None)
JobResult(status='fulfilled', value='task 2', reason=None)
JobResult(status='fulfilled', value='task 3', reason=None)
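All the jobs above succeed, so every result is fulfilled. To see a rejection, here is a sketch reusing the same JobResult and all_settled definitions with a deliberately failing job (ok and boom are made-up helpers):

```python
import concurrent.futures
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple


@dataclass
class JobResult:
    status: str
    value: Optional[Any] = None
    reason: Optional[Exception] = None


@dataclass
class Job:
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()

    def execute(self):
        return self.func(*self.args)


def all_settled(jobs):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(job.execute) for job in jobs]
        concurrent.futures.wait(futures)

    results = []
    for future in futures:
        # exception() returns None for a successful job, so each future
        # becomes either a "rejected" or a "fulfilled" JobResult.
        if future.exception():
            results.append(JobResult(status="rejected", reason=future.exception()))
        else:
            results.append(JobResult(status="fulfilled", value=future.result()))
    return results


def ok():
    return "fine"


def boom():
    raise ValueError("job failed")


results = all_settled([Job(func=ok), Job(func=boom)])
for r in results:
    print(r.status)
# fulfilled
# rejected
```

The failing job surfaces as a rejected JobResult carrying the exception, instead of raising, which matches Promise.allSettled().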