本文目录导读:
异步IO(Asynchronous I/O)是一种非阻塞的编程模型,允许程序在发起IO操作后立即返回,无需等待操作完成,这能显著提高程序处理大量并发IO请求的效率。
在Python中,异步IO主要通过 asyncio 库实现,下面我会从基础到实际使用场景,详细讲解如何用。
核心概念:协程、事件循环、Future/Task
- 协程 (Coroutine):用
async def定义的函数称为协程函数,调用它不会立即执行,而是返回一个协程对象,协程是异步编程的基本单元。 - 事件循环 (Event Loop):这是异步IO的核心调度器,它负责监听和分配事件,并在协程需要等待IO时挂起它,在IO就绪时恢复它。
- Future 与 Task:
Future代表一个尚未完成的操作结果。Task是Future的一种,专门用来调度协程的执行,通常通过asyncio.create_task()创建。
如何使用(从简单到复杂)
定义和运行一个协程
import asyncio
import time
# 定义一个协程函数
async def say_hello():
print("Hello...")
# 模拟异步IO操作(比如网络请求、文件读取)
await asyncio.sleep(1)
print("...World!")
# 运行协程
async def main():
# 创建并运行一个协程
await say_hello()
# Python 3.7+ 的标准入口
asyncio.run(main())
asyncio.sleep(1)模拟了一个耗时的IO操作。await关键字表示在这个地方让出控制权给事件循环,等待1秒后再继续。asyncio.run(main())创建一个新的事件循环,运行main()协程,并在结束后关闭循环。
同时运行多个协程(并发)
这是异步IO的核心优势,通过 asyncio.create_task() 可以创建多个任务,让它们并发执行。
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(f"{what} after {delay}s")
async def main():
print(f"Started at {time.strftime('%X')}")
# 创建两个任务(不会等待,立即返回)
task1 = asyncio.create_task(say_after(2, 'Task 1'))
task2 = asyncio.create_task(say_after(1, 'Task 2'))
# 等待两个任务都完成
await task1
await task2
print(f"Finished at {time.strftime('%X')}")
asyncio.run(main())
# 预期输出:
# Started at 10:00:00
# Task 2 after 1s (1秒后)
# Task 1 after 2s (总共2秒后,而非3秒)
# Finished at 10:00:02
- 关键点:
task1和task2几乎同时启动,事件循环在await asyncio.sleep()处挂起协程,并转而去运行另一个协程,总耗时是最大延迟时间(2秒),而不是两个延迟之和(3秒)。
等待“一组”任务完成
asyncio.gather() 或 asyncio.wait() 可以同时等待多个任务。
import asyncio
async def fetch_data(url):
# 模拟网络请求
await asyncio.sleep(1)
return f"Data from {url}"
async def main():
urls = ["http://example.com", "http://python.org", "http://github.com"]
# 创建所有任务
tasks = [asyncio.create_task(fetch_data(url)) for url in urls]
# 等待所有任务完成,并收集结果
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())
# 大概1秒后,同时打印三个结果
真正IO操作:网络请求 (aiohttp)
asyncio 本身不处理HTTP请求,你需要结合专门的异步网络库,最常用的是 aiohttp。
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
print(f"Status: {response.status} for {url}")
# 等待异步读取响应内容
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
urls = [
"http://example.com",
"http://httpbin.org/delay/1", # 会延迟1秒的API
"http://httpbin.org/delay/2",
]
tasks = [fetch(session, url) for url in urls]
# 并发执行所有请求
pages = await asyncio.gather(*tasks)
print(f"Fetched {len(pages)} pages")
asyncio.run(main())
# 总耗时约为最慢的那个请求的延迟(2秒),而不是 0 + 1 + 2 = 3秒
文件操作 (aiofiles)
标准的文件IO (open) 是阻塞的,可以用 aiofiles 来异步读写文件。
import asyncio
import aiofiles
async def read_file(path):
async with aiofiles.open(path, mode='r') as f:
contents = await f.read()
return contents
async def main():
# 并发读取多个文件
tasks = [read_file("file1.txt"), read_file("file2.txt")]
results = await asyncio.gather(*tasks)
for result in results:
print(len(result)) # 打印文件长度
asyncio.run(main())
使用场景与注意事项
最佳使用场景
- 大量网络IO:爬虫、微服务API网关、Web服务器(如FastAPI、Sanic)、聊天应用。
- 数据库操作:使用异步数据库驱动(如
asyncpgfor PostgreSQL,aiomysql)。 - 高并发但计算量小的任务:如消息路由、日志处理。
- 文件操作:特别是需要同时处理多个文件读写的场景。
不要这样用(走弯路)
# ❌ 错误:在协程内部直接调用同步阻塞函数
async def bad_example():
time.sleep(2) # 这会阻塞整个事件循环,导致所有协程停止
print("Done")
# ❌ 错误:用 asyncio.create_task 来执行 CPU 密集型任务
async def cpu_intensive():
result = sum(i * i for i in range(10**7)) # 这也会阻塞事件循环
return result
- 误区:
time.sleep()会阻塞线程,在异步代码中,要使用await asyncio.sleep()。 - 误区:异步IO适合IO密集型任务,不适合CPU密集型任务,如果需要进行大量计算,应该使用多进程 (
multiprocessing)。
异步IO在Python中的使用方式
- 定义协程:
async def my_coro(): - 模拟IO:
await asyncio.sleep(1) - 并发执行:
task = asyncio.create_task(my_coro()) - 等待完成:
await task、results = await asyncio.gather(*tasks) - 实际IO:结合
aiohttp、aiofiles、asyncpg等异步库。 - 启动:
asyncio.run(main())
记住核心思想:遇到IO等待时,await 会让出控制权,让事件循环去处理其他就绪的任务,这就是异步IO提高效率的关键。
标签: 事件循环