喜讯!TCMS 官网正式上线!一站式提供企业级定制研发、App 小程序开发、AI 与区块链等全栈软件服务,助力多行业数智转型,欢迎致电:13888011868 QQ 932256355 洽谈合作!
基于Python的网络监控系统实战

 
import os
import psutil
import socket
import smtplib
from email.mime.text import MIMEText
from apscheduler.schedulers.blocking import BlockingScheduler
import platform
import time
# ================== 配置参数 ==================
MONITORED_IPS = ["192.168.1.1", "192.168.1.100"]  # 待监控设备IP
ALERT_THRESHOLD = {
    "upload": 1024 * 1024 * 5,  # 上传流量阈值(5MB/分钟)
    "download": 1024 * 1024 * 10  # 下载流量阈值(10MB/分钟)
}
EMAIL_CONFIG = {
    "sender": "your_sender_email@example.com",
    "receiver": "your_receiver_email@example.com",
    "smtp_server": "smtp.example.com",
    "smtp_port": 587,
    "password": "your_email_password"
}
# ================== 设备状态监测 ==================
def ping_device(ip):
    """
    跨平台Ping检测
    """
    param = "-n 1" if platform.system().lower() == "windows" else "-c 1"
    command = f"ping {param} {ip}"
    return os.system(command) == 0
# ================== 流量统计(带增量计算) ==================
class TrafficMonitor:
    """
    流量监控类,计算实时上传下载流量
    """
    def __init__(self):
        self.prev_upload = 0
        self.prev_download = 0
    def get_traffic(self):
        net_io = psutil.net_io_counters()
        current_upload = net_io.bytes_sent
        current_download = net_io.bytes_recv
        traffic = {
            "upload": current_upload - self.prev_upload,
            "download": current_download - self.prev_download
        }
        self.prev_upload, self.prev_download = current_upload, current_download
        return traffic
# ================== 连接监控 ==================
def monitor_connections():
    """
    监控TCP/UDP连接
    """
    connections = []
    for conn in socket.net_connections():
        if conn.type in (socket.SOCK_STREAM, socket.SOCK_DGRAM):
            local = f"{conn.laddr[0]}:{conn.laddr[1]}"
            remote = f"{conn.raddr[0]}:{conn.raddr[1]}" if conn.raddr else "N/A"
            connections.append({
                "type": "TCP" if conn.type == socket.SOCK_STREAM else "UDP",
                "local": local,
                "remote": remote
            })
    return connections
# ================== 故障告警 ==================
def send_alert_email(subject, message):
    """
    邮件告警
    """
    msg = MIMEText(message)
    msg["Subject"] = subject
    msg["From"] = EMAIL_CONFIG["sender"]
    msg["To"] = EMAIL_CONFIG["receiver"]
    try:
        with smtplib.SMTP(EMAIL_CONFIG["smtp_server"], EMAIL_CONFIG["smtp_port"]) as server:
            server.starttls()
            server.login(EMAIL_CONFIG["sender"], EMAIL_CONFIG["password"])
            server.sendmail(EMAIL_CONFIG["sender"], [EMAIL_CONFIG["receiver"]], msg.as_string())
        print("告警邮件发送成功")
    except Exception as e:
        print(f"邮件发送失败: {str(e)}")
# ================== 核心监控逻辑 ==================
traffic_monitor = TrafficMonitor()
def device_status_check():
    """
    设备状态检查任务
    """
    offline_devices = [ip for ip in MONITORED_IPS if not ping_device(ip)]
    if offline_devices:
        message = f"设备离线告警:{', '.join(offline_devices)} 无法连接"
        send_alert_email("【设备离线】网络监控告警", message)
        print(f"[警告] 设备离线: {offline_devices}")
def traffic_statistics():
    """
    流量统计与阈值检查
    """
    traffic = traffic_monitor.get_traffic()
    upload_mb = traffic["upload"] / (1024 * 1024)
    download_mb = traffic["download"] / (1024 * 1024)
    
    alert = []
    if upload_mb > ALERT_THRESHOLD["upload"]:
        alert.append(f"上传流量异常: {upload_mb:.2f}MB (阈值: {ALERT_THRESHOLD['upload']/1024/1024:.2f}MB)")
    if download_mb > ALERT_THRESHOLD["download"]:
        alert.append(f"下载流量异常: {download_mb:.2f}MB (阈值: {ALERT_THRESHOLD['download']/1024/1024:.2f}MB)")
    
    if alert:
        send_alert_email("【流量异常】网络监控告警", "\n".join(alert))
        print(f"[警告] 流量异常: {', '.join(alert)}")
    else:
        print(f"流量正常 - 上传: {upload_mb:.2f}MB, 下载: {download_mb:.2f}MB")
def connection_monitor():
    """
    连接监控任务
    """
    connections = monitor_connections()
    if len(connections) > 100:  # 示例:连接数超过100触发告警
        message = f"异常连接数告警:当前连接数 {len(connections)} 超过阈值"
        send_alert_email("【连接异常】网络监控告警", message)
        print(f"[警告] 连接数异常: {len(connections)}")
    else:
        print(f"当前连接数: {len(connections)}")
# ================== 系统整合与定时任务 ==================
if __name__ == "__main__":
    # 初始化流量监控初始值
    traffic_monitor.get_traffic()  # 第一次调用获取基准值
    
    scheduler = BlockingScheduler()
    scheduler.add_job(func=device_status_check, trigger="interval", minutes=1, name="设备状态检查")
    scheduler.add_job(func=traffic_statistics, trigger="interval", minutes=5, name="流量统计")
    scheduler.add_job(func=connection_monitor, trigger="interval", minutes=10, name="连接监控")
    
    print("网络监控系统启动...")
    try:
        scheduler.start()
    except KeyboardInterrupt:
        print("监控系统停止")配置参数修改 :
MONITORED_IPS :添加需要监控的设备 IP 列表
ALERT_THRESHOLD :根据需求调整流量告警阈值(单位:字节)
EMAIL_CONFIG :替换为你的邮件服务器配置(推荐使用应用专用密码)
依赖安装 :
pip install psutil apscheduler功能扩展建议 :
数据存储 :添加 SQLite/MySQL 存储历史数据(参考 sqlite3 库)
import sqlite3
def save_to_db(data):
    conn = sqlite3.connect('network_monitor.db')
    cursor = conn.cursor()
    cursor.execute('INSERT INTO traffic (time, upload, download) VALUES (?, ?, ?)', data)
    conn.commit()
    conn.close()多设备支持 :为每个设备单独设置监控参数和告警阈值
增强告警:
添加短信告警
支持钉钉 / 企业微信机器人通知
可视化界面 :使用 Flask/Django 开发 Web 监控面板
协议解析 :结合 Scapy 库实现更深入的数据包分析
运行方式 :
python network_monitor.py跨平台支持 :自动适配 Windows/Linux 系统 Ping 命令
多层告警 :设备离线、流量异常、连接数异常三级告警机制
可配置化 :通过参数配置灵活调整监控对象和阈值
定时任务 :使用 APScheduler 实现灵活的任务调度
邮件告警需要开启 SMTP 服务
流量统计精度取决于定时任务间隔,建议 5-10 分钟一次
大规模网络环境下建议使用多线程 / 异步处理提高性能
生产环境中建议添加日志记录功能(使用 logging 库)