Original link: http://www.juzicode.com/python-module-threadpoolexecutor/
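ThreadPoolExecutor is the thread-pool implementation provided by the concurrent.futures module in Python's standard library. It simplifies multithreaded programming by managing a pool of reusable worker threads that run the tasks you submit. It is best suited to I/O-bound work such as batch network requests and file operations; for CPU-bound pure-Python code, CPython's global interpreter lock limits the speedup that threads can give.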
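Use cases
- Parallel network requests: issuing API calls in batches
- Batch file processing: reading and writing many files concurrently
- Data preprocessing: working through large datasets in parallel
- Web services: handling concurrent client requests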
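As a rough illustration of the batch file-processing case, the sketch below counts the lines of several temporary files concurrently. The helper name count_lines and the sample files are made up for this example and are not part of the original article.

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def count_lines(path):
    """Hypothetical helper: return the number of lines in a text file."""
    with open(path, encoding="utf-8") as fh:
        return sum(1 for _ in fh)


with tempfile.TemporaryDirectory() as tmp:
    # Create three small sample files with 1, 2 and 3 lines respectively.
    paths = []
    for i in range(1, 4):
        p = Path(tmp) / f"sample_{i}.txt"
        p.write_text("line\n" * i, encoding="utf-8")
        paths.append(p)

    # Read all files concurrently; map() returns the counts in input order.
    with ThreadPoolExecutor(max_workers=3) as executor:
        line_counts = list(executor.map(count_lines, paths))

print(line_counts)  # [1, 2, 3]
```

Because each worker spends most of its time waiting on the file system, this is the kind of workload where a thread pool tends to pay off.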
Installation and import
No installation is needed; import it directly:
from concurrent.futures import ThreadPoolExecutor
Usage
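1. Basic task execution
# juzicode.com / WeChat official account: juzicode
import time
from concurrent.futures import ThreadPoolExecutor
def task(n):
    time.sleep(1)  # simulate a blocking operation
    return n * n
# The with block shuts the pool down and waits for all tasks on exit.
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(task, i) for i in range(5)]
    # result() blocks until the corresponding task has finished.
    results = [f.result() for f in futures]
print(results)
Output:
[0, 1, 4, 9, 16]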
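The example above assumes every task succeeds. If a task raises, the exception is stored on its future and re-raised when result() is called. A minimal sketch of handling that follows; risky_task and its failure condition are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def risky_task(n):
    """Hypothetical task that fails for one particular input."""
    if n == 2:
        raise ValueError(f"bad input: {n}")
    return n * n


outcomes = []
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(risky_task, i) for i in range(4)]
    for i, future in enumerate(futures):
        try:
            outcomes.append((i, future.result()))
        except ValueError as exc:
            # The exception raised in the worker thread is re-raised here.
            outcomes.append((i, f"failed: {exc}"))

print(outcomes)
# [(0, 0), (1, 1), (2, 'failed: bad input: 2'), (3, 9)]
```

Without the try/except, the first failing result() call would abort the loop, even though the other tasks completed normally.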
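2. Parallel I/O operations
import requests  # third-party package: pip install requests
from concurrent.futures import ThreadPoolExecutor
urls = ["https://example.com"] * 5
def fetch(url):
    # A timeout keeps a stalled connection from blocking a worker forever.
    return requests.get(url, timeout=10).status_code
with ThreadPoolExecutor(max_workers=5) as executor:
    # map() returns results in the same order as urls.
    results = list(executor.map(fetch, urls))
print(results)
Output (assuming every request succeeds):
[200, 200, 200, 200, 200]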
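One property of executor.map worth seeing without a network connection is that it yields results in input order, not completion order. The sketch below uses a hypothetical slow_echo function with sleeps standing in for I/O.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_echo(delay):
    """Stand-in for an I/O call: sleep for `delay` seconds, then return it."""
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]
with ThreadPoolExecutor(max_workers=3) as executor:
    # The 0.1 s task finishes first, yet map() still yields 0.3 first.
    ordered = list(executor.map(slow_echo, delays))

print(ordered)  # [0.3, 0.1, 0.2]
```

This makes map convenient when each result has to be paired with its input by position. When results should be handled as soon as they are ready, as_completed is usually the better fit.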
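3. Getting results as tasks complete
from concurrent.futures import ThreadPoolExecutor, as_completed
def task(n):
    return n ** n  # note that 0 ** 0 evaluates to 1 in Python
with ThreadPoolExecutor() as executor:
    # Map each future back to the input that produced it.
    futures = {executor.submit(task, i): i for i in range(5)}
    # as_completed() yields futures in the order they finish.
    for future in as_completed(futures):
        print(f"Result: {future.result()}")
Output (the order is not guaranteed and may vary between runs):
Result: 1
Result: 1
Result: 4
Result: 27
Result: 256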
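Because as_completed yields futures in completion order, it helps to keep a mapping from each future back to its input. The following sketch shows one way to do that; slow_square and the delays are invented for the example.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(n, delay):
    """Hypothetical task: wait `delay` seconds, then return n squared."""
    time.sleep(delay)
    return n * n


jobs = {1: 0.3, 2: 0.2, 3: 0.1}  # hypothetical input -> delay in seconds
results_by_input = {}
with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_input = {
        executor.submit(slow_square, n, delay): n for n, delay in jobs.items()
    }
    for future in as_completed(future_to_input):
        n = future_to_input[future]  # recover which input this future belongs to
        results_by_input[n] = future.result()
        print(f"input {n} finished with {future.result()}")
```

With these delays, input 3 will typically be reported first and input 1 last, but the final dictionary is the same whatever the order.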
4. Limiting the maximum concurrency
import time
from concurrent.futures import ThreadPoolExecutor
def task(n):
    print(f"Start {n}")
    time.sleep(1)
    return n
# At most 2 tasks run at once; the rest wait in the pool's queue.
with ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(task, range(5))
Output:
Start 0
Start 1 # a pause is visible here: only 2 tasks run at a time, and the others start only as workers become free
Start 2
Start 3
Start 4
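To check the limit rather than infer it from pauses in the output, one option is to count how many tasks are running at the same moment. This is only a sketch; the stats dictionary and tracked_task are hypothetical names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
stats = {"running": 0, "peak": 0}  # current and highest number of active tasks


def tracked_task(n):
    """Hypothetical task that records how many tasks run concurrently."""
    with lock:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
    time.sleep(0.1)  # simulate blocking work
    with lock:
        stats["running"] -= 1
    return n


with ThreadPoolExecutor(max_workers=2) as executor:
    finished = list(executor.map(tracked_task, range(6)))

print(f"peak concurrency: {stats['peak']}")
```

The recorded peak should never exceed max_workers, however many tasks are submitted.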
5. Progress tracking
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm  # third-party package: pip install tqdm
def task(n):
    return n * n
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, i) for i in range(100)]
    results = []
    # total= is required because as_completed() returns an iterator without a length.
    for f in tqdm(as_completed(futures), total=len(futures)):
        results.append(f.result())
Output (the rate shown will vary by machine):
100%|██████████| 100/100 [00:00<00:00, 1234.56it/s]
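6. Timeout control
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError
def task(n):
    time.sleep(n)
    return n
with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 5)
    try:
        # Wait at most 3 seconds for the result.
        print(future.result(timeout=3))
    except TimeoutError:
        # The timeout only ends the wait; the task itself keeps running,
        # and the with block still waits for it to finish before exiting.
        print("Task timeout!")
Output:
Task timeout!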
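A timeout on result() only stops the caller from waiting; it does not stop the task. Future.cancel() can prevent a task from running, but only while it is still queued. The sketch below, using a single worker and a hypothetical slow_task, shows the difference.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(seconds):
    """Hypothetical task that just sleeps and returns its argument."""
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(slow_task, 0.5)   # occupies the only worker
    second = executor.submit(slow_task, 0.5)  # waits in the queue

    cancelled_second = second.cancel()  # still pending, so it can be cancelled
    time.sleep(0.1)                     # give the worker time to start `first`
    cancelled_first = first.cancel()    # already running, so cancel() returns False

print(cancelled_second, cancelled_first, first.result())
```

A task that is already running has to cooperate in order to stop early, for example by checking a threading.Event that the caller sets.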
7. Task callbacks
from concurrent.futures import ThreadPoolExecutor
def task(n):
    return n * 2
def callback(future):
    # Called with the finished future as its only argument.
    print(f"Callback received: {future.result()}")
with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 10)
    # The callback runs once the task completes (immediately if it already has).
    future.add_done_callback(callback)
Output:
Callback received: 20
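A callback is invoked whether the task succeeded or failed, so calling future.result() inside it can itself raise. One way to handle both outcomes is to check future.exception() first, as in this sketch; halve, report and log are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor

log = []  # filled by the callback; list.append is thread-safe in CPython


def halve(n):
    """Hypothetical task that fails on odd input."""
    if n % 2:
        raise ValueError(f"{n} is odd")
    return n // 2


def report(future):
    # exception() returns None when the task finished without raising.
    exc = future.exception()
    if exc is None:
        log.append(f"ok: {future.result()}")
    else:
        log.append(f"error: {exc}")


with ThreadPoolExecutor(max_workers=2) as executor:
    for n in (10, 7):
        executor.submit(halve, n).add_done_callback(report)

print(sorted(log))  # ['error: 7 is odd', 'ok: 5']
```

The log is sorted before printing because the two callbacks may fire in either order.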
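Summary
Compared with creating threads by hand, ThreadPoolExecutor is less error-prone and more efficient, because it reuses a fixed set of worker threads and cleans them up for you. It is particularly well suited to I/O-bound tasks. Choosing max_workers sensibly lets you balance throughput against resource consumption.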
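To get a feel for the max_workers trade-off, a simple approach is to time the same simulated I/O workload with different pool sizes. The sketch below is an assumption-laden illustration: fake_io, timed_run and the chosen sizes are hypothetical, and real workloads will behave differently.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(_):
    time.sleep(0.1)  # stand-in for a blocking network or disk call


def timed_run(max_workers, n_tasks=8):
    """Return the wall-clock seconds needed to run n_tasks with this pool size."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        list(executor.map(fake_io, range(n_tasks)))
    return time.perf_counter() - start


timings = {workers: timed_run(workers) for workers in (1, 4, 8)}
for workers, seconds in timings.items():
    print(f"max_workers={workers}: {seconds:.2f}s")
```

For this sleep-based workload the time drops roughly in proportion to the pool size until there are as many workers as tasks. Beyond that point extra threads only add overhead.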