你是否需要一个关于定时任务的Python案例

访客 python案例 8

本文目录导读:

  1. 安装所需库
  2. 基础定时任务案例
  3. 使用APScheduler库(更强大的定时任务框架)
  4. 使用Python内置的time模块(简单版本)
  5. 定时任务最佳实践

是的,我可以为你提供一个关于定时任务的Python案例,下面是一个使用schedule库实现简单定时任务的示例:

安装所需库

pip install schedule

基础定时任务案例

import schedule
import time
from datetime import datetime
def job1():
    """任务1:打印当前时间"""
    print(f"执行任务1 - 当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
def job2(name):
    """任务2:带参数的定时任务"""
    print(f"执行任务2 - {name}说: 你好,现在是 {datetime.now().strftime('%H:%M:%S')}")
def job3():
    """任务3:执行需要时间操作的任务"""
    print("任务3开始执行...")
    time.sleep(5)  # 模拟耗时操作
    print("任务3执行完成")
# 设置定时任务
# 每隔5秒执行一次
schedule.every(5).seconds.do(job1)
# 每隔1分钟执行一次
schedule.every(1).minutes.do(job1)
# 每隔2小时执行一次
schedule.every(2).hours.do(job1)
# 每天在特定时间执行
schedule.every().day.at("10:30").do(job1)
# 每周一执行
schedule.every().monday.do(job1)
# 带参数的定时任务
schedule.every(10).seconds.do(job2, name="小明")
# 每隔一段时间执行复杂任务
schedule.every(30).seconds.do(job3)
# 使用装饰器方式
@schedule.repeat(schedule.every(15).seconds)
def job4():
    print(f"装饰器任务 - {datetime.now()}")
if __name__ == "__main__":
    print("定时任务启动...")
    print("按 Ctrl+C 停止运行")
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)  # 每秒检查一次是否有待执行的任务
    except KeyboardInterrupt:
        print("\n定时任务已停止")

使用APScheduler库(更强大的定时任务框架)

# 安装:pip install apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def tick():
    print(f"执行任务 - {datetime.now()}")
def tick_with_param(text):
    print(f"参数任务: {text} - {datetime.now()}")
def interval_job():
    print("每隔5秒执行一次的任务")
if __name__ == "__main__":
    # 创建调度器
    scheduler = BlockingScheduler()
    # 添加定时任务
    # 每隔3秒执行一次
    scheduler.add_job(tick, 'interval', seconds=3)
    # 每隔10秒执行带参数的任务
    scheduler.add_job(tick_with_param, 'interval', seconds=10, args=["定时任务"])
    # 在特定时间执行(每天下午3点)
    scheduler.add_job(tick, 'cron', hour=15, minute=0)
    # 每周一执行
    scheduler.add_job(tick, 'cron', day_of_week='mon', hour=0, minute=0)
    print(f"任务调度器启动 - {datetime.now()}")
    try:
        scheduler.start()
    except KeyboardInterrupt:
        print("调度器停止")
        scheduler.shutdown()

使用Python内置的time模块(简单版本)

import time
from datetime import datetime
def simple_task():
    """简单的定时任务"""
    print(f"执行任务 - {datetime.now()}")
def run_periodically(func, interval_seconds):
    """周期性执行函数"""
    start_time = time.time()
    next_run = start_time
    try:
        while True:
            current_time = time.time()
            if current_time >= next_run:
                func()
                next_run += interval_seconds
            time.sleep(0.1)  # 避免CPU占用过高
    except KeyboardInterrupt:
        print("任务已停止")
# 设置每5秒执行一次
interval = 5  # 秒
run_periodically(simple_task, interval)

定时任务最佳实践

  1. 错误处理:添加try-except防止任务异常导致程序崩溃
  2. 日志记录:记录任务执行情况
  3. 线程安全:如果多个任务同时执行,考虑使用线程池
  4. 任务持久化:如果需要持久化任务,使用APScheduler的jobstores
# 带日志和错误处理的完整示例
import schedule
import time
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
def safe_task():
    try:
        logging.info("开始执行任务")
        # 你的任务逻辑
        result = 10 / 2  # 示例计算
        logging.info(f"任务执行成功,结果: {result}")
    except Exception as e:
        logging.error(f"任务执行失败: {e}")
schedule.every(10).seconds.do(safe_task)
while True:
    try:
        schedule.run_pending()
        time.sleep(1)
    except Exception as e:
        logging.error(f"调度器异常: {e}")
        time.sleep(10)  # 避免频繁报错

根据你的需求选择合适的方式:

  • 简单任务:使用schedule
  • 复杂调度:使用APScheduler
  • 极简单:使用time.sleep

标签: 定时任务 Python案例

抱歉,评论功能暂时关闭!