跳到主要内容

asyncio 协程

问题

asyncio 的工作原理是什么?如何使用 async/await?事件循环是怎么回事?

答案

协程基础

import asyncio

async def fetch_data(url: str) -> str:
"""async 函数返回协程对象"""
print(f"开始请求 {url}")
await asyncio.sleep(1) # 模拟 IO 操作,释放控制权
print(f"完成请求 {url}")
return f"数据来自 {url}"

async def main():
# 串行执行
result1 = await fetch_data("url1")
result2 = await fetch_data("url2")
# 总耗时 ~2s

# 并发执行
result1, result2 = await asyncio.gather(
fetch_data("url1"),
fetch_data("url2"),
)
# 总耗时 ~1s

asyncio.run(main())

事件循环

事件循环是 asyncio 的核心,它:

  1. 维护一个任务队列
  2. 执行就绪的协程直到遇到 await
  3. 在 IO 完成时唤醒挂起的协程
  4. 不断循环直到所有任务完成

Task 与并发

import asyncio

async def task(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{name} done"

async def main():
# 创建 Task 实现并发
task1 = asyncio.create_task(task("A", 2))
task2 = asyncio.create_task(task("B", 1))

# 两个任务并发执行
result1 = await task1
result2 = await task2

# gather:收集多个协程的结果
results = await asyncio.gather(
task("C", 1),
task("D", 2),
task("E", 1),
)

# TaskGroup(3.11+,推荐):自动取消
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(task("F", 1))
t2 = tg.create_task(task("G", 2))
# 自动等待所有任务完成,任一失败则取消其余

asyncio.run(main())

超时与取消

import asyncio

async def slow_operation():
await asyncio.sleep(10)

async def main():
# 超时控制
try:
async with asyncio.timeout(3): # 3.11+
await slow_operation()
except TimeoutError:
print("超时了!")

# 取消任务
task = asyncio.create_task(slow_operation())
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务被取消")

异步迭代与异步生成器

import asyncio

async def async_range(n: int):
"""异步生成器"""
for i in range(n):
await asyncio.sleep(0.1)
yield i

async def main():
# 异步 for 循环
async for i in async_range(5):
print(i)

# 异步推导式
values = [i async for i in async_range(5)]

常见面试问题

Q1: await 的作用是什么?

答案

await 做两件事:

  1. 暂停当前协程的执行,将控制权交还事件循环
  2. 等待被 await 的协程/Future 完成,然后恢复执行

只有在 async def 函数内才能使用 await,只能 await 可等待对象(协程、Task、Future)。

Q2: asyncio.gatherasyncio.TaskGroup 的区别?

答案

特性gatherTaskGroup(3.11+)
错误处理return_exceptions=True 收集任一失败自动取消其余
结构函数调用上下文管理器
取消行为需手动处理自动取消和清理
推荐度兼容旧版本3.11+ 推荐

Q3: 如何在同步代码中调用异步函数?

答案

import asyncio

async def async_func():
return "hello"

# 方式 1:asyncio.run()(顶层入口)
result = asyncio.run(async_func())

# 方式 2:在已有事件循环中
loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_func())

Q4: asyncio 中如何限制并发数?

答案

import asyncio

async def fetch(url: str, sem: asyncio.Semaphore):
async with sem: # 信号量限制并发
await asyncio.sleep(1)
return f"done: {url}"

async def main():
sem = asyncio.Semaphore(5) # 最多 5 个并发
tasks = [fetch(f"url_{i}", sem) for i in range(20)]
results = await asyncio.gather(*tasks)

相关链接