跳到主要内容

asyncio 常见陷阱

问题

使用 asyncio 开发时有哪些常见陷阱?如何避免?

答案

陷阱 1:忘记 await

# ❌ 忘记 await,协程没有执行
async def fetch_data():
return await httpx.get("https://api.example.com/data")

async def main():
result = fetch_data() # 返回的是协程对象,不是结果!
print(type(result)) # <class 'coroutine'>

# ✅ 正确
async def main():
result = await fetch_data()

陷阱 2:在 async 中调用同步阻塞函数

import time
import asyncio

# ❌ 阻塞整个事件循环
async def handler():
time.sleep(5) # 阻塞!其他协程都被卡住
requests.get(url) # 阻塞!

# ✅ 使用 run_in_executor 包装
async def handler():
loop = asyncio.get_event_loop()
# 在线程池中运行阻塞代码
await loop.run_in_executor(None, time.sleep, 5)
result = await loop.run_in_executor(None, requests.get, url)

# ✅ 更好:用异步库替代
async def handler():
await asyncio.sleep(5)
async with httpx.AsyncClient() as client:
result = await client.get(url)

陷阱 3:Task 被垃圾回收

# ❌ Task 没有被引用,可能被 GC 回收
async def main():
asyncio.create_task(background_work()) # 没有保存引用!

# ✅ 保持引用
async def main():
tasks = set()
task = asyncio.create_task(background_work())
tasks.add(task)
task.add_done_callback(tasks.discard)

陷阱 4:异常被吞掉

# ❌ Task 的异常不会自动抛出
async def risky():
raise ValueError("出错了")

async def main():
task = asyncio.create_task(risky())
await asyncio.sleep(1)
# 异常被静默忽略,只有在 await task 时才会抛出

# ✅ 总是 await 或添加回调
async def main():
task = asyncio.create_task(risky())

def handle_exception(t: asyncio.Task):
if t.exception():
logging.error(f"Task failed: {t.exception()}")

task.add_done_callback(handle_exception)

陷阱 5:用列表推导创建协程但不并发

# ❌ 串行执行
async def main():
results = []
for url in urls:
result = await fetch(url) # 一个一个等
results.append(result)

# ✅ 并发执行
async def main():
results = await asyncio.gather(*[fetch(url) for url in urls])

# ✅ 带并发限制
async def main():
sem = asyncio.Semaphore(10)

async def limited_fetch(url):
async with sem:
return await fetch(url)

results = await asyncio.gather(*[limited_fetch(url) for url in urls])

陷阱 6:共享可变状态

# ❌ 异步代码中的竞态条件
counter = 0

async def increment():
global counter
value = counter # 读
await asyncio.sleep(0) # 让出执行权
counter = value + 1 # 写(可能覆盖其他协程的写入)

# ✅ 使用 asyncio.Lock
lock = asyncio.Lock()

async def increment():
global counter
async with lock:
counter += 1

常见面试问题

Q1: asyncio.gather vs asyncio.TaskGroup?

答案

特性gatherTaskGroup(3.11+)
错误处理return_exceptions=True自动取消其他任务
结构化并发
推荐兼容旧版本新代码首选
# TaskGroup(推荐)
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch(url1))
task2 = tg.create_task(fetch(url2))
# 任一失败,其余自动取消

Q2: 如何检测事件循环被阻塞?

答案

# 开启 debug 模式
asyncio.run(main(), debug=True)
# 会打印警告:Executing <Task> took 0.XXX seconds

Q3: 如何优雅关闭异步服务?

答案

import signal

async def shutdown(signal, loop):
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()

loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s, loop)))

相关链接