how.wtf

How to wait for all threads to finish in Python

Thomas Taylor


For a side project, I needed to wait for multiple threads to complete before moving on to the next step. In other words, I needed the thread equivalent of JavaScript's Promise.all().

Wait for threads in Python

In the sections below, we’ll discuss different methods for waiting on threads to complete in Python.

Wait for all threads to complete using start / join

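The simplest approach is to create the threads, collect them in a list, start each one, and then call join() on each one. join() blocks until that thread finishes, so once the second loop completes, every thread is done.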

For example:

import time
from threading import Thread


def work(interval_seconds):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")


threads = [
    Thread(target=work, args=(1,)),
    Thread(target=work, args=(2,)),
    Thread(target=work, args=(1,)),
]

for t in threads:
    t.start()

for t in threads:
    t.join()

Output:

sleeping for 1 seconds
sleeping for 2 seconds
sleeping for 1 seconds
slept for 1 seconds
slept for 1 seconds
slept for 2 seconds

This works well and may be all you need; however, there are alternatives.
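One limitation of the plain Thread approach: Thread discards whatever its target returns. A minimal sketch of a common workaround, assuming a hypothetical pre-sized results list that each thread writes into at its own index (the sleep intervals are shortened for the sketch):

```python
import threading
import time


def work(interval_seconds, results, index):
    time.sleep(interval_seconds)
    # Thread discards the target's return value, so store the result
    # in this thread's own slot of the shared list instead.
    results[index] = f"slept for {interval_seconds} seconds"


intervals = [0.1, 0.2, 0.1]
results = [None] * len(intervals)  # one slot per thread, filled in by index

threads = [
    threading.Thread(target=work, args=(seconds, results, i))
    for i, seconds in enumerate(intervals)
]

for t in threads:
    t.start()
for t in threads:
    t.join()  # blocks until this thread has finished

# Every slot is filled once all joins return; order matches `intervals`.
print(results)
```

This works, but the bookkeeping is manual, which is a large part of why the executor-based approaches below are more convenient.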

Wait for all threads to complete using a ThreadPoolExecutor

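Python 3.2 added the concurrent.futures module, which provides a simple high-level API for asynchronously executing callables.

It introduced two executor classes: ThreadPoolExecutor, which runs callables in a pool of threads, and ProcessPoolExecutor, which runs them in a pool of processes.

Both are subclasses of the abstract Executor class and share its interface, so the examples below should work with either. One caveat for ProcessPoolExecutor: the callable and its arguments are pickled and sent to worker processes, so the function must be defined at module level, and scripts that start workers with the spawn method (the default on Windows and macOS) need an if __name__ == "__main__": guard. For the purposes of this tutorial, I’ll use the ThreadPoolExecutor.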

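Here is the same example from above, but using the ThreadPoolExecutor. Leaving the with block calls the executor's shutdown(wait=True), which blocks until every submitted task has finished. With max_workers=2, the third task only starts once one of the first two is done: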

import time
from concurrent.futures import ThreadPoolExecutor


def work(interval_seconds):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")


with ThreadPoolExecutor(max_workers=2) as e:
    e.submit(work, 1)
    e.submit(work, 2)
    e.submit(work, 1)

Output:

sleeping for 1 seconds
sleeping for 2 seconds
slept for 1 seconds
sleeping for 1 seconds
slept for 1 seconds
slept for 2 seconds
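The with block is shorthand for calling shutdown(wait=True) when the block ends. Written out explicitly, the same wait looks roughly like this (a sketch with shortened sleep intervals, and a work function that returns its argument so the results can be checked):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(interval_seconds):
    time.sleep(interval_seconds)
    return interval_seconds


executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(work, s) for s in (0.1, 0.2, 0.1)]

# Equivalent to leaving a `with` block: block until every submitted task is done.
executor.shutdown(wait=True)

print([f.done() for f in futures])  # [True, True, True]
```

The with form is usually preferable because it also shuts the executor down if an exception is raised inside the block.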

The executor also provides a .map method for convenience, which submits one call per item in the iterable:

import time
from concurrent.futures import ThreadPoolExecutor


def work(interval_seconds):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")


with ThreadPoolExecutor(max_workers=2) as e:
    e.map(work, [1, 2, 3])
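Two details of map are worth knowing. Results come back in the same order as the input, no matter which call finishes first. And because map returns a lazy iterator, an exception raised inside a call only surfaces when iteration reaches that result; in the example above, which never consumes the iterator, an exception from work would go unnoticed. A short sketch using a hypothetical square function:

```python
from concurrent.futures import ThreadPoolExecutor


def square(n):
    if n < 0:
        raise ValueError(f"negative input: {n}")
    return n * n


with ThreadPoolExecutor(max_workers=2) as e:
    # Results are yielded in input order, regardless of completion order.
    squares = list(e.map(square, [3, 1, 2]))

print(squares)  # [9, 1, 4]

seen = []
caught = None
with ThreadPoolExecutor(max_workers=2) as e:
    results = e.map(square, [1, -1, 2])
    # map() itself does not raise; the ValueError from square(-1)
    # is re-raised when iteration reaches that result.
    try:
        for value in results:
            seen.append(value)
    except ValueError as exc:
        caught = str(exc)

print(seen, caught)  # [1] negative input: -1
```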

Wait for all threads to complete and return their results

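Firing and forgetting is useful, but my use case required the results. ThreadPoolExecutor and ProcessPoolExecutor make this easy: submit() returns a Future, and calling result() on it returns the callable's return value (or re-raises the exception it raised).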

import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(interval_seconds, order):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")
    return f"task {order}"


with ThreadPoolExecutor(max_workers=2) as e:
    futures = []
    for seconds, order in [(1, 1), (2, 2), (1, 3)]:
        futures.append(e.submit(work, seconds, order))

for future in as_completed(futures):
    print(future.result())

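In this example, I added a new parameter named order that records the position in which each task was queued. as_completed yields futures as they finish. Note, however, that this loop runs after the with block has exited, so every future is already done by the time it starts and the order in which the results print is not meaningful. To handle each result as soon as it completes, put the loop inside the with block.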

Output:

sleeping for 1 seconds
sleeping for 2 seconds
slept for 1 seconds
sleeping for 1 seconds
slept for 2 seconds
slept for 1 seconds
task 2
task 1
task 3
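If you do want results in completion order, the loop has to run while the tasks are still in flight. Below is a sketch with shorter, hypothetical sleep intervals; it also keys a dict by future so each result can be traced back to its input, a pattern the concurrent.futures documentation uses as well.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(interval_seconds, order):
    time.sleep(interval_seconds)
    return f"task {order}"


completed = []
with ThreadPoolExecutor(max_workers=3) as e:
    # Map each future back to the order it was queued in.
    future_to_order = {
        e.submit(work, seconds, order): order
        for seconds, order in [(0.3, 1), (0.1, 2), (0.2, 3)]
    }
    # Looping inside the with block yields each future as soon as it finishes.
    for future in as_completed(future_to_order):
        print(f"queued #{future_to_order[future]} finished: {future.result()}")
        completed.append(future.result())

print(completed)
```

With these intervals the results typically arrive as task 2, task 3, task 1, though the exact order depends on thread scheduling.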

Wait for all threads to complete and return their results in order

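In the as_completed example, the results were returned out of order. Iterating over the futures list directly, instead of through as_completed, returns the results in submission order, because each result() call blocks until that particular future is done: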

import time
from concurrent.futures import ThreadPoolExecutor


def work(interval_seconds, order):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")
    return f"task {order}"


with ThreadPoolExecutor(max_workers=2) as e:
    futures = []
    for seconds, order in [(1, 1), (2, 2), (1, 3)]:
        futures.append(e.submit(work, seconds, order))

# Iterate in submission order; result() waits for each future if needed.
for future in futures:
    print(future.result())

Output:

sleeping for 1 seconds
sleeping for 2 seconds
slept for 1 seconds
sleeping for 1 seconds
slept for 2 seconds
slept for 1 seconds
task 1
task 2
task 3
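concurrent.futures.wait() is another way to block until every thread is done, without relying on the with block to exit. It returns two sets, done and not_done; since sets are unordered, this sketch reads the results back from the original list to keep submission order (sleep intervals shortened for the sketch):

```python
import time
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait


def work(interval_seconds, order):
    time.sleep(interval_seconds)
    return f"task {order}"


with ThreadPoolExecutor(max_workers=2) as e:
    futures = [e.submit(work, s, o) for s, o in [(0.1, 1), (0.2, 2), (0.1, 3)]]
    # Block here, still inside the with block, until every future is done.
    done, not_done = wait(futures, return_when=ALL_COMPLETED)
    print(len(done), len(not_done))  # 3 0
    # done is a set, so read results from the original list to keep order.
    results = [f.result() for f in futures]

print(results)  # ['task 1', 'task 2', 'task 3']
```

ALL_COMPLETED is already the default; it is spelled out here because wait() also accepts FIRST_COMPLETED and FIRST_EXCEPTION.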

Promise.all and Promise.allSettled using threads

For the last topic, I want to replicate the interface that Node.js provides: Promise.all() and Promise.allSettled().

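If you’re unfamiliar: Promise.all() fulfills with an array of every result once all of the promises fulfill, and it rejects as soon as any one of them rejects, so await Promise.all() throws that error. Promise.allSettled() waits for every promise to settle (fulfill or reject) and never rejects; it returns an array of objects whose status is "fulfilled" (with a value) or "rejected" (with a reason).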
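Because Promise.all() rejects as soon as one promise rejects, a closer thread-based analogue has to stop waiting at the first failure. The sketch below is one way to approximate that, not a drop-in replacement: all_fail_fast is a hypothetical helper that takes zero-argument callables, uses wait(..., return_when=FIRST_EXCEPTION), and then calls shutdown(wait=False, cancel_futures=True), which requires Python 3.9 or later. Threads that are already running cannot be interrupted, so they keep running in the background, and the interpreter still waits for them at exit.

```python
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait


def all_fail_fast(funcs):
    """Hypothetical helper: run zero-argument callables concurrently and
    raise the first exception seen, roughly like Promise.all() rejecting."""
    executor = ThreadPoolExecutor()
    futures = [executor.submit(func) for func in funcs]
    try:
        # Returns as soon as any future raises; otherwise waits for all of them.
        done, _ = wait(futures, return_when=FIRST_EXCEPTION)
        for future in done:
            exc = future.exception()
            if exc is not None:
                raise exc
        return [future.result() for future in futures]
    finally:
        # Don't block on stragglers: cancel queued work (Python 3.9+).
        # Threads that are already running keep going until they finish.
        executor.shutdown(wait=False, cancel_futures=True)


def slow():
    time.sleep(1)
    return "slow"


def fail():
    raise ValueError("boom")


start = time.monotonic()
try:
    all_fail_fast([slow, fail])
except ValueError as exc:
    print(f"rejected after {time.monotonic() - start:.1f}s: {exc}")
```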

To replicate Promise.all() and Promise.allSettled() using concurrent.futures, I created a Job dataclass that has three properties:

  1. The Callable (i.e., function) that will be called for the job
  2. The Tuple of positional arguments that will be passed to the job
  3. The dict[str, Any] of keyword arguments that will be passed to the job

import concurrent.futures
import time

from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple


def work(interval_seconds, order):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")
    return f"task {order}"


@dataclass
class Job:
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    kwargs: Optional[dict[str, Any]] = None  # None means "no keyword arguments"

    def execute(self):
        return self.func(*self.args, **(self.kwargs or {}))


# Note: this name shadows Python's built-in all() within this module.
def all(jobs):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(job.execute) for job in jobs]
        # result() blocks until each job finishes and re-raises any exception it raised.
        return [f.result() for f in futures]


jobs = [
    Job(func=work, args=(1, 1)),
    Job(func=work, args=(2, 2)),
    Job(func=work, args=(1, 3)),
]

results = all(jobs)
print(results)

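The all(jobs) function accepts a list of jobs and returns their results in the order the jobs were submitted. If a job raises, result() re-raises that exception, similar to Promise.all() rejecting. Unlike Promise.all(), though, the with block still waits for the remaining jobs to finish before the exception propagates.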

Output:

sleeping for 1 seconds
sleeping for 2 seconds
sleeping for 1 seconds
slept for 1 seconds
slept for 1 seconds
slept for 2 seconds
['task 1', 'task 2', 'task 3']
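Executor.submit() already forwards extra positional and keyword arguments to the callable, so the Job dataclass is a convenience rather than a requirement. Two alternatives, sketched with shortened sleep intervals: bind the arguments ahead of time with functools.partial, or pass them straight to submit().

```python
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def work(interval_seconds, order):
    time.sleep(interval_seconds)
    return f"task {order}"


# Option 1: bind positional and keyword arguments up front with functools.partial.
jobs = [partial(work, 0.1, 1), partial(work, 0.2, order=2), partial(work, 0.1, order=3)]

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(job) for job in jobs]
    partial_results = [f.result() for f in futures]

print(partial_results)  # ['task 1', 'task 2', 'task 3']

# Option 2: pass the arguments directly; submit(fn, *args, **kwargs) forwards them.
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(work, 0.1, 1), executor.submit(work, 0.2, order=2)]
    direct_results = [f.result() for f in futures]

print(direct_results)  # ['task 1', 'task 2']
```

A dataclass like Job is still handy when jobs need to be built, stored, or inspected before they run; partial objects expose the same information through their func, args, and keywords attributes.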

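We can extend the all() example by adding an all_settled function and a JobResult dataclass. all_settled waits for every job, then records each outcome as "fulfilled" with a value or "rejected" with a reason instead of raising: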

import concurrent.futures
import time

from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple


def work(interval_seconds, order):
    print(f"sleeping for {interval_seconds} seconds")
    time.sleep(interval_seconds)
    print(f"slept for {interval_seconds} seconds")
    return f"task {order}"


@dataclass
class JobResult:
    status: str
    value: Optional[Any] = None
    reason: Optional[Exception] = None


@dataclass
class Job:
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    kwargs: Optional[dict[str, Any]] = None

    def execute(self):
        return self.func(*self.args, **(self.kwargs or {}))


def all(jobs):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(job.execute) for job in jobs]
        return [f.result() for f in futures]


def all_settled(jobs):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(job.execute) for job in jobs]
        concurrent.futures.wait(futures)  # block until every job has settled

        results = []
        for future in futures:
            exc = future.exception()  # None if the job completed normally
            if exc is not None:
                results.append(JobResult(status="rejected", reason=exc))
            else:
                results.append(JobResult(status="fulfilled", value=future.result()))
        return results


jobs = [
    Job(func=work, args=(1, 1)),
    Job(func=work, args=(2, 2)),
    Job(func=work, args=(1, 3)),
]

results = all_settled(jobs)
for r in results:
    print(r)

Output:

sleeping for 1 seconds
sleeping for 2 seconds
sleeping for 1 seconds
slept for 1 seconds
slept for 1 seconds
slept for 2 seconds
JobResult(status='fulfilled', value='task 1', reason=None)
JobResult(status='fulfilled', value='task 2', reason=None)
JobResult(status='fulfilled', value='task 3', reason=None)
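The output above only exercises the fulfilled branch. To see the rejected branch, here is a condensed, self-contained sketch: settle is a hypothetical stand-in for all_settled that takes zero-argument callables instead of Job objects, and fail is a made-up job that always raises.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class JobResult:
    status: str
    value: Optional[Any] = None
    reason: Optional[Exception] = None


def settle(funcs: list[Callable[[], Any]]) -> list[JobResult]:
    """Hypothetical condensed all_settled over zero-argument callables."""
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(func) for func in funcs]
    # The with block has waited for every future, so exception() won't block.
    results = []
    for future in futures:
        exc = future.exception()
        if exc is not None:
            results.append(JobResult(status="rejected", reason=exc))
        else:
            results.append(JobResult(status="fulfilled", value=future.result()))
    return results


def fail():
    raise ValueError("boom")


settled = settle([lambda: "task 1", fail, lambda: "task 3"])
for r in settled:
    print(r)
```

The middle entry should print as JobResult(status='rejected', value=None, reason=ValueError('boom')), while the other two are fulfilled, and no exception escapes settle().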
