本文目录导读:
Django 异步开发主要通过以下几个层次实现,我来给你一个从基础到进阶的完整指南:
Django 异步支持的核心
异步视图(Async Views)
# views.py
import asyncio
from django.http import JsonResponse
from django.views import View
# 方式1:函数式异步视图
async def async_index(request):
await asyncio.sleep(1)
return JsonResponse({"message": "异步视图"})
# 方式2:类视图异步
class AsyncIndexView(View):
async def get(self, request):
# 异步调用其他服务
result = await some_async_task()
return JsonResponse({"data": result})
异步 ORM 查询
# 需要 Django 4.1+ 使用 acreate / aget 等方法
async def user_view(request):
from myapp.models import User
# 异步创建
user = await User.objects.acreate(username="test", email="test@example.com")
# 异步获取
user = await User.objects.aget(id=1)
# 异步过滤
users = [u async for u in User.objects.filter(is_active=True)]
return JsonResponse({"count": len(users)})
异步环境配置
ASGI 服务器设置
# asgi.py
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
application = get_asgi_application()
运行命令
# 安装 ASGI 服务器 pip install uvicorn # 启动异步服务 uvicorn myproject.asgi:application --host 0.0.0.0 --port 8000 # 或者使用 Daphne pip install daphne daphne myproject.asgi:application -b 0.0.0.0 -p 8000
异步任务处理
使用 asyncio 工具
# utils.py
import asyncio
from asgiref.sync import sync_to_async
# 将同步代码转为异步
@sync_to_async
def get_user_sync(user_id):
from myapp.models import User
return User.objects.get(id=user_id)
# 使用示例
async def async_view(request):
user = await get_user_sync(1)
return JsonResponse({"username": user.username})
Celery + 异步结合
# tasks.py
from celery import shared_task
import asyncio
@shared_task
def async_worker_task(data):
# 在 Celery worker 中执行异步操作
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
some_async_function(data)
)
return result
finally:
loop.close()
常见异步场景实现
异步 HTTP 请求
import aiohttp
from django.http import JsonResponse
async def fetch_external_api(request):
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as response:
data = await response.json()
return JsonResponse(data)
异步文件处理
async def handle_upload(request):
if request.method == 'POST':
file = request.FILES['file']
# 异步读取文件内容
content = await file.read()
# 异步处理
result = await process_file_async(content)
return JsonResponse({"status": "success"})
WebSocket 支持(Django Channels)
# consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
import json
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
await self.channel_layer.group_add(
self.room_name,
self.channel_name
)
await self.accept()
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
await self.channel_layer.group_send(
self.room_name,
{
'type': 'chat_message',
'message': message
}
)
性能优化技巧
连接池管理
# database.py
from django.db import connections
import asyncio
class AsyncDatabasePool:
def __init__(self):
self.pool = None
async def get_connection(self):
# 管理数据库连接池
if not self.pool:
self.pool = await self.create_pool()
return await self.pool.acquire()
信号量控制并发
from asyncio import Semaphore
semaphore = Semaphore(10) # 限制并发10个
async def limited_api_call(data):
async with semaphore:
return await external_api_call(data)
注意事项
兼容性检查
# settings.py
# 确保使用兼容的应用
INSTALLED_APPS = [
'daphne', # ASGI 服务器
'channels', # WebSocket
'django.contrib.admin',
# ...
]
# 异步兼容模式
ASGI_APPLICATION = 'myproject.asgi.application'
同步到异步切换
# 使用 sync_to_async 安全转换
from asgiref.sync import sync_to_async
# 同步转异步(线程池执行)
async def safe_sync_call():
result = await sync_to_async(
some_sync_function
)()
return result
日志配置
import logging
import asyncio
logger = logging.getLogger(__name__)
async def async_view_with_logging(request):
logger.info("异步视图开始")
result = await some_async_task()
logger.info("异步任务完成")
return JsonResponse({"result": result})
实际项目建议
- 渐进式迁移:先对 I/O 密集型的视图改为异步
- 保持一致性:同一请求链路尽量全异步或全同步
- 监控性能:使用 opentelemetry 等工具监控异步性能
- 测试覆盖:
pytest-asyncio为异步代码编写测试
# test_async.py
import pytest
from django.test import AsyncClient
@pytest.mark.asyncio
async def test_async_view():
client = AsyncClient()
response = await client.get('/async/')
assert response.status_code == 200
assert response.json()['status'] == 'success'
如果你有具体的异步场景想要实现,可以告诉我,我可以提供更针对性的代码示例。