Exciting news! TCMS official website is live! Offering full-stack software services including enterprise-level custom R&D, App and mini-program development, multi-system integration, AI, blockchain, and embedded development, empowering digital-intelligent transformation across industries. Visit dev.tekin.cn to discuss cooperation!

Python-Based Network Monitoring System

2025-10-30 11 mins read

Build a Python-based network monitoring system with device tracking, traffic analysis, and alert notifications. Complete code with scheduled tasks and cross-platform support.

基于Python的网络监控系统
 

Below is a complete implementation of a Python-based network monitoring system, including core functional modules and scheduled task mechanisms:

import os
import psutil
import socket
import smtplib
from email.mime.text import MIMEText
from apscheduler.schedulers.blocking import BlockingScheduler
import platform
import time

# ============================== Configuration Parameters ==================
MONITORED_IPS = ["192.168.1.1", "192.168.1.100"]  # IPs of devices to monitor
ALERT_THRESHOLD = {
   "upload": 1024 * 1024 * 5,  # Upload threshold (5MB/minute)
   "download": 1024 * 1024 * 10  # Download threshold (10MB/minute)
}
EMAIL_CONFIG = {
   "sender": "your_sender_email@example.com",
   "receiver": "your_receiver_email@example.com",
   "smtp_server": "smtp.example.com",
   "smtp_port": 587,
   "password": "your_email_password"
}

# ================== Device Status Monitoring ==================
def ping_device(ip):
   """
  Cross-platform ping test
  """
   param = "-n 1" if platform.system().lower() == "windows" else "-c 1"
   command = f"ping {param} {ip}"
   return os.system(command) == 0

# ================== Traffic Statistics (with incremental calculation) ==================
class TrafficMonitor:
   """
  Traffic monitoring class to calculate real-time upload and download traffic
  """
   def __init__(self):
       self.prev_upload = 0
       self.prev_download = 0

   def get_traffic(self):
       net_io = psutil.net_io_counters()
       current_upload = net_io.bytes_sent
       current_download = net_io.bytes_recv
       traffic = {
           "upload": current_upload - self.prev_upload,
           "download": current_download - self.prev_download
      }
       self.prev_upload, self.prev_download = current_upload, current_download
       return traffic

# ================== Connection Monitoring ==================
def monitor_connections():
   """
  Monitor TCP/UDP connections
  """
   connections = []
   for conn in socket.net_connections():
       if conn.type in (socket.SOCK_STREAM, socket.SOCK_DGRAM):
           local = f"{conn.laddr[0]}:{conn.laddr[1]}"
           remote = f"{conn.raddr[0]}:{conn.raddr[1]}" if conn.raddr else "N/A"
           connections.append({
               "type": "TCP" if conn.type == socket.SOCK_STREAM else "UDP",
               "local": local,
               "remote": remote
          })
   return connections

# ================== Fault Alerting ==================
def send_alert_email(subject, message):
   """
  Email alert notification
  """
   msg = MIMEText(message)
   msg["Subject"] = subject
   msg["From"] = EMAIL_CONFIG["sender"]
   msg["To"] = EMAIL_CONFIG["receiver"]

   try:
       with smtplib.SMTP(EMAIL_CONFIG["smtp_server"], EMAIL_CONFIG["smtp_port"]) as server:
           server.starttls()
           server.login(EMAIL_CONFIG["sender"], EMAIL_CONFIG["password"])
           server.sendmail(EMAIL_CONFIG["sender"], [EMAIL_CONFIG["receiver"]], msg.as_string())
       print("Alert email sent successfully")
   except Exception as e:
       print(f"Failed to send email: {str(e)}")

# ================== Core Monitoring Logic ==================
traffic_monitor = TrafficMonitor()

def device_status_check():
   """
  Device status check task
  """
   offline_devices = [ip for ip in MONITORED_IPS if not ping_device(ip)]
   if offline_devices:
       message = f"Device offline alert: {', '.join(offline_devices)} are unreachable"
       send_alert_email("[Device Offline] Network Monitoring Alert", message)
       print(f"[WARNING] Offline devices: {offline_devices}")

def traffic_statistics():
   """
  Traffic statistics and threshold checking
  """
   traffic = traffic_monitor.get_traffic()
   upload_mb = traffic["upload"] / (1024 * 1024)
   download_mb = traffic["download"] / (1024 * 1024)
   
   alerts = []
   if upload_mb > ALERT_THRESHOLD["upload"] / (1024 * 1024):
       alerts.append(f"Abnormal upload traffic: {upload_mb:.2f}MB (Threshold: {ALERT_THRESHOLD['upload']/1024/1024:.2f}MB)")
   if download_mb > ALERT_THRESHOLD["download"] / (1024 * 1024):
       alerts.append(f"Abnormal download traffic: {download_mb:.2f}MB (Threshold: {ALERT_THRESHOLD['download']/1024/1024:.2f}MB)")
   
   if alerts:
       send_alert_email("[Traffic Anomaly] Network Monitoring Alert", "\n".join(alerts))
       print(f"[WARNING] Traffic anomalies: {', '.join(alerts)}")
   else:
       print(f"Traffic normal - Upload: {upload_mb:.2f}MB, Download: {download_mb:.2f}MB")

def connection_monitor():
   """
  Connection monitoring task
  """
   connections = monitor_connections()
   if len(connections) > 100:  # Example: Alert when connections exceed 100
       message = f"Abnormal connection count alert: Current connections ({len(connections)}) exceed threshold"
       send_alert_email("[Connection Anomaly] Network Monitoring Alert", message)
       print(f"[WARNING] Abnormal connection count: {len(connections)}")
   else:
       print(f"Current connections: {len(connections)}")

# ================== System Integration and Scheduled Tasks ==================
if __name__ == "__main__":
   # Initialize traffic monitoring baseline values
   traffic_monitor.get_traffic()  # First call to establish baseline
   
   scheduler = BlockingScheduler()
   scheduler.add_job(func=device_status_check, trigger="interval", minutes=1, name="Device Status Check")
   scheduler.add_job(func=traffic_statistics, trigger="interval", minutes=5, name="Traffic Statistics")
   scheduler.add_job(func=connection_monitor, trigger="interval", minutes=10, name="Connection Monitor")
   
   print("Network monitoring system started...")
   try:
       scheduler.start()
   except KeyboardInterrupt:
       print("Monitoring system stopped")

Usage Instructions:

  1. Configuration Parameters Modification:

    • MONITORED_IPS: Add the list of device IPs you need to monitor

    • ALERT_THRESHOLD: Adjust traffic alert thresholds as needed (unit: bytes)

    • EMAIL_CONFIG: Replace with your email server configuration (application-specific passwords recommended)

  2. Dependency Installation:

    pip install psutil apscheduler
  3. Feature Expansion Suggestions:

    • Data Storage: Add SQLite/MySQL for historical data storage (refer to sqlite3 library)

    import sqlite3
    def save_to_db(data):
       conn = sqlite3.connect('network_monitor.db')
       cursor = conn.cursor()
       cursor.execute('INSERT INTO traffic (time, upload, download) VALUES (?, ?, ?)', data)
       conn.commit()
       conn.close()
    • Multi-device Support: Set individual monitoring parameters and alert thresholds for each device

    • Enhanced Alerts:

      • Add SMS notifications

      • Support DingTalk/WeChat Work robot notifications

    • Visual Interface: Develop a web monitoring dashboard using Flask/Django

    • Protocol Analysis: Implement deeper packet analysis with the Scapy library

  4. Running the System:

    python network_monitor.py

System Features:

  1. Cross-platform Support: Automatically adapts to Windows/Linux ping commands

  2. Multi-level Alerts: Three-tier alert mechanism for device offline, traffic anomalies, and connection count anomalies

  3. Configurable: Flexibly adjust monitoring targets and thresholds through parameters

  4. Scheduled Tasks: Implement flexible task scheduling with APScheduler

Notes:

  1. Email alerts require SMTP service to be enabled

  2. Traffic statistics accuracy depends on scheduled task intervals (recommended: 5-10 minutes)

  3. For large-scale network environments, use multi-threading/asynchronous processing to improve performance

  4. Add logging functionality in production environments (using the logging library)

This implementation provides basic network monitoring functionality and can be further extended and optimized according to actual needs, such as adding more monitoring metrics, improving data visualization, and enhancing alert reliability.

 

Tags: Python network monitor, network traffic monitoring Python, Python ping monitor, network alert system Python, Python psutil network, APScheduler network tasks

Image NewsLetter
Icon primary
Newsletter

Subscribe our newsletter

Please enter your email address below and click the subscribe button. By doing so, you agree to our Terms and Conditions.

Your experience on this site will be improved by allowing cookies Cookie Policy