Exciting news! TCMS official website is live! Offering full-stack software services including enterprise-level custom R&D, App and mini-program development, multi-system integration, AI, blockchain, and embedded development, empowering digital-intelligent transformation across industries. Visit dev.tekin.cn to discuss cooperation!
Build a Python-based network monitoring system with device tracking, traffic analysis, and alert notifications. Complete code with scheduled tasks and cross-platform support.

import os
import psutil
import socket
import smtplib
from email.mime.text import MIMEText
from apscheduler.schedulers.blocking import BlockingScheduler
import platform
import time
# ============================== Configuration Parameters ==================
MONITORED_IPS = ["192.168.1.1", "192.168.1.100"] # IPs of devices to monitor
ALERT_THRESHOLD = {
"upload": 1024 * 1024 * 5, # Upload threshold (5MB/minute)
"download": 1024 * 1024 * 10 # Download threshold (10MB/minute)
}
EMAIL_CONFIG = {
"sender": "your_sender_email@example.com",
"receiver": "your_receiver_email@example.com",
"smtp_server": "smtp.example.com",
"smtp_port": 587,
"password": "your_email_password"
}
# ================== Device Status Monitoring ==================
def ping_device(ip):
"""
Cross-platform ping test
"""
param = "-n 1" if platform.system().lower() == "windows" else "-c 1"
command = f"ping {param} {ip}"
return os.system(command) == 0
# ================== Traffic Statistics (with incremental calculation) ==================
class TrafficMonitor:
"""
Traffic monitoring class to calculate real-time upload and download traffic
"""
def __init__(self):
self.prev_upload = 0
self.prev_download = 0
def get_traffic(self):
net_io = psutil.net_io_counters()
current_upload = net_io.bytes_sent
current_download = net_io.bytes_recv
traffic = {
"upload": current_upload - self.prev_upload,
"download": current_download - self.prev_download
}
self.prev_upload, self.prev_download = current_upload, current_download
return traffic
# ================== Connection Monitoring ==================
def monitor_connections():
"""
Monitor TCP/UDP connections
"""
connections = []
for conn in socket.net_connections():
if conn.type in (socket.SOCK_STREAM, socket.SOCK_DGRAM):
local = f"{conn.laddr[0]}:{conn.laddr[1]}"
remote = f"{conn.raddr[0]}:{conn.raddr[1]}" if conn.raddr else "N/A"
connections.append({
"type": "TCP" if conn.type == socket.SOCK_STREAM else "UDP",
"local": local,
"remote": remote
})
return connections
# ================== Fault Alerting ==================
def send_alert_email(subject, message):
"""
Email alert notification
"""
msg = MIMEText(message)
msg["Subject"] = subject
msg["From"] = EMAIL_CONFIG["sender"]
msg["To"] = EMAIL_CONFIG["receiver"]
try:
with smtplib.SMTP(EMAIL_CONFIG["smtp_server"], EMAIL_CONFIG["smtp_port"]) as server:
server.starttls()
server.login(EMAIL_CONFIG["sender"], EMAIL_CONFIG["password"])
server.sendmail(EMAIL_CONFIG["sender"], [EMAIL_CONFIG["receiver"]], msg.as_string())
print("Alert email sent successfully")
except Exception as e:
print(f"Failed to send email: {str(e)}")
# ================== Core Monitoring Logic ==================
traffic_monitor = TrafficMonitor()
def device_status_check():
"""
Device status check task
"""
offline_devices = [ip for ip in MONITORED_IPS if not ping_device(ip)]
if offline_devices:
message = f"Device offline alert: {', '.join(offline_devices)} are unreachable"
send_alert_email("[Device Offline] Network Monitoring Alert", message)
print(f"[WARNING] Offline devices: {offline_devices}")
def traffic_statistics():
"""
Traffic statistics and threshold checking
"""
traffic = traffic_monitor.get_traffic()
upload_mb = traffic["upload"] / (1024 * 1024)
download_mb = traffic["download"] / (1024 * 1024)
alerts = []
if upload_mb > ALERT_THRESHOLD["upload"] / (1024 * 1024):
alerts.append(f"Abnormal upload traffic: {upload_mb:.2f}MB (Threshold: {ALERT_THRESHOLD['upload']/1024/1024:.2f}MB)")
if download_mb > ALERT_THRESHOLD["download"] / (1024 * 1024):
alerts.append(f"Abnormal download traffic: {download_mb:.2f}MB (Threshold: {ALERT_THRESHOLD['download']/1024/1024:.2f}MB)")
if alerts:
send_alert_email("[Traffic Anomaly] Network Monitoring Alert", "\n".join(alerts))
print(f"[WARNING] Traffic anomalies: {', '.join(alerts)}")
else:
print(f"Traffic normal - Upload: {upload_mb:.2f}MB, Download: {download_mb:.2f}MB")
def connection_monitor():
"""
Connection monitoring task
"""
connections = monitor_connections()
if len(connections) > 100: # Example: Alert when connections exceed 100
message = f"Abnormal connection count alert: Current connections ({len(connections)}) exceed threshold"
send_alert_email("[Connection Anomaly] Network Monitoring Alert", message)
print(f"[WARNING] Abnormal connection count: {len(connections)}")
else:
print(f"Current connections: {len(connections)}")
# ================== System Integration and Scheduled Tasks ==================
if __name__ == "__main__":
# Initialize traffic monitoring baseline values
traffic_monitor.get_traffic() # First call to establish baseline
scheduler = BlockingScheduler()
scheduler.add_job(func=device_status_check, trigger="interval", minutes=1, name="Device Status Check")
scheduler.add_job(func=traffic_statistics, trigger="interval", minutes=5, name="Traffic Statistics")
scheduler.add_job(func=connection_monitor, trigger="interval", minutes=10, name="Connection Monitor")
print("Network monitoring system started...")
try:
scheduler.start()
except KeyboardInterrupt:
print("Monitoring system stopped")Configuration Parameters Modification:
MONITORED_IPS: Add the list of device IPs you need to monitor
ALERT_THRESHOLD: Adjust traffic alert thresholds as needed (unit: bytes)
EMAIL_CONFIG: Replace with your email server configuration (application-specific passwords recommended)
Dependency Installation:
pip install psutil apschedulerFeature Expansion Suggestions:
Data Storage: Add SQLite/MySQL for historical data storage (refer to sqlite3 library)
import sqlite3
def save_to_db(data):
conn = sqlite3.connect('network_monitor.db')
cursor = conn.cursor()
cursor.execute('INSERT INTO traffic (time, upload, download) VALUES (?, ?, ?)', data)
conn.commit()
conn.close()Multi-device Support: Set individual monitoring parameters and alert thresholds for each device
Enhanced Alerts:
Add SMS notifications
Support DingTalk/WeChat Work robot notifications
Visual Interface: Develop a web monitoring dashboard using Flask/Django
Protocol Analysis: Implement deeper packet analysis with the Scapy library
Running the System:
python network_monitor.pyCross-platform Support: Automatically adapts to Windows/Linux ping commands
Multi-level Alerts: Three-tier alert mechanism for device offline, traffic anomalies, and connection count anomalies
Configurable: Flexibly adjust monitoring targets and thresholds through parameters
Scheduled Tasks: Implement flexible task scheduling with APScheduler
Email alerts require SMTP service to be enabled
Traffic statistics accuracy depends on scheduled task intervals (recommended: 5-10 minutes)
For large-scale network environments, use multi-threading/asynchronous processing to improve performance
Add logging functionality in production environments (using the logging library)