协程进阶
问题
如何使用 aiohttp 进行异步 HTTP 请求?异步上下文管理器和异步生成器怎么用?
答案
aiohttp 异步 HTTP
import aiohttp
import asyncio
async def fetch_all(urls: list[str]) -> list[str]:
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def fetch_one(session: aiohttp.ClientSession, url: str) -> str:
async with session.get(url) as response:
return await response.text()
# 比 requests 同步请求快 10x+
results = asyncio.run(fetch_all(["https://example.com"] * 10))
异步上下文管理器
import asyncio
class AsyncDBConnection:
async def __aenter__(self):
self.conn = await self._connect()
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.conn.close()
async def _connect(self):
await asyncio.sleep(0.1) # 模拟连接
return self
# 使用 contextlib 的异步版本
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_transaction(conn):
await conn.execute("BEGIN")
try:
yield conn
await conn.execute("COMMIT")
except Exception:
await conn.execute("ROLLBACK")
raise
异步生成器与异步推导
import asyncio
async def stream_data(source: str):
"""异步生成器:模拟流式数据"""
for i in range(10):
await asyncio.sleep(0.1)
yield {"source": source, "value": i}
async def main():
# 异步 for
async for item in stream_data("sensor"):
print(item)
# 异步推导式
values = [item async for item in stream_data("sensor") if item["value"] > 5]
sums = {item["source"]: item["value"] async for item in stream_data("sensor")}
混合同步和异步
import asyncio
from concurrent.futures import ThreadPoolExecutor
# 在异步中运行同步阻塞函数
async def main():
loop = asyncio.get_event_loop()
# 将阻塞调用放到线程池
result = await loop.run_in_executor(
ThreadPoolExecutor(),
blocking_io_function,
arg1, arg2,
)
# 3.9+ 简写
result = await asyncio.to_thread(blocking_io_function, arg1, arg2)
常见面试问题
Q1: 为什么不能在异步函数中调用 time.sleep()?
答案:
time.sleep() 会阻塞整个线程(包括事件循环),导致所有协程都无法执行。必须用 await asyncio.sleep() 替代,它会释放事件循环让其他协程运行。
Q2: asyncio.run() 和 loop.run_until_complete() 的区别?
答案:
asyncio.run():创建新事件循环、运行协程、关闭循环。一站式入口,推荐使用loop.run_until_complete():在现有事件循环上运行。需手动管理循环生命周期