本文目录导读:
- 使用
signal模块(仅Linux/Mac) - 使用
func_timeout库(推荐) - 使用
concurrent.futures(线程池实现) - 使用
asyncio(异步编程) - 装饰器实现(实用方案)
- 实际应用场景:API请求超时处理
- 批量任务超时处理
- 总结对比
我来分享几个Python超时处理的实战案例,从简单到复杂逐步深入。
使用signal模块(仅Linux/Mac)
import signal
import time
class TimeoutError(Exception):
pass
def timeout_handler(signum, frame):
raise TimeoutError("函数执行超时")
def long_running_function():
"""模拟耗时操作"""
time.sleep(5)
return "任务完成"
# 设置超时
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(3) # 3秒超时
try:
result = long_running_function()
print(result)
except TimeoutError:
print("任务超时了!")
finally:
signal.alarm(0) # 取消闹钟
使用func_timeout库(推荐)
首先安装:
pip install func_timeout
from func_timeout import func_timeout, FunctionTimedOut
def long_task():
import time
time.sleep(10)
return "结果"
try:
result = func_timeout(3, long_task) # 3秒超时
print(f"成功: {result}")
except FunctionTimedOut:
print("函数执行超时")
except Exception as e:
print(f"其他错误: {e}")
使用concurrent.futures(线程池实现)
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
import time
def fetch_data(url, delay=2):
"""模拟网络请求"""
time.sleep(delay)
return f"从 {url} 获取的数据"
def timeout_executor(func, args=(), kwargs={}, timeout=5):
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(func, *args, **kwargs)
try:
result = future.result(timeout=timeout)
return result
except FutureTimeoutError:
future.cancel()
raise TimeoutError(f"函数执行超过{timeout}秒")
except Exception as e:
raise e
# 测试用例
try:
# 正常执行
result = timeout_executor(fetch_data, ("https://api.example.com",), {"delay": 2}, timeout=5)
print(result)
# 超时执行
result = timeout_executor(fetch_data, ("https://api.example.com",), {"delay": 8}, timeout=3)
print(result)
except TimeoutError as e:
print(f"超时错误: {e}")
使用asyncio(异步编程)
import asyncio
import time
async def slow_operation(seconds):
await asyncio.sleep(seconds)
return f"等待了{seconds}秒"
async def timeout_handler(coro, timeout):
try:
result = await asyncio.wait_for(coro, timeout=timeout)
return result
except asyncio.TimeoutError:
return "超时"
except Exception as e:
return f"错误: {e}"
async def main():
# 正常情况
result = await timeout_handler(slow_operation(2), timeout=5)
print(result) # 等待了2秒
# 超时情况
result = await timeout_handler(slow_operation(8), timeout=3)
print(result) # 超时
# 运行
asyncio.run(main())
装饰器实现(实用方案)
import functools
import threading
import time
def timeout_decorator(timeout):
"""超时装饰器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
result = [TimeoutError("函数超时")]
def target():
try:
result[0] = func(*args, **kwargs)
except Exception as e:
result[0] = e
thread = threading.Thread(target=target)
thread.daemon = True
thread.start()
thread.join(timeout)
if thread.is_alive():
return result[0] # 返回超时错误
if isinstance(result[0], Exception):
raise result[0]
return result[0]
return wrapper
return decorator
# 使用示例
@timeout_decorator(3)
def slow_function(seconds):
time.sleep(seconds)
return f"耗时{seconds}秒"
@timeout_decorator(2)
def network_request():
time.sleep(5) # 模拟慢请求
return "数据"
# 测试
try:
print(slow_function(1)) # 正常
print(slow_function(5)) # 超时
except Exception as e:
print(f"异常: {e}")
实际应用场景:API请求超时处理
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
class APIClient:
def __init__(self, base_url, timeout=5, max_retries=3):
self.base_url = base_url
self.timeout = timeout
self.session = requests.Session()
# 配置重试策略
retry_strategy = Retry(
total=max_retries,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
def request_with_timeout(self, method, endpoint, **kwargs):
url = f"{self.base_url}/{endpoint}"
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(
self.session.request,
method, url,
timeout=self.timeout,
**kwargs
)
try:
response = future.result(timeout=self.timeout + 2)
response.raise_for_status()
return response.json()
except TimeoutError:
return {"error": "请求超时", "url": url}
except requests.exceptions.RequestException as e:
return {"error": str(e), "url": url}
def get(self, endpoint, **kwargs):
return self.request_with_timeout("GET", endpoint, **kwargs)
def post(self, endpoint, data=None, **kwargs):
return self.request_with_timeout("POST", endpoint, json=data, **kwargs)
# 使用示例
client = APIClient("https://jsonplaceholder.typicode.com", timeout=3)
# 正常请求
result = client.get("posts/1")
print(result)
# 超时请求(模拟)
class SlowAPIClient(APIClient):
def request_with_timeout(self, method, endpoint, **kwargs):
time.sleep(6) # 模拟慢响应
return super().request_with_timeout(method, endpoint, **kwargs)
slow_client = SlowAPIClient("https://jsonplaceholder.typicode.com", timeout=3)
result = slow_client.get("posts/1")
print(result) # 返回超时错误
批量任务超时处理
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def batch_task_processor(tasks, timeout_per_task=5, max_workers=3):
"""
批量处理任务,每个任务单独超时控制
Args:
tasks: 任务列表,每个元素是 (func, args, kwargs)
timeout_per_task: 每个任务的超时时间
max_workers: 最大并发数
"""
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_task = {
executor.submit(func, *args, **kwargs): (i, func)
for i, (func, args, kwargs) in enumerate(tasks)
}
for future in as_completed(future_to_task, timeout=len(tasks) * timeout_per_task):
task_id, func = future_to_task[future]
try:
result = future.result(timeout=timeout_per_task)
results.append({
"id": task_id,
"status": "success",
"result": result
})
except TimeoutError:
future.cancel()
results.append({
"id": task_id,
"status": "timeout",
"result": None
})
except Exception as e:
results.append({
"id": task_id,
"status": "error",
"result": str(e)
})
return results
# 测试
def task_a(x):
time.sleep(1)
return x * 2
def task_b(x):
time.sleep(6) # 这个会超时
return x * 3
def task_c(x):
time.sleep(2)
return x + 10
tasks = [
(task_a, (5,), {}),
(task_b, (3,), {}),
(task_c, (8,), {})
]
results = batch_task_processor(tasks, timeout_per_task=4)
for r in results:
print(f"ID {r['id']}: {r['status']} -> {r['result']}")
总结对比
| 方法 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
signal |
Linux/Mac系统 | 原生支持,轻量 | 不跨平台,线程中不可用 |
func_timeout |
通用 | 简单易用 | 需要安装第三方库 |
ThreadPoolExecutor |
线程安全 | 灵活可控 | 资源开销较大 |
asyncio |
异步编程 | 高效,非阻塞 | 需要理解异步编程 |
| 装饰器 | 通用 | 复用性好 | 实现稍复杂 |
推荐在实际项目中使用func_timeout(简单场景)或ThreadPoolExecutor(复杂场景),避免跨平台兼容性问题。
标签: 实操案例