Python轮子:高性能异步协程库:asyncio

原文链接:http://www.juzicode.com/python-module-asyncio/

asyncio 是 Python 的异步 I/O 框架,基于事件循环和协程实现高效并发。适用于网络服务、高并发请求和实时数据流处理等场景,能显著提升 I/O 密集型任务的执行效率。

应用场景

  • Web 服务器:处理高并发客户端连接
  • 实时爬虫:并行抓取多个网页
  • 消息队列:异步消费和处理消息
  • 物联网设备:管理多个传感器的数据流

安装或导入

Python 3.4+ 内置库,直接导入使用:

import asyncio

基本用法

以下示例展示 asyncio 的核心操作,包括协程定义、任务调度和异步执行。

1. 定义异步函数

通过 async 关键字定义协程函数,await 用于暂停执行直到异步操作完成。asyncio.run() 用于启动事件循环。

#juzicode.com/VX公众号:juzicode
import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

asyncio.run(hello())

运行结果:

Hello
World

2. 并发执行任务

使用 asyncio.gather() 并行执行多个协程。事件循环会自动调度任务交替执行。

#juzicode.com/VX公众号:juzicode
import asyncio

async def task(n):
    print(f"Start task {n}")
    await asyncio.sleep(1)
    print(f"End task {n}")

async def main():
    await asyncio.gather(task(1), task(2))

asyncio.run(main())

运行结果:

Start task 1
Start task 2
End task 1
End task 2

3. 异步网络请求

通过 aiohttp 创建客户端会话,使用 async with 管理连接生命周期。响应内容通过 await 异步获取。

#juzicode.com/VX公众号:juzicode
import aiohttp
import asyncio


async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    html = await fetch("https://example.com")
    print(len(html))

asyncio.run(main())

运行结果:

1270

4. 定时任务控制

通过 while 循环实现周期性任务。使用 asyncio.create_task() 创建后台任务,通过 cancel() 方法终止执行。

#juzicode.com/VX公众号:juzicode
import asyncio
async def timer():
    while True:
        print("Tick")
        await asyncio.sleep(1)

async def main():
    task = asyncio.create_task(timer())
    await asyncio.sleep(3)
    task.cancel()

asyncio.run(main())

运行结果:

Tick
Tick
Tick

5. 任务超时处理

使用 wait_for() 设置协程执行时间上限。当子任务执行时间超过主任务设定的等待时长时,会触发 TimeoutError 异常。

#juzicode.com/VX公众号:juzicode
import asyncio
async def long_task():
    await asyncio.sleep(10)
    return "Done"

async def main():
    try:
        await asyncio.wait_for(long_task(), timeout=2)
    except asyncio.TimeoutError:
        print("Timeout!")

asyncio.run(main())

运行结果:

Timeout!

高级功能

以下示例展示 asyncio 的进阶用法,涵盖复杂任务管理和性能优化。

1. 协程间通信

使用 asyncio.Queue 实现生产者-消费者模式。put() 和 get() 方法均为异步操作,确保队列操作不阻塞事件循环。

#juzicode.com/VX公众号:juzicode
import asyncio

async def producer(q):
    for i in range(3):
        await q.put(i)
        await asyncio.sleep(0.5)

async def consumer(q):
    while True:
        item = await q.get()
        print(f"Got {item}")
        q.task_done()

async def main():
    q = asyncio.Queue()
    await asyncio.gather(producer(q), consumer(q))

asyncio.run(main())

运行结果:

Got 0
Got 1
Got 2

2. 同步原语

使用 Lock 确保共享资源的线程安全。async with 语句自动获取和释放锁,避免协程间的竞争条件。

#juzicode.com/VX公众号:juzicode
import asyncio

lock = asyncio.Lock()

async def safe_update():
    async with lock:
        print("Updating...")
        await asyncio.sleep(1)

async def main():
    await asyncio.gather(safe_update(), safe_update())

asyncio.run(main())

运行结果:

Updating...
Updating...

3. 协程取消

通过 task.cancel() 发送取消请求。协程内捕获 CancelledError 异常可实现资源清理,await task 确保取消操作完成。

#juzicode.com/VX公众号:juzicode
import asyncio

async def cancellable_task():
    try:
        while True:
            print("Running...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task cancelled")

async def main():
    task = asyncio.create_task(cancellable_task())
    await asyncio.sleep(2.5)
    task.cancel()
    await task

asyncio.run(main())

运行结果:

Running...
Running...
Running...
Task cancelled

4. 混合异步/同步

使用 loop.run_in_executor 在异步代码中执行同步函数。将 CPU 密集型任务委托给线程池,避免阻塞事件循环。

#juzicode.com/VX公众号:juzicode
import time
import asyncio

def sync_task():
    time.sleep(1)
    return "Sync Done"

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, sync_task)
    print(result)

asyncio.run(main())

运行结果:

Sync Done

5. 性能监控

通过事件循环的内置计时器测量协程执行时间。get_running_loop().time() 提供高精度时间戳,可用于性能分析。

#juzicode.com/VX公众号:juzicode
import asyncio

async def monitored_task():
    start = asyncio.get_running_loop().time()
    await asyncio.sleep(1)
    print(f"Cost: {asyncio.get_running_loop().time()-start:.2f}s")

asyncio.run(monitored_task())

运行结果:

Cost: 1.00s

总结

asyncio是基于事件的非阻塞 I/O 模型,有清晰的协程间协作机制,并且与异步生态库深度集成,非常适用于需要高吞吐量和低延迟的 I/O 密集型场景,通过合理的异步改造可提升程序性能。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注