Python超时处理案例实操?

wen python案例 1

本文目录导读:

  1. 使用signal模块(仅Linux/Mac)
  2. 使用func_timeout库(推荐)
  3. 使用concurrent.futures(线程池实现)
  4. 使用asyncio(异步编程)
  5. 装饰器实现(实用方案)
  6. 实际应用场景:API请求超时处理
  7. 批量任务超时处理
  8. 总结对比

我来分享几个Python超时处理的实战案例,从简单到复杂逐步深入。

使用signal模块(仅Linux/Mac)

import signal
import time
class TimeoutError(Exception):
    pass
def timeout_handler(signum, frame):
    raise TimeoutError("函数执行超时")
def long_running_function():
    """模拟耗时操作"""
    time.sleep(5)
    return "任务完成"
# 设置超时
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(3)  # 3秒超时
try:
    result = long_running_function()
    print(result)
except TimeoutError:
    print("任务超时了!")
finally:
    signal.alarm(0)  # 取消闹钟

使用func_timeout库(推荐)

首先安装:

pip install func_timeout
from func_timeout import func_timeout, FunctionTimedOut
def long_task():
    import time
    time.sleep(10)
    return "结果"
try:
    result = func_timeout(3, long_task)  # 3秒超时
    print(f"成功: {result}")
except FunctionTimedOut:
    print("函数执行超时")
except Exception as e:
    print(f"其他错误: {e}")

使用concurrent.futures(线程池实现)

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
import time
def fetch_data(url, delay=2):
    """模拟网络请求"""
    time.sleep(delay)
    return f"从 {url} 获取的数据"
def timeout_executor(func, args=(), kwargs={}, timeout=5):
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func, *args, **kwargs)
        try:
            result = future.result(timeout=timeout)
            return result
        except FutureTimeoutError:
            future.cancel()
            raise TimeoutError(f"函数执行超过{timeout}秒")
        except Exception as e:
            raise e
# 测试用例
try:
    # 正常执行
    result = timeout_executor(fetch_data, ("https://api.example.com",), {"delay": 2}, timeout=5)
    print(result)
    # 超时执行
    result = timeout_executor(fetch_data, ("https://api.example.com",), {"delay": 8}, timeout=3)
    print(result)
except TimeoutError as e:
    print(f"超时错误: {e}")

使用asyncio(异步编程)

import asyncio
import time
async def slow_operation(seconds):
    await asyncio.sleep(seconds)
    return f"等待了{seconds}秒"
async def timeout_handler(coro, timeout):
    try:
        result = await asyncio.wait_for(coro, timeout=timeout)
        return result
    except asyncio.TimeoutError:
        return "超时"
    except Exception as e:
        return f"错误: {e}"
async def main():
    # 正常情况
    result = await timeout_handler(slow_operation(2), timeout=5)
    print(result)  # 等待了2秒
    # 超时情况
    result = await timeout_handler(slow_operation(8), timeout=3)
    print(result)  # 超时
# 运行
asyncio.run(main())

装饰器实现(实用方案)

import functools
import threading
import time
def timeout_decorator(timeout):
    """超时装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = [TimeoutError("函数超时")]
            def target():
                try:
                    result[0] = func(*args, **kwargs)
                except Exception as e:
                    result[0] = e
            thread = threading.Thread(target=target)
            thread.daemon = True
            thread.start()
            thread.join(timeout)
            if thread.is_alive():
                return result[0]  # 返回超时错误
            if isinstance(result[0], Exception):
                raise result[0]
            return result[0]
        return wrapper
    return decorator
# 使用示例
@timeout_decorator(3)
def slow_function(seconds):
    time.sleep(seconds)
    return f"耗时{seconds}秒"
@timeout_decorator(2)
def network_request():
    time.sleep(5)  # 模拟慢请求
    return "数据"
# 测试
try:
    print(slow_function(1))  # 正常
    print(slow_function(5))  # 超时
except Exception as e:
    print(f"异常: {e}")

实际应用场景:API请求超时处理

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
class APIClient:
    def __init__(self, base_url, timeout=5, max_retries=3):
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        # 配置重试策略
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
    def request_with_timeout(self, method, endpoint, **kwargs):
        url = f"{self.base_url}/{endpoint}"
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(
                self.session.request,
                method, url,
                timeout=self.timeout,
                **kwargs
            )
            try:
                response = future.result(timeout=self.timeout + 2)
                response.raise_for_status()
                return response.json()
            except TimeoutError:
                return {"error": "请求超时", "url": url}
            except requests.exceptions.RequestException as e:
                return {"error": str(e), "url": url}
    def get(self, endpoint, **kwargs):
        return self.request_with_timeout("GET", endpoint, **kwargs)
    def post(self, endpoint, data=None, **kwargs):
        return self.request_with_timeout("POST", endpoint, json=data, **kwargs)
# 使用示例
client = APIClient("https://jsonplaceholder.typicode.com", timeout=3)
# 正常请求
result = client.get("posts/1")
print(result)
# 超时请求(模拟)
class SlowAPIClient(APIClient):
    def request_with_timeout(self, method, endpoint, **kwargs):
        time.sleep(6)  # 模拟慢响应
        return super().request_with_timeout(method, endpoint, **kwargs)
slow_client = SlowAPIClient("https://jsonplaceholder.typicode.com", timeout=3)
result = slow_client.get("posts/1")
print(result)  # 返回超时错误

批量任务超时处理

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def batch_task_processor(tasks, timeout_per_task=5, max_workers=3):
    """
    批量处理任务,每个任务单独超时控制
    Args:
        tasks: 任务列表,每个元素是 (func, args, kwargs)
        timeout_per_task: 每个任务的超时时间
        max_workers: 最大并发数
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_task = {
            executor.submit(func, *args, **kwargs): (i, func)
            for i, (func, args, kwargs) in enumerate(tasks)
        }
        for future in as_completed(future_to_task, timeout=len(tasks) * timeout_per_task):
            task_id, func = future_to_task[future]
            try:
                result = future.result(timeout=timeout_per_task)
                results.append({
                    "id": task_id,
                    "status": "success",
                    "result": result
                })
            except TimeoutError:
                future.cancel()
                results.append({
                    "id": task_id,
                    "status": "timeout",
                    "result": None
                })
            except Exception as e:
                results.append({
                    "id": task_id,
                    "status": "error",
                    "result": str(e)
                })
    return results
# 测试
def task_a(x):
    time.sleep(1)
    return x * 2
def task_b(x):
    time.sleep(6)  # 这个会超时
    return x * 3
def task_c(x):
    time.sleep(2)
    return x + 10
tasks = [
    (task_a, (5,), {}),
    (task_b, (3,), {}),
    (task_c, (8,), {})
]
results = batch_task_processor(tasks, timeout_per_task=4)
for r in results:
    print(f"ID {r['id']}: {r['status']} -> {r['result']}")

总结对比

方法 适用场景 优点 缺点
signal Linux/Mac系统 原生支持,轻量 不跨平台,线程中不可用
func_timeout 通用 简单易用 需要安装第三方库
ThreadPoolExecutor 线程安全 灵活可控 资源开销较大
asyncio 异步编程 高效,非阻塞 需要理解异步编程
装饰器 通用 复用性好 实现稍复杂

推荐在实际项目中使用func_timeout(简单场景)或ThreadPoolExecutor(复杂场景),避免跨平台兼容性问题。

标签: 实操案例

抱歉,评论功能暂时关闭!