Python磁盘监控案例有哪些?

wen python案例 1

Python磁盘监控案例有哪些?一文掌握8个实战方案与最佳实践

文章目录导读

  1. 为何要用Python做磁盘监控?
  2. 核心监控指标与底层原理
  3. 基于psutil的轻量级监控
  4. 跨平台磁盘IO监控(Windows+Linux)
  5. 实时告警脚本(邮件+短信)
  6. Web版可视化磁盘仪表盘(Flask+ECharts)
  7. 日志分析与历史趋势存储(SQLite+CSV)
  8. 分布式集群磁盘监控(SSH+Paramiko)
  9. 容器环境下的磁盘监控(Docker API)
  10. 综合监控平台搭建(Prometheus+Grafana对接)
  11. 常见问题FAQ
  12. 总结与最佳实践建议

为何要用Python做磁盘监控?

在现代运维与DevOps体系中,磁盘空间不足或IO性能瓶颈是导致服务宕机的主要原因之一,Python凭借其跨平台兼容性(Windows/Linux/macOS)、丰富的第三方库(如psutilshutilos)以及快速开发能力,成为实现磁盘监控的首选语言,根据Stack Overflow 2024调查报告,Python在运维自动化领域的采用率高达68.3%,远超Bash(31.5%)和Go(22.1%)。

问:为什么不直接用系统自带的dfiostat命令?
答:Python方案的优势在于:①与现有监控系统无缝整合 ②支持自定义告警逻辑(如连续三次超过阈值才告警) ③可输出结构化数据(JSON/数据库)供分析工具使用。


核心监控指标与底层原理

在编写脚本前,需要明确磁盘监控的核心维度:

指标类型 具体参数 数据来源(Python库)
空间占用 总容量、已用容量、可用容量、使用率(%) psutil.disk_usage()os.statvfs()
IO性能 读写速率、IOPS、等待时间 psutil.disk_io_counters()
文件系统 inode使用率、挂载点状态 os.stat()psutil.disk_partitions()
健康状态 SMART数据(需额外库pySMART) pySMARTsubprocess调用smartctl

底层原理psutil库通过调用系统底层的/proc/diskstats(Linux)或Win32_PerfFormattedData_PerfDisk(Windows)获取原始计数器数据,再计算时间差得到速率值。


案例一:基于psutil的轻量级监控

场景:单机版快速检查磁盘空间,适合集成到Cron任务或脚本中。

import psutil
def disk_monitor(threshold=80):
    partitions = psutil.disk_partitions()
    for part in partitions:
        usage = psutil.disk_usage(part.mountpoint)
        percent = usage.percent
        if percent > threshold:
            print(f"[WARNING] {part.mountpoint} 使用率 {percent}% 超过阈值 {threshold}%")
        else:
            print(f"[OK] {part.mountpoint}: {percent}%")
    # 详细IO情况
    io_before = psutil.disk_io_counters(perdisk=True)
    time.sleep(1)
    io_after = psutil.disk_io_counters(perdisk=True)
    for disk, counters in io_after.items():
        read_speed = (counters.read_bytes - io_before[disk].read_bytes) / 1024 / 1024
        write_speed = (counters.write_bytes - io_before[disk].write_bytes) / 1024 / 1024
        print(f"{disk}: 读 {read_speed:.2f} MB/s, 写 {write_speed:.2f} MB/s")

核心点:使用perdisk=True可以获取每块磁盘的独立数据,而不是总和。

问:如何避免在容器中误报?
答:如果运行在Docker容器内,disk_partitions()可能只返回容器挂载点,此时应排除overlaytmpfs类型分区:[p for p in partitions if 'overlay' not in p.fstype]


案例二:跨平台磁盘IO监控(Windows+Linux)

场景:跨平台生产环境,统一采集IO数据并输出标准格式。

import psutil, platform
def get_disk_io(interval=1):
    system = platform.system()
    # 初始统计
    prev = psutil.disk_io_counters(perdisk=True)
    time.sleep(interval)
    cur = psutil.disk_io_counters(perdisk=True)
    result = {}
    for disk in cur:
        # 过滤掉虚拟磁盘(如Linux的loop设备)
        if 'loop' in disk: continue
        if 'dm-' in disk: continue
        read_diff = cur[disk].read_bytes - prev[disk].read_bytes
        write_diff = cur[disk].write_bytes - prev[disk].write_bytes
        iops_read = cur[disk].read_count - prev[disk].read_count
        iops_write = cur[disk].write_count - prev[disk].write_count
        result[disk] = {
            'read_mb_s': read_diff / (1024*1024) / interval,
            'write_mb_s': write_diff / (1024*1024) / interval,
            'iops_read': iops_read / interval,
            'iops_write': iops_write / interval,
            'avg_wait_ms': calculate_wait(cur[disk], prev[disk], interval)  # 自定义函数
        }
    return result

跨平台适配:Linux下需解析/proc/diskstats,Windows下使用WMI,但psutil已统一接口。


案例三:实时告警脚本(邮件+短信)

场景:当磁盘使用率>90%或IO延迟>500ms时,通过SMTP邮件+企业微信机器人告警。

import smtplib, requests
from email.mime.text import MIMEText
def send_alert(subject, body, level='warning'):
    # 邮件发送
    msg = MIMEText(body)
    msg['Subject'] = f'[Disk {level}] {subject}'
    with smtplib.SMTP('smtp.xxx.com', 587) as server:
        server.login('user@xxx.com', 'password')
        server.send_message(msg, from_addr='monitor@xxx.com', to_addrs=['admin@xxx.com'])
    # 企业微信机器人(Webhook方式)
    webhook_url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx'
    data = {"msgtype": "text", "text": {"content": f"磁盘告警:{subject}\n{body}"}}
    requests.post(webhook_url, json=data)

优化建议:加入“静默期”机制,防止同一问题重复告警:if last_alert_time and (time.time()-last_alert_time) < 300: return


案例四:Web版可视化磁盘仪表盘(Flask+ECharts)

场景:开发一个轻量级Web页面,展示磁盘实时使用率曲线和IO趋势。

from flask import Flask, render_template, jsonify
import psutil, time
app = Flask(__name__)
@app.route('/api/disk_data')
def disk_data():
    # 采集5组数据点
    data = []
    for _ in range(5):
        usage = psutil.disk_usage('/')
        io = psutil.disk_io_counters()
        data.append({
            'time': int(time.time()*1000),
            'usage': usage.percent,
            'io_read': io.read_bytes/1024/1024,
            'io_write': io.write_bytes/1024/1024
        })
        time.sleep(1)
    return jsonify(data)
@app.route('/')
def index():
    return render_template('dashboard.html')
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

前端使用ECharts折线图展示数据,关键代码示例(dashboard.html片段):

setInterval(async () => {
    const resp = await fetch('/api/disk_data');
    const points = await resp.json();
    myChart.setOption({  // 更新图表数据
        series: [{
            data: points.map(p => p.usage)
        }]
    });
}, 5000);  // 每5秒轮询一次

案例五:日志分析与历史趋势存储(SQLite+CSV)

场景:每10分钟记录磁盘状态到SQLite数据库,用于生成周报。

import sqlite3, csv, datetime
def init_db():
    conn = sqlite3.connect('disk_monitor.db')
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS disk_stats
                (id INTEGER PRIMARY KEY, timestamp TEXT, mount_point TEXT,
                 total_gb REAL, used_gb REAL, percent REAL)''')
    conn.commit()
    return conn
def record_stats(conn):
    for part in psutil.disk_partitions():
        usage = psutil.disk_usage(part.mountpoint)
        c = conn.cursor()
        c.execute("INSERT INTO disk_stats VALUES (?,?,?,?,?,?,?)",
                  (None, datetime.now().isoformat(), part.mountpoint,
                   usage.total/1024**3, usage.used/1024**3, usage.percent))
    conn.commit()
# 导出CSV进行离线分析
def export_csv(conn, filename='disk_report.csv'):
    c = conn.cursor()
    c.execute("SELECT * FROM disk_stats WHERE timestamp > datetime('now', '-7 days')")
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['id','timestamp','mount','total_gb','used_gb','percent'])
        writer.writerows(c.fetchall())

问:SQLite能否承受频繁写入?
答:10分钟一次写入对小规模环境完全足够,若需更高频率(如秒级),建议改用InfluxDB或TimescaleDB。


案例六:分布式集群磁盘监控(SSH+Paramiko)

场景:通过SSH批量获取10台以上服务器的磁盘信息,使用Paramiko库。

import paramiko
def remote_disk_check(hosts, username, pkey_path='/home/ops/.ssh/id_rsa'):
    script = '''
import psutil, json
result = [{'mount': p.mountpoint, 'percent': psutil.disk_usage(p.mountpoint).percent} for p in psutil.disk_partitions()]
print(json.dumps(result))
'''
    results = {}
    for host in hosts:
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname=host, username=username, key_filename=pkey_path)
        stdin, stdout, stderr = client.exec_command(f"python3 -c '{script}'")
        output = stdout.read().decode()
        results[host] = json.loads(output) if output else []
        client.close()
    return results

注意:使用python3 -c直接执行字符串,避免在远程服务器创建临时文件,生产环境建议用Fabric或Ansible替代原生Paramiko。


案例七:容器环境下的磁盘监控(Docker API)

场景:监控Docker容器内部磁盘使用情况(非宿主机)。

import docker
client = docker.from_env()
for container in client.containers.list():
    stats = container.stats(stream=False)  # 单次统计
    blkio_stats = stats.get('blkio_stats', {})
    if blkio_stats:
        # 读取每个device的IO累计值
        for device in blkio_stats.get('io_service_bytes_recursive', []):
            print(f"Container {container.short_id} - Device {device['op']}: {device['value']} bytes")
    # 注意:容器内磁盘使用需通过exec执行df命令
    exit_code, output = container.exec_run('df -h /data')
    if exit_code == 0:
        print(output.decode())

坑点:容器的统计数据是自容器启动以来的累计值,需要自己计算速率差。


案例八:综合监控平台搭建(Prometheus+Grafana对接)

场景:将Python采集的数据暴露为Prometheus Metrics,用Grafana展示。

from prometheus_client import start_http_server, Gauge
import psutil, time
# 定义指标
disk_usage = Gauge('disk_usage_percent', 'Disk usage %', ['mountpoint'])
disk_read_speed = Gauge('disk_read_bytes_per_sec', 'Read speed', ['device'])
disk_write_speed = Gauge('disk_write_bytes_per_sec', 'Write speed', ['device'])
def collect():
    # 采集空间
    for part in psutil.disk_partitions():
        usage = psutil.disk_usage(part.mountpoint)
        disk_usage.labels(mountpoint=part.mountpoint).set(usage.percent)
if __name__ == '__main__':
    start_http_server(8000)  # 暴露metrics接口
    while True:
        collect()
        time.sleep(10)

之后在prometheus.yml中加入- targets: ['你的IP:8000'],Grafana中导入ID为1860的磁盘监控模板(官方推荐)。


常见问题FAQ

Q1:如何在Windows上获取磁盘型号和序列号?
A:使用wmi库:import wmi; c = wmi.WMI(); for disk in c.Win32_DiskDrive(): print(disk.Model, disk.SerialNumber)

Q2:采集数据占用CPU过高怎么办?
A:psutil.disk_io_counters()本身不占用资源,但disk_partitions()在NFS挂载点过多时可能阻塞,可增加超时参数:psutil.disk_partitions(all=False)

Q3:怎么监控网络文件系统(NFS/CIFS)的延迟?
A:使用subprocess执行mountstats命令,或挂载时启用stat=0(Linux)禁用客户端延迟统计。

Q4:是否有现成的Python磁盘监控开源项目?
A:推荐psutil官方示例(GitHub psutil/examples)、Telegraf(Go编写,但支持Python插件)、Glances(Python全栈监控)。


总结与最佳实践建议

综合以上8个案例,我们覆盖了从单机脚本到分布式集群、从终端告警到Grafana可视化的完整链路,在生成文章时,建议遵循以下原则:

  1. 轻量化优先:除非需要长时间历史分析,否则避免使用重量级数据库,SQLite或InfluxDB足够。
  2. 异常降级:当远程服务器SSH连接失败时,记录日志而非中断整个监控流程。
  3. 安全合规:不要在代码中硬编码密码,使用环境变量或Vault。
  4. IO采样时机:IO统计需要至少间隔0.5秒以上采样两次计算差值,单次数据意义不大。
  5. 缓存挂载点列表:频繁调用disk_partitions()可能产生系统调用开销,可每5分钟缓存一次。

一个生产级别的磁盘监控系统,不应该只关注“报警时”的处理,更应该通过历史趋势分析预测磁盘扩容时间点——这正是Python数据分析库(如Pandas、NumPy)可以大显身手的地方,希望本文整理的8个案例能帮助你快速搭建适合自己业务的磁盘监控体系。

标签: Python

抱歉,评论功能暂时关闭!