Python Wheels: Yet Another Multithreading Library ~ ThreadPoolExecutor

Original article: http://www.juzicode.com/python-module-threadpoolexecutor/

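ThreadPoolExecutor is the thread pool implementation in the concurrent.futures module of the Python standard library, designed to simplify multithreaded programming. It runs submitted tasks on a pool of reusable worker threads, which makes it a good fit for I/O-bound work such as batched network requests and file operations. Because of the GIL, it gives little or no speedup for CPU-bound pure-Python computation; concurrent.futures.ProcessPoolExecutor is usually the better choice there.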

Use cases

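  • Parallel network requests: batch many API calls
  • Batch file processing: read and write multiple files concurrently
  • Data preprocessing: process large datasets in parallel (most effective when each step is I/O-bound or releases the GIL)
  • Web services: handle concurrent client requests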
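To make the batch file-processing use case concrete, here is a minimal sketch that writes a few temporary files and then reads them back concurrently with executor.map. The helper read_size and the file names are hypothetical, chosen only for illustration.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

def read_size(path):
    # Hypothetical helper: read one file and return (file name, size in bytes)
    with open(path, "rb") as f:
        return os.path.basename(path), len(f.read())

with tempfile.TemporaryDirectory() as tmp:
    # Create three small sample files of 10, 20 and 30 bytes
    paths = []
    for i in range(3):
        path = os.path.join(tmp, f"file_{i}.txt")
        with open(path, "w") as f:
            f.write("x" * (i + 1) * 10)
        paths.append(path)

    # Read all files concurrently; map yields results in input order
    with ThreadPoolExecutor(max_workers=3) as executor:
        sizes = dict(executor.map(read_size, paths))

print(sizes)  # {'file_0.txt': 10, 'file_1.txt': 20, 'file_2.txt': 30}
```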

Installation or import

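No installation is required; the module has been part of the standard library since Python 3.2. Import it directly: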

from concurrent.futures import ThreadPoolExecutor
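Using ThreadPoolExecutor as a context manager (the with statement in the examples below) is shorthand for calling executor.shutdown(wait=True) when the block exits. A minimal sketch of the explicit form, useful when the pool must outlive a single block:

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
try:
    future = executor.submit(pow, 2, 10)  # runs pow(2, 10) in a worker thread
    print(future.result())  # 1024
finally:
    # What `with` does on exit: wait for pending work, then release the threads
    executor.shutdown(wait=True)
```

After shutdown, calling submit() again raises RuntimeError, so the explicit form needs the same discipline the with block enforces automatically.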

Usage

1. Basic task execution

# juzicode.com / WeChat official account: juzicode
import time
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(1)
    return n * n

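with ThreadPoolExecutor(max_workers=5) as executor:
    # submit() schedules task(i) and immediately returns a Future
    futures = [executor.submit(task, i) for i in range(5)]
    # result() blocks until each task finishes; with 5 workers the whole batch takes about 1 s
    results = [f.result() for f in futures]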

print(results)

Output:

[0, 1, 4, 9, 16]
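Future.result() not only waits for the task; if the task raised an exception, result() re-raises it in the calling thread. A short sketch in which the failure for n == 3 is made up for illustration:

```python
from concurrent.futures import ThreadPoolExecutor

def task(n):
    # Deliberately fail for n == 3 to show how worker errors surface
    if n == 3:
        raise ValueError(f"bad input: {n}")
    return n * n

results, errors = [], []
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(task, i) for i in range(5)]
    for f in futures:
        try:
            results.append(f.result())  # re-raises the worker's exception here
        except ValueError as exc:
            errors.append(str(exc))

print(results)  # [0, 1, 4, 16]
print(errors)   # ['bad input: 3']
```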

2. Parallel I/O operations

import requests  # third-party: pip install requests
from concurrent.futures import ThreadPoolExecutor

urls = ["https://example.com"] * 5

def fetch(url):
    # timeout keeps a stalled connection from blocking a worker thread forever
    return requests.get(url, timeout=10).status_code

with ThreadPoolExecutor(5) as executor:
    results = list(executor.map(fetch, urls))

print(results)

Output:

[200, 200, 200, 200, 200]
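executor.map returns results in the order of its inputs, even when later inputs finish first. The sketch below replaces the network call with a hypothetical fake_fetch based on time.sleep, so it runs offline, and gives earlier items longer delays:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_fetch(delay):
    # Hypothetical stand-in for a network request: wait, then return the delay
    time.sleep(delay)
    return delay

delays = [0.3, 0.2, 0.1]
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(fake_fetch, delays))
elapsed = time.perf_counter() - start

print(results)  # [0.3, 0.2, 0.1]: input order, not completion order
print(f"{elapsed:.1f}s")  # roughly the longest delay, not the 0.6 s sum
```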

3. Collecting results as tasks complete

from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    return n ** n

with ThreadPoolExecutor() as executor:
    futures = {executor.submit(task, i): i for i in range(5)}
    for future in as_completed(futures):
        print(f"Result: {future.result()}")

Output (as_completed yields futures in completion order, so the order can differ between runs):

Result: 1
Result: 1
Result: 4
Result: 27
Result: 256
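The dictionary in the example maps each future back to its input, which matters because as_completed yields futures in completion order. A sketch that uses that mapping to report which input produced each result; the sleeps are an illustrative assumption that makes higher inputs finish first:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    time.sleep(0.1 * (3 - n))  # higher n finishes sooner
    return n ** n

finished = []
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(task, i): i for i in range(3)}
    for future in as_completed(futures):
        n = futures[future]  # look up the input that produced this future
        finished.append((n, future.result()))
        print(f"task({n}) = {future.result()}")
```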

4. Limiting the maximum concurrency

import time
from concurrent.futures import ThreadPoolExecutor

def task(n):
    print(f"Start {n}")
    time.sleep(1)
    return n

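with ThreadPoolExecutor(max_workers=2) as executor:
    # map() submits all 5 tasks at once, but only 2 worker threads run them; the with block waits for all of them
    executor.map(task, range(5))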

Output:

Start 0
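Start 1   # pause of about 1 s: with 2 workers, task 2 starts only after task 0 or 1 finishes
Start 2
Start 3   # another pause of about 1 s before task 4 starts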
Start 4
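To confirm that max_workers caps concurrency, a sketch can count how many tasks are running at the same moment. The running/peak counters and the lock are illustrative additions, not part of ThreadPoolExecutor:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
running = 0  # tasks currently executing
peak = 0     # highest value `running` ever reached

def task(n):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.1)  # simulate work
    with lock:
        running -= 1
    return n

with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(task, range(5)))

print(results)  # [0, 1, 2, 3, 4]
print(peak)     # 2: never more than max_workers tasks at once
```

Consuming the map iterator with list() also surfaces any exception raised by a task; if the iterator is discarded, as in the example above, such exceptions are silently lost.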

5. Progress tracking

from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm  # third-party: pip install tqdm

def task(n):
    return n * n

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, i) for i in range(100)]
    results = []
    for f in tqdm(as_completed(futures), total=len(futures)):
        results.append(f.result())

Output (the elapsed time and it/s rate vary by machine):

100%|██████████| 100/100 [00:00<00:00, 1234.56it/s]
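If installing tqdm is not an option, progress can be approximated with the standard library alone by counting completed futures. A minimal sketch; the reporting interval of 25 tasks is arbitrary:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    return n * n

total = 100
results = []
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, i) for i in range(total)]
    for done, f in enumerate(as_completed(futures), start=1):
        results.append(f.result())
        if done % 25 == 0:  # report every 25 completed tasks
            print(f"{done}/{total} done ({done * 100 // total}%)")
```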

6. Timeout control

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def task(n):
    time.sleep(n)
    return n

with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 5)
    try:
        print(future.result(timeout=3))
    except TimeoutError:
        print("Task timeout!")

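Output (printed after about 3 seconds; the timeout only stops the wait, not the task, so the script ends after about 5 seconds, when leaving the with block waits for the task to finish):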

Task timeout!
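Because a timeout does not stop a running task, the only way to drop work is Future.cancel(), which succeeds only for tasks that have not started yet. A sketch with a single worker so that the second task is still queued; the sleep durations are arbitrary:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(time.sleep, 0.5)  # picked up by the only worker
    queued = executor.submit(time.sleep, 0.5)   # waits in the queue
    try:
        running.result(timeout=0.1)
    except TimeoutError:
        print("Timed out waiting")
    print(queued.cancel())   # True: it never started, so it is dropped
    print(running.cancel())  # False: already running, cannot be cancelled
```

Since Python 3.9, executor.shutdown(cancel_futures=True) cancels all pending futures in one call.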

7. Task callbacks

from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * 2

def callback(future):
    print(f"Callback received: {future.result()}")

with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 10)
    future.add_done_callback(callback)

Output:

Callback received: 20
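add_done_callback passes only the finished future to the callback. One way to give the callback extra context is functools.partial; the label argument and the shared received list below are illustrative. The callback may run in a worker thread, so the shared list is guarded with a lock:

```python
import functools
import threading
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
received = []

def task(n):
    return n * 2

def callback(label, future):
    # partial() supplies `label`; the executor supplies `future`
    with lock:
        received.append((label, future.result()))

with ThreadPoolExecutor() as executor:
    for n in (10, 20, 30):
        future = executor.submit(task, n)
        future.add_done_callback(functools.partial(callback, f"task-{n}"))

print(sorted(received))  # [('task-10', 20), ('task-20', 40), ('task-30', 60)]
```

If the future has already finished when add_done_callback is called, the callback runs immediately in the calling thread.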

Summary

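Compared with creating threads by hand, ThreadPoolExecutor reuses a fixed set of worker threads and hands back results and exceptions through Future objects, which makes concurrent code safer and more efficient, especially for I/O-bound tasks. Setting max_workers appropriately balances throughput against resource usage; if it is omitted, Python 3.8 and later default to roughly min(32, number of CPUs + 4).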
