
Python-Powered Corporate Network "Guardian": A Comprehensive Guide to Building an Efficient Monitoring System

2025-10-30 31 mins read

This guide demonstrates how to build a full-featured corporate network monitoring system using Python—covering device status checks, traffic statistics, port monitoring, fault alerts, and system integration. Python’s flexibility and rich libraries make it a cost-effective alternative to heavyweight commercial tools, especially for small-to-medium enterprises or teams with limited O&M budgets.


In the digital era, a company’s network functions like the "blood vessels" supporting business operations: everything from daily office communication and customer data transmission to core business systems relies on a stable network environment. Once network outages, device failures, or abnormal traffic occur, the consequences range from reduced work efficiency to severe risks such as business downtime and data leaks. Python, with its concise syntax, rich library ecosystem, and cross-platform compatibility, is an ideal tool for building lightweight, cost-effective network monitoring systems. This article provides a full breakdown of how to use Python to construct a network "guardian" tailored to enterprise needs, covering system design, core function implementation, deployment, and optimization.

1. Why Python for Network Monitoring? One of the "Optimal Solutions" 

When choosing network monitoring tools, enterprises often face challenges like "redundant functions in open-source tools," "high costs of commercial software," and "difficulty in customization." Python’s features address these pain points, making it the first choice for small-to-medium enterprises and large enterprises with lightweight monitoring scenarios: 

  1. Low Threshold & High Efficiency : Python syntax is close to natural language, and development cycles are typically a fraction of those in Java or C++. Even engineers not specialized in operations and maintenance (O&M) development can quickly learn to write monitoring scripts. 

  2. Rich Professional Library Support : No need to "reinvent the wheel"— ping3 for device connectivity detection, psutil for network traffic capture, socket for port and connection monitoring, smtplib / wxpy for alert notifications, and matplotlib / pandas for data visualization—covering the entire monitoring workflow. 

  3. Cross-Platform Compatibility : A single set of code can run on Windows Server (for monitoring office area switches), Linux (for monitoring data center servers), and macOS (for local O&M debugging), eliminating the need for separate adaptations for different systems. 

  4. Lightweight Resource Footprint : Compared to open-source monitoring platforms like Zabbix and Nagios, Python scripts typically occupy less than 100MB of memory, avoiding additional load on monitored devices (such as edge routers and small switches). 

2. Core Requirements for Corporate Network Monitoring: What to "Guard Against"? 

Before starting development, it is essential to clarify the core monitoring objectives of the enterprise network to avoid "monitoring for the sake of monitoring." Combining O&M scenarios of most companies, core requirements can be categorized into three types: 

  • Device Status : Ensure core network devices (routers, switches, servers) are online and functioning properly. Typical scenarios: a data center router outage causing company-wide network failure; core server downtime leaving business systems unavailable.

  • Network Traffic : Provide real-time statistics on bandwidth usage and identify abnormal traffic (e.g., P2P downloads, malicious attacks). Typical scenarios: employees’ excessive downloads saturating bandwidth and slowing customer access to the company website; abnormal upload traffic indicating a potential data leak.

  • Connections & Ports : Monitor the status of critical ports (e.g., 80/443 for web services, 3306 for databases) and identify unauthorized connections. Typical scenarios: frequent scanning of database ports by external IPs; unauthorized devices joining the internal LAN.

  • Fault Alerts : Notify O&M personnel immediately when network anomalies occur to shorten troubleshooting time. Typical scenario: a switch failure at 2 AM that goes undetected until work hours, causing 3 hours of business interruption.

3. Modular Implementation: The "Vital Components" of a Python Monitoring System 

The following breaks down the system implementation across four core modules (device monitoring → traffic statistics → connection monitoring → fault alerts), with practical code and scenarios for each. 

Module 1: Device Status Monitoring – "Basic Operation" Beyond Simple Ping 

Core Requirement 

Batch detect the online status of internal network devices (e.g., router 192.168.1.1, server 192.168.1.100, printer 192.168.1.200). Mark failures and record timestamps if devices go offline. 

Technology Selection 

Use the ping3 library (easier to integrate into Python scripts than the system’s built-in ping command, supporting timeout settings and TTL configuration) with multi-threading for fast batch device detection (avoiding long wait times with single-threaded sequential detection). 

Code Implementation & Explanation 

import ping3
import threading
from datetime import datetime

# 1. Define the list of devices to monitor (can be read from config files/databases to avoid hardcoding)
devices = [
  {"name": "Core Router", "ip": "192.168.1.1"},
  {"name": "Web Server", "ip": "192.168.1.100"},
  {"name": "Finance Printer", "ip": "192.168.1.200"}
]

# 2. Single device detection function
def check_device_status(device):
   name = device["name"]
   ip = device["ip"]
    # Set timeout to 2 seconds and request the result in milliseconds
    response_time = ping3.ping(ip, timeout=2, unit="ms")
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # ping3 returns None on timeout and False on errors such as unknown hosts
    if response_time:
        status = "Online"
        print(f"[{current_time}] {name}({ip}) - {status}, Response Time: {response_time:.2f}ms")
    else:
        status = "Offline"
        print(f"[{current_time}] {name}({ip}) - {status}!!!")
        # Call the alert function (Module 4 below) to trigger offline alerts
        send_alert("Device Offline Alert", f"{name}({ip}) was detected offline at {current_time}. Please troubleshoot promptly.")

# 3. Multi-threaded batch detection (improve efficiency for monitoring 10+ devices simultaneously)
def batch_check_devices():
   threads = []
   for device in devices:
       t = threading.Thread(target=check_device_status, args=(device,))
       threads.append(t)
       t.start()
   # Wait for all threads to complete execution
   for t in threads:
       t.join()

# Test execution
if __name__ == "__main__":
   batch_check_devices()

Optimization Tips 

  • Avoid "False Alerts" : A single failed ping may result from network fluctuations. Add logic like "mark as offline only after 3 consecutive failed detections"; 

  • Configurable Management : Store the device list in a devices.json file. No code modifications are needed when adding devices later. Example: 

    [{"name": "Core Router", "ip": "192.168.1.1"}, {"name": "Web Server", "ip": "192.168.1.100"}]

    Read it in code using json.load(open("devices.json"))
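The "3 consecutive failures" rule from the first tip can be sketched as a small helper. This is a hypothetical sketch, not part of the module above: it keeps a per-IP failure streak and flags a device as offline only on the third failure in a row (the threshold of 3 is an assumption you can tune).

```python
from collections import defaultdict

# Per-IP count of consecutive failed pings
_fail_counts = defaultdict(int)

def record_ping_result(ip, success, threshold=3):
    """Return True exactly when the device should be marked offline."""
    if success:
        _fail_counts[ip] = 0  # any successful ping resets the streak
        return False
    _fail_counts[ip] += 1
    # Fire once, on the threshold-th consecutive failure only
    return _fail_counts[ip] == threshold
```

Call `record_ping_result(ip, response_time is not None)` after each ping and only invoke `send_alert` when it returns True; transient network jitter then no longer produces false alerts.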

Module 2: Traffic Statistics – Identify "Bandwidth Hogs" and Abnormal Traffic 

Core Requirement 

Real-time statistics on upload/download speeds of specified network interfaces (e.g., eth0 for servers, wan port for office routers). Trigger alerts if traffic exceeds thresholds (e.g., download speed > 10MB/s for 5 consecutive minutes). 

Technology Selection 

Use the psutil library (cross-platform system resource retrieval, supporting traffic statistics for Windows "Ethernet" and Linux "eth0" interfaces). 

Code Implementation & Explanation 

import psutil
import time
from datetime import datetime

# 1. Get real-time traffic of a specified interface (unit: bytes)
def get_interface_traffic(interface="eth0"):
   net_io = psutil.net_io_counters(pernic=True).get(interface)
   if not net_io:
       raise ValueError(f"Network interface not found: {interface}. Please check the interface name.")
   # Return uploaded bytes and downloaded bytes
   return net_io.bytes_sent, net_io.bytes_recv

# 2. Calculate traffic speed (unit: MB/s)
def calculate_speed(interface, interval=1):
    # Sample the counters at the start of the interval
    sent_before, recv_before = get_interface_traffic(interface)
    time.sleep(interval)
    # Sample again after the interval
    sent_after, recv_after = get_interface_traffic(interface)
    # Convert byte deltas to MB/s (1MB = 1024*1024 bytes), normalized by the interval
    upload_speed = (sent_after - sent_before) / (1024 * 1024) / interval
    download_speed = (recv_after - recv_before) / (1024 * 1024) / interval
   return round(upload_speed, 2), round(download_speed, 2)

# 3. Continuously monitor traffic and check threshold violations
def monitor_traffic(interface="eth0", max_download=10):
   while True:
       upload, download = calculate_speed(interface)
       current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
       print(f"[{current_time}] {interface} - Upload: {upload}MB/s, Download: {download}MB/s")
       
       # Trigger alert if download speed exceeds the threshold
       if download > max_download:
           alert_msg = f"Traffic Exceeded Alert\nInterface: {interface}\nTime: {current_time}\nDownload Speed: {download}MB/s (Threshold: {max_download}MB/s)"
           send_alert("Traffic Over Limit", alert_msg)
       
       # Monitor every 5 seconds (adjustable based on needs)
       time.sleep(5)

# Test execution (Linux example; for Windows, replace with interface name like "Ethernet")
if __name__ == "__main__":
   monitor_traffic(interface="eth0", max_download=10)

Practical Scenarios 

  • For office networks: Set "alert if download speed > 5MB/s during working hours" to prevent employees from using company bandwidth to download videos; 

  • For web servers: Monitor "alert if WAN port upload speed > 2MB/s" to check for abnormal data uploads (e.g., data leaks). 
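The requirement above mentions alerting only when a threshold is exceeded "for 5 consecutive minutes", while the code alerts on every over-threshold sample. A sustained-violation check can be sketched as a small class; this is an illustrative sketch (class name and defaults are assumptions), where the window is approximated as N consecutive over-threshold samples:

```python
class SustainedThreshold:
    """Flag a violation only after it persists for a full sliding window."""

    def __init__(self, threshold, window_seconds=300, sample_interval=5):
        self.threshold = threshold
        # Number of consecutive over-threshold samples that fills the window
        self.required = window_seconds // sample_interval
        self.over_count = 0

    def update(self, value):
        """Feed one sample; return True once the violation is sustained."""
        if value > self.threshold:
            self.over_count += 1
        else:
            self.over_count = 0  # a single normal sample resets the window
        return self.over_count >= self.required
```

In `monitor_traffic`, create one instance per interface and replace the direct `download > max_download` check with `checker.update(download)`.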

Module 3: Connection & Port Monitoring – Block "Network Backdoors" 

Core Requirement 

  • Monitor whether critical ports (e.g., 3306 for databases, 3389 for remote desktops) are open. Trigger alerts if ports are closed; 

  • Count connections to specified ports (e.g., concurrent connections to port 80 of web servers) and identify abnormal connections (e.g., frequent connections from a single IP may indicate an attack). 

Technology Selection 

  • socket library: Detect port availability and establish TCP connections; 

  • psutil.net_connections() : Retrieve all network connection information and filter connections to specified ports. 

Code Implementation & Explanation 

import socket
import time
import psutil
from datetime import datetime

# 1. Check if a specified IP:port is open
def check_port(ip, port, timeout=2):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        sock.connect((ip, port))
        return True  # Port is open
    except OSError:
        return False  # Port is closed or unreachable
    finally:
        sock.close()  # Always release the socket

# 2. Monitor the status of critical ports
def monitor_ports(port_list):
   while True:
       current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
       for ip, port in port_list:
           is_open = check_port(ip, port)
           status = "Open" if is_open else "Closed"
           print(f"[{current_time}] {ip}:{port} - {status}")
           if not is_open:
               alert_msg = f"Port Abnormality Alert\nIP:Port: {ip}:{port}\nTime: {current_time}\nStatus: {status} (Expected: Open)"
               send_alert("Port Anomaly", alert_msg)
       time.sleep(10)  # Check every 10 seconds

# 3. Count connections to a specified port (e.g., port 80)
def count_port_connections(port):
   connections = psutil.net_connections(kind="tcp")
   port_connections = [conn for conn in connections if conn.laddr.port == port and conn.status == "ESTABLISHED"]
   # Return connection count and list of connected IPs
   ip_list = [conn.raddr.ip for conn in port_connections if conn.raddr]
   return len(port_connections), ip_list

# 4. Monitor port connections and identify abnormal IPs
def monitor_port_connections(port=80, max_conn=100, max_ip_conn=20):
   while True:
       conn_count, ip_list = count_port_connections(port)
       current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
       print(f"[{current_time}] Port {port} - Concurrent Connections: {conn_count}")
       
       # Trigger alert if connection count exceeds the threshold
       if conn_count > max_conn:
           alert_msg = f"Port Connection Limit Exceeded Alert\nPort: {port}\nTime: {current_time}\nConnection Count: {conn_count} (Threshold: {max_conn})"
           send_alert("Port Connections Over Limit", alert_msg)
       
       # Count connections per IP; alert if exceeding threshold (e.g., 20 connections from one IP)
       ip_conn_count = {}
       for ip in ip_list:
           ip_conn_count[ip] = ip_conn_count.get(ip, 0) + 1
       for ip, count in ip_conn_count.items():
           if count > max_ip_conn:
               alert_msg = f"Abnormal IP Connection Alert\nPort: {port}\nIP: {ip}\nTime: {current_time}\nConnection Count: {count} (Threshold: {max_ip_conn})"
               send_alert("Abnormal IP Connections", alert_msg)
       
       time.sleep(5)

# Test execution (monitor database port 3306 and web server port 80)
if __name__ == "__main__":
   # Monitor critical port status
   critical_ports = [("192.168.1.100", 3306), ("192.168.1.101", 80)]
   monitor_ports(critical_ports)
   
   # Monitor port 80 connections (run in a separate thread to avoid blocking)
   # import threading
   # threading.Thread(target=monitor_port_connections, args=(80, 100, 20)).start()

Module 4: Fault Alerts – Keep O&M Teams "Informed First" 

A key value of monitoring systems is "timely notifications during anomalies." Without alerts, monitoring is like "having a fire alarm that doesn’t ring." Common alert methods include email, WeChat Work/DingTalk, and SMS (requiring third-party API integration). Below is an implementation example using WeChat Work Robot Alerts (low threshold, free, and suitable for team collaboration). 

Code Implementation (WeChat Work Robot Alert) 

import requests
import json
from datetime import datetime

# WeChat Work Robot WebHook (create a robot in WeChat Work to get the link)
WECHAT_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_robot_key"

def send_alert(title, content):
    """Send alerts via WeChat Work robot"""
    # Construct the alert message (Markdown improves readability)
    msg = {
        "msgtype": "markdown",
        "markdown": {
            "content": f"### {title}\n{content}\n\n**Alert Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        }
    }
    try:
        response = requests.post(WECHAT_WEBHOOK, data=json.dumps(msg), headers={"Content-Type": "application/json"})
        if response.json().get("errcode") == 0:
            print(f"Alert sent successfully: {title}")
        else:
            print(f"Alert failed to send: {response.text}")
    except Exception as e:
        print(f"Exception occurred while sending alert: {str(e)}")

Alert Optimization Suggestions 

  • Tiered Alerts : For critical faults (e.g., core server downtime), send alerts via email + WeChat Work + SMS; for minor faults (e.g., printer offline), only send WeChat Work alerts; 

  • Avoid "Alert Spam" : Set a rule like "only one alert per fault (e.g., device offline) within 10 minutes" to prevent frequent notifications from disturbing O&M teams. 

4. System Integration & Optimization: From "Script" to "Usable System" 

Individual module scripts only solve partial problems. A complete system requires integration with scheduled tasks, data storage, and visualization , plus performance optimization. 

1. Scheduled Tasks: Automate Monitoring 

Use the APScheduler library to implement scheduled tasks (more flexible than time.sleep() , supporting cron expressions, interval execution, and scheduled time execution). 

from apscheduler.schedulers.blocking import BlockingScheduler

# Note: the monitor_* functions above contain their own while True loops.
# Refactor each to perform a single check and return before scheduling it,
# and let APScheduler handle the repetition.

# 1. Register all monitoring tasks
def run_all_monitors():
   # Task 1: Check device status every 1 minute
   scheduler.add_job(batch_check_devices, "interval", minutes=1, id="device_check")
   
   # Task 2: Monitor traffic every 5 seconds
   scheduler.add_job(monitor_traffic, "interval", seconds=5, id="traffic_monitor", args=("eth0", 10))
   
   # Task 3: Check critical ports every 10 seconds
   critical_ports = [("192.168.1.100", 3306), ("192.168.1.101", 80)]
   scheduler.add_job(monitor_ports, "interval", seconds=10, id="port_check", args=(critical_ports,))
   
   # Task 4: Monitor port 80 connections every 5 seconds
   scheduler.add_job(monitor_port_connections, "interval", seconds=5, id="conn_monitor", args=(80, 100, 20))

# 2. Start the scheduler
if __name__ == "__main__":
   scheduler = BlockingScheduler()
   run_all_monitors()
   print("Monitoring system started. Running continuously...")
   try:
       scheduler.start()
   except (KeyboardInterrupt, SystemExit):
       print("Monitoring system stopped manually.")
   except Exception as e:
       print(f"Monitoring system crashed: {str(e)}")
       # Re-adding jobs with the same ids would raise ConflictingIdError, so
       # in production rely on a process supervisor (e.g., systemd with
       # Restart=always, see Section 5) to restart the whole script instead.
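A scheduler-friendly job does one pass and returns, leaving the repetition to APScheduler. As a hedged sketch of that refactor (the `get_speed` and `alert` callables are stand-ins for `calculate_speed` and `send_alert` from the earlier modules):

```python
def traffic_check_once(interface, max_download, get_speed, alert):
    """One traffic sample; suitable as an APScheduler interval job."""
    upload, download = get_speed(interface)
    if download > max_download:
        # Delegate notification to the injected alert callable
        alert(f"{interface} download {download} MB/s exceeds {max_download} MB/s")
    return upload, download
```

Injecting the dependencies also makes the job easy to unit-test without real network traffic.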

2. Data Storage: Track Historical Trends 

For long-term O&M, storing monitoring data (e.g., daily traffic peaks, device offline records) helps analyze trends and optimize networks. A lightweight option is to use SQLite (no server required, file-based) or MySQL for larger-scale deployments. Below is an example of storing traffic data with SQLite: 

import sqlite3
import time
from datetime import datetime

# 1. Initialize database and table
def init_db(db_name="network_monitor.db"):
   conn = sqlite3.connect(db_name)
   cursor = conn.cursor()
   # Create traffic statistics table
   cursor.execute('''
      CREATE TABLE IF NOT EXISTS traffic (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          interface TEXT NOT NULL,
          upload_speed REAL NOT NULL,
          download_speed REAL NOT NULL,
          record_time TEXT NOT NULL
      )
  ''')
   conn.commit()
   conn.close()

# 2. Insert traffic data into the database
def insert_traffic_data(interface, upload, download, db_name="network_monitor.db"):
   record_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
   conn = sqlite3.connect(db_name)
   cursor = conn.cursor()
   cursor.execute('''
      INSERT INTO traffic (interface, upload_speed, download_speed, record_time)
      VALUES (?, ?, ?, ?)
  ''', (interface, upload, download, record_time))
   conn.commit()
   conn.close()

# 3. Modify the monitor_traffic function to include data storage
def monitor_traffic(interface="eth0", max_download=10):
   init_db()  # Ensure database and table exist
   while True:
       upload, download = calculate_speed(interface)
       current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
       print(f"[{current_time}] {interface} - Upload: {upload}MB/s, Download: {download}MB/s")
       
       # Insert data into database
       insert_traffic_data(interface, upload, download)
       
       # Trigger alert if threshold is exceeded
       if download > max_download:
           alert_msg = f"Traffic Exceeded Alert\nInterface: {interface}\nTime: {current_time}\nDownload Speed: {download}MB/s (Threshold: {max_download}MB/s)"
           send_alert("Traffic Over Limit", alert_msg)
       
       time.sleep(5)

3. Visualization: Make Data Intuitive 

Raw data is hard to interpret—visualization tools like matplotlib or Plotly can turn historical traffic, device status, and connection counts into charts. Below is an example of plotting 24-hour traffic trends with matplotlib:

import sqlite3
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta

# 1. Query 24-hour traffic data for a specified interface
def get_24h_traffic(interface, db_name="network_monitor.db"):
   conn = sqlite3.connect(db_name)
   cursor = conn.cursor()
   # Calculate time 24 hours ago
   twenty_four_hours_ago = (datetime.now() - timedelta(hours=24)).strftime("%Y-%m-%d %H:%M:%S")
   cursor.execute('''
      SELECT record_time, upload_speed, download_speed
      FROM traffic
      WHERE interface = ? AND record_time >= ?
      ORDER BY record_time ASC
  ''', (interface, twenty_four_hours_ago))
   data = cursor.fetchall()
   conn.close()
   
   # Format data for plotting
   times = [datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S") for row in data]
   uploads = [row[1] for row in data]
   downloads = [row[2] for row in data]
   return times, uploads, downloads

# 2. Plot 24-hour traffic trend chart
def plot_24h_traffic(interface="eth0"):
   times, uploads, downloads = get_24h_traffic(interface)
   
   plt.figure(figsize=(12, 6))
   plt.plot(times, uploads, label="Upload Speed (MB/s)", color="blue")
   plt.plot(times, downloads, label="Download Speed (MB/s)", color="red")
   
   # Format chart
   plt.xlabel("Time")
   plt.ylabel("Speed (MB/s)")
   plt.title(f"24-Hour Network Traffic Trend - {interface}")
   plt.legend()
   plt.grid(True, linestyle="--", alpha=0.7)
   # Rotate x-axis labels for readability
   plt.xticks(rotation=45)
   # Format x-axis to show hours
   plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
   plt.gca().xaxis.set_major_locator(mdates.HourLocator(interval=2))
   
   # Save or display the chart
   plt.tight_layout()
   plt.savefig(f"{interface}_24h_traffic.png")
   plt.close()
   print(f"24-hour traffic chart saved as {interface}_24h_traffic.png")

5. Production Deployment: Ensure Stability 

To turn the script into a production-ready system, consider the following adjustments: 

1. Run as a Background Service 

  • Linux : Use systemd to create a service (e.g., network-monitor.service ) so the system starts automatically on boot: 

     [Unit]
     Description=Python Network Monitoring Service
     After=network.target

     [Service]
     # Use a dedicated non-root user for security in production
     User=root
     ExecStart=/usr/bin/python3 /path/to/monitor_system.py
     # Restart the service automatically if it crashes
     Restart=always

     [Install]
     WantedBy=multi-user.target

    Enable and start the service with: 

    sudo systemctl enable network-monitor
    sudo systemctl start network-monitor
  • Windows : Use "Task Scheduler" to create a task that runs the script at startup (select "Run whether user is logged on or not"). 

2. Logging: Troubleshoot Efficiently 

Replace print() with Python’s built-in logging module to record detailed logs (e.g., errors, alerts, status updates) to a file: 

import logging

# Configure logging
logging.basicConfig(
   level=logging.INFO,
   format="%(asctime)s - %(levelname)s - %(message)s",
   handlers=[
       logging.FileHandler("network_monitor.log"),  # Write to file
       logging.StreamHandler()  # Also print to console
  ]
)

# Use logging instead of print
logging.info(f"[{current_time}] {interface} - Upload: {upload}MB/s, Download: {download}MB/s")
logging.error(f"Failed to send alert: {str(e)}")
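For a monitor that runs for months, unbounded log files eventually fill the disk. One optional refinement, sketched here with the standard library's `RotatingFileHandler` (file name and size limits are illustrative):

```python
import logging
from logging.handlers import RotatingFileHandler

# Cap log growth: rotate after ~10 MB and keep 5 old files
handler = RotatingFileHandler(
    "network_monitor.log",
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
)
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))

logger = logging.getLogger("network_monitor")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info("rotating log handler configured")
```

Use `logger.info(...)` / `logger.error(...)` in place of the module-level `logging.info(...)` calls shown above.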

3. Security Hardening 

  • Limit Permissions : Run the script with a non-root user (avoid unnecessary privileges); 

  • Encrypt Credentials : If using services like email or WeChat Work, store API keys/credentials in environment variables (not hardcoded in scripts); 

  • Whitelist IPs : Restrict database access (e.g., SQLite file permissions, MySQL IP whitelisting) to prevent unauthorized modifications. 
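The credential advice above can be sketched as follows. The variable name `WECHAT_WEBHOOK_KEY` is an assumption; pick whatever name your team standardizes on.

```python
import os

def get_webhook_url(env=os.environ):
    """Build the WeChat Work webhook URL from an environment variable."""
    key = env.get("WECHAT_WEBHOOK_KEY")  # never hardcode the key in the script
    if not key:
        raise RuntimeError("WECHAT_WEBHOOK_KEY is not set")
    return f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={key}"
```

Set the variable in the systemd unit (`Environment=WECHAT_WEBHOOK_KEY=...`) or an environment file kept outside version control.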

6. Conclusion & Future Outlook 

This guide demonstrates how to build a full-featured corporate network monitoring system using Python—covering device status checks, traffic statistics, port monitoring, fault alerts, and system integration. Python’s flexibility and rich libraries make it a cost-effective alternative to heavyweight commercial tools, especially for small-to-medium enterprises or teams with limited O&M budgets. 

For future enhancements, consider: 

  • AI-Powered Anomaly Detection : Use machine learning (e.g., scikit-learn ) to train models on historical data, enabling proactive identification of abnormal traffic or device behavior (e.g., predicting router failures based on temperature trends); 

  • Web Dashboard : Integrate with Flask or Django to build a web-based dashboard, allowing O&M teams to view real-time data and charts from any device; 

  • Multi-Region Monitoring : Extend the system to monitor cloud resources (e.g., AWS EC2 instances, Azure VMs) using their official Python SDKs, creating a unified monitoring platform for on-premises and cloud networks. 

With continuous optimization, this Python-based monitoring system can grow into a robust "guardian" that ensures long-term stability and security for corporate networks.
