异步IO怎么用?

访客 性能优化 5

本文目录导读:

  1. 核心概念:协程、事件循环、Future/Task
  2. 如何使用(从简单到复杂)
  3. 使用场景与注意事项
  4. 异步IO在Python中的使用方式

异步IO(Asynchronous I/O)是一种非阻塞的编程模型,允许程序在发起IO操作后立即返回,无需等待操作完成,这能显著提高程序处理大量并发IO请求的效率。

在Python中,异步IO主要通过 asyncio 库实现,下面我会从基础到实际使用场景,详细讲解如何用。


核心概念:协程、事件循环、Future/Task

  1. 协程 (Coroutine):用 async def 定义的函数称为协程函数,调用它不会立即执行,而是返回一个协程对象,协程是异步编程的基本单元。
  2. 事件循环 (Event Loop):这是异步IO的核心调度器,它负责监听和分配事件,并在协程需要等待IO时挂起它,在IO就绪时恢复它。
  3. Future 与 Task
    • Future 代表一个尚未完成的操作结果。
    • TaskFuture 的一种,专门用来调度协程的执行,通常通过 asyncio.create_task() 创建。

如何使用(从简单到复杂)

定义和运行一个协程

import asyncio
import time
# 定义一个协程函数
async def say_hello():
    print("Hello...")
    # 模拟异步IO操作(比如网络请求、文件读取)
    await asyncio.sleep(1)  
    print("...World!")
# 运行协程
async def main():
    # 创建并运行一个协程
    await say_hello()
# Python 3.7+ 的标准入口
asyncio.run(main())
  • asyncio.sleep(1) 模拟了一个耗时的IO操作。await 关键字表示在这个地方让出控制权给事件循环,等待1秒后再继续。
  • asyncio.run(main()) 创建一个新的事件循环,运行 main() 协程,并在结束后关闭循环。

同时运行多个协程(并发)

这是异步IO的核心优势,通过 asyncio.create_task() 可以创建多个任务,让它们并发执行。

import asyncio
import time
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} after {delay}s")
async def main():
    print(f"Started at {time.strftime('%X')}")
    # 创建两个任务(不会等待,立即返回)
    task1 = asyncio.create_task(say_after(2, 'Task 1'))
    task2 = asyncio.create_task(say_after(1, 'Task 2'))
    # 等待两个任务都完成
    await task1
    await task2
    print(f"Finished at {time.strftime('%X')}")
asyncio.run(main())
# 预期输出:
# Started at 10:00:00
# Task 2 after 1s  (1秒后)
# Task 1 after 2s  (总共2秒后,而非3秒)
# Finished at 10:00:02
  • 关键点task1task2 几乎同时启动,事件循环在 await asyncio.sleep() 处挂起协程,并转而去运行另一个协程,总耗时是最大延迟时间(2秒),而不是两个延迟之和(3秒)。

等待“一组”任务完成

asyncio.gather()asyncio.wait() 可以同时等待多个任务。

import asyncio
async def fetch_data(url):
    # 模拟网络请求
    await asyncio.sleep(1)
    return f"Data from {url}"
async def main():
    urls = ["http://example.com", "http://python.org", "http://github.com"]
    # 创建所有任务
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]
    # 等待所有任务完成,并收集结果
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)
asyncio.run(main())
# 大概1秒后,同时打印三个结果

真正IO操作:网络请求 (aiohttp)

asyncio 本身不处理HTTP请求,你需要结合专门的异步网络库,最常用的是 aiohttp

import asyncio
import aiohttp
async def fetch(session, url):
    async with session.get(url) as response:
        print(f"Status: {response.status} for {url}")
        # 等待异步读取响应内容
        return await response.text()
async def main():
    async with aiohttp.ClientSession() as session:
        urls = [
            "http://example.com",
            "http://httpbin.org/delay/1",  # 会延迟1秒的API
            "http://httpbin.org/delay/2",
        ]
        tasks = [fetch(session, url) for url in urls]
        # 并发执行所有请求
        pages = await asyncio.gather(*tasks)
        print(f"Fetched {len(pages)} pages")
asyncio.run(main())
# 总耗时约为最慢的那个请求的延迟(2秒),而不是 0 + 1 + 2 = 3秒

文件操作 (aiofiles)

标准的文件IO (open) 是阻塞的,可以用 aiofiles 来异步读写文件。

import asyncio
import aiofiles
async def read_file(path):
    async with aiofiles.open(path, mode='r') as f:
        contents = await f.read()
    return contents
async def main():
    # 并发读取多个文件
    tasks = [read_file("file1.txt"), read_file("file2.txt")]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(len(result))  # 打印文件长度
asyncio.run(main())

使用场景与注意事项

最佳使用场景

  • 大量网络IO:爬虫、微服务API网关、Web服务器(如FastAPI、Sanic)、聊天应用。
  • 数据库操作:使用异步数据库驱动(如 asyncpg for PostgreSQL, aiomysql)。
  • 高并发但计算量小的任务:如消息路由、日志处理。
  • 文件操作:特别是需要同时处理多个文件读写的场景。

不要这样用(走弯路)

# ❌ 错误:在协程内部直接调用同步阻塞函数
async def bad_example():
    time.sleep(2)  # 这会阻塞整个事件循环,导致所有协程停止
    print("Done")
# ❌ 错误:用 asyncio.create_task 来执行 CPU 密集型任务
async def cpu_intensive():
    result = sum(i * i for i in range(10**7))  # 这也会阻塞事件循环
    return result
  • 误区time.sleep() 会阻塞线程,在异步代码中,要使用 await asyncio.sleep()
  • 误区:异步IO适合IO密集型任务,不适合CPU密集型任务,如果需要进行大量计算,应该使用多进程 (multiprocessing)。

异步IO在Python中的使用方式

  1. 定义协程async def my_coro():
  2. 模拟IOawait asyncio.sleep(1)
  3. 并发执行task = asyncio.create_task(my_coro())
  4. 等待完成await taskresults = await asyncio.gather(*tasks)
  5. 实际IO:结合 aiohttpaiofilesasyncpg 等异步库。
  6. 启动asyncio.run(main())

记住核心思想:遇到IO等待时,await 会让出控制权,让事件循环去处理其他就绪的任务,这就是异步IO提高效率的关键。

标签: 事件循环

抱歉,评论功能暂时关闭!