内容目录
原文链接:http://www.juzicode.com/python-module-asyncio/
asyncio 是 Python 的异步 I/O 框架,基于事件循环和协程实现高效并发。适用于网络服务、高并发请求和实时数据流处理等场景,能显著提升 I/O 密集型任务的执行效率。
应用场景
- Web 服务器:处理高并发客户端连接
- 实时爬虫:并行抓取多个网页
- 消息队列:异步消费和处理消息
- 物联网设备:管理多个传感器的数据流
安装或导入
Python 3.4+ 内置库,直接导入使用:
import asyncio
基本用法
以下示例展示 asyncio 的核心操作,包括协程定义、任务调度和异步执行。
1. 定义异步函数
通过 async 关键字定义协程函数,await 用于暂停执行直到异步操作完成。asyncio.run() 用于启动事件循环。
#juzicode.com/VX公众号:juzicode
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
asyncio.run(hello())
运行结果:
Hello
World
2. 并发执行任务
使用 asyncio.gather() 并行执行多个协程。事件循环会自动调度任务交替执行。
#juzicode.com/VX公众号:juzicode
import asyncio
async def task(n):
print(f"Start task {n}")
await asyncio.sleep(1)
print(f"End task {n}")
async def main():
await asyncio.gather(task(1), task(2))
asyncio.run(main())
运行结果:
Start task 1
Start task 2
End task 1
End task 2
3. 异步网络请求
通过 aiohttp 创建客户端会话,使用 async with 管理连接生命周期。响应内容通过 await 异步获取。
#juzicode.com/VX公众号:juzicode
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
html = await fetch("https://example.com")
print(len(html))
asyncio.run(main())
运行结果:
1270
4. 定时任务控制
通过 while 循环实现周期性任务。使用 asyncio.create_task() 创建后台任务,通过 cancel() 方法终止执行。
#juzicode.com/VX公众号:juzicode
import asyncio
async def timer():
while True:
print("Tick")
await asyncio.sleep(1)
async def main():
task = asyncio.create_task(timer())
await asyncio.sleep(3)
task.cancel()
asyncio.run(main())
运行结果:
Tick
Tick
Tick
5. 任务超时处理
使用 wait_for() 设置协程执行时间上限。当子任务执行时间超过主任务设定的等待时长时,会触发 TimeoutError 异常。
#juzicode.com/VX公众号:juzicode
import asyncio
async def long_task():
await asyncio.sleep(10)
return "Done"
async def main():
try:
await asyncio.wait_for(long_task(), timeout=2)
except asyncio.TimeoutError:
print("Timeout!")
asyncio.run(main())
运行结果:
Timeout!
高级功能
以下示例展示 asyncio 的进阶用法,涵盖复杂任务管理和性能优化。
1. 协程间通信
使用 asyncio.Queue 实现生产者-消费者模式。put() 和 get() 方法均为异步操作,确保队列操作不阻塞事件循环。
#juzicode.com/VX公众号:juzicode
import asyncio
async def producer(q):
for i in range(3):
await q.put(i)
await asyncio.sleep(0.5)
async def consumer(q):
while True:
item = await q.get()
print(f"Got {item}")
q.task_done()
async def main():
q = asyncio.Queue()
await asyncio.gather(producer(q), consumer(q))
asyncio.run(main())
运行结果:
Got 0
Got 1
Got 2
2. 同步原语
使用 Lock 确保共享资源的线程安全。async with 语句自动获取和释放锁,避免协程间的竞争条件。
#juzicode.com/VX公众号:juzicode
import asyncio
lock = asyncio.Lock()
async def safe_update():
async with lock:
print("Updating...")
await asyncio.sleep(1)
async def main():
await asyncio.gather(safe_update(), safe_update())
asyncio.run(main())
运行结果:
Updating...
Updating...
3. 协程取消
通过 task.cancel() 发送取消请求。协程内捕获 CancelledError 异常可实现资源清理,await task 确保取消操作完成。
#juzicode.com/VX公众号:juzicode
import asyncio
async def cancellable_task():
try:
while True:
print("Running...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Task cancelled")
async def main():
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(2.5)
task.cancel()
await task
asyncio.run(main())
运行结果:
Running...
Running...
Running...
Task cancelled
4. 混合异步/同步
使用 loop.run_in_executor 在异步代码中执行同步函数。将 CPU 密集型任务委托给线程池,避免阻塞事件循环。
#juzicode.com/VX公众号:juzicode
import time
import asyncio
def sync_task():
time.sleep(1)
return "Sync Done"
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, sync_task)
print(result)
asyncio.run(main())
运行结果:
Sync Done
5. 性能监控
通过事件循环的内置计时器测量协程执行时间。get_running_loop().time() 提供高精度时间戳,可用于性能分析。
#juzicode.com/VX公众号:juzicode
import asyncio
async def monitored_task():
start = asyncio.get_running_loop().time()
await asyncio.sleep(1)
print(f"Cost: {asyncio.get_running_loop().time()-start:.2f}s")
asyncio.run(monitored_task())
运行结果:
Cost: 1.00s
总结
asyncio是基于事件的非阻塞 I/O 模型,有清晰的协程间协作机制,并且与异步生态库深度集成,非常适用于需要高吞吐量和低延迟的 I/O 密集型场景,通过合理的异步改造可提升程序性能。