开发者

使用Python实现IP地址和端口状态检测与监控

目录
  • 概述:为什么需要IP监控系统
  • 使用步骤说明
    • 1. 环境准备
    • 2. 系统部署
    • 3. 核心功能配置
  • 系统效果展示
    • 实时监控面板
    • 邮件告警示例
  • 核心源码解析
    • 1. 状态检测引擎
    • 2. 多线程监控架构
    • 3. 配置持久化实现
  • 高级功能扩展建议
    • 1. 微信/钉钉机器人告警
    • 2. 数据库存储历史记录
    • 3. 可视化图表展示
  • 总结与资源下载
    • 常见问题解答

      概述:为什么需要IP监控系统

      在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求。传统的人工巡检方式效率低下,而商业监控工具又往往价格昂贵。本文将带你用python从零打造一个高可用IP监控系统,具备以下核心功能:

      • 多目标监控:同时监测多个IP+端口组合状态
      • 智能告警:异常状态自动触发邮件通知(支持多收件人)
      • 可视化界面:基于Tkinter的现代化UI,操作直观
      • 配置持久化:所有设置自动保存,重启不丢失
      • 断线重试机制:避免网络抖动导致的误报

      技术栈:Python 3 + Tkinter + smtplib + socket + 多线程

      使用步骤说明

      1. 环境准备

      # 所需库(Python内置,无需额外安装)
      import tkinter
      import threading
      import smtplib
      import socket
      

      2. 系统部署

      下载完整代码(文末提供)

      配置config.json(首次运行会自动生成)

      运行主程序:

       python ip_monitor.py
      

      3. 核心功能配置

      目标配置

      通过表格形式管理监控目标

      复选框控制是否启用监测

      支持双击编辑现有条目

      邮件设置

      服务商SMTP地址端口授权码获取方式
      网易163smtp.163.com465/994邮箱设置→POP3/SMTP服务
      QQ邮箱smtp.qq.com465设置→账户→POP3服务
      Gmailsmtp.gmail.com587Google账号→应用密码

      # 配置示例(支持SSL/TLS)

      SMTP服务器: smtp.163.com:465

      邮箱账户: yourname@163.com

      授权码: xxxxxx  # 需在邮箱设置中获取

      接收邮箱: admin@company.com,backup@company.com

      注意事项:

      必须开启SMTP服务

      部分邮箱需要使用授权码而非密码

      Gmail需开启"低安全性应用访问"

      监控参数

      检测模式说明

      纯IP检测模式

      • 留空端口字段
      • 使用ICMP协议Ping检测
      • 适用场景:网络设备监控

      组合检测模式

      # 同时验证IP可达性和端口开放状态
      if ping_success and port_open:
          return ONLINE
      参数名默认值说明
      监测间隔10秒两次检测的时间间隔
      超时时间2秒判定离线的超时阈值
      重试次数3次连续失败次数触发告警
      重试间隔5秒失败后的快速重试间隔

      系统效果展示

      实时监控面板

      使用Python实现IP地址和端口状态检测与监控

      使用Python实现IP地址和端口状态检测与监控

      邮件告警示例

      主题:[告警] IP状态变更: 192.168.1.1:80 - 离线

      IP地址: 192.168.1.1

      端口: 80

      备注: 主数据库服务器

      状态变更为: 离线

      检测时间: 2023-08-20 14:30:45

      日志记录

      [2023-08-20 14:30:45] 检测到 192.168.1.1:80 状态变更为离线

      [2023-08-20 14:31:00] 已发送告警邮件给3个收件人

      [2023-08-20 15:00:00] 已发送每日状态报告

      核心源码解析

      1. 状态检测引擎

      def check_target(self, ip, port=None, timeout=2):
          """智能检测IP/端口状态"""
          try:
              if port:  # 端口检测模式
                  with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                      s.settimeout(timeout)
                      s.connect((ip, port))
                      return IPStatus.ONLINE
              else:     # 纯Ping模式
                  # 使用ICMP协议实现Ping
                  sock = socket.socket(socket.AF_INET, 
                                     socket.SOCK_RAW, 
                                     socket.IPPROTO_ICMP)
                  sock.settimeout(timeout)
                  sock.connect((ip, 1))
                  return IPStatus.ONLINE
          except socket.timeout:
              return IPStatus.OFFLINE
          except Exception:
              return IPStatus.UNKNOWN

      2. 多线程监控架构

      class MonitorThread(threading.Thread):
          def __init__(self, app):
              super().__init__(daemon=True)
              self.app = app
              self.running = True
              
          def run(self):
              while self.running:
                  # 1. 执行所有目标的检测
                  # 2. 触发状态变更通知
                  # 3. 智能休眠控制CPU占用
                  time.sleep(self.calculate_sleep_time())
                  
          def stop(self):
              self.running = False
      

      3. 配置持久化实现

      def save_config(self):
          """JSON格式保存所有python配置"""
          config = {
              'targets': [
                  self.tree.item(item, 'values')
                  for item in self.tree.get_children()
              ],
              'email_settings': {
                  'smtp': self.smtp_entry.get(),
                  'user': self.user_entry.get(),
                  'pass': self.pass_entry.get(),
                  'receivers': self.receiver_entry.get()
              }
          }
          with open('config.json', 'w') as f:
              json.dump(config, f, indent=2)

      高级功能扩展建议

      1. 微信/钉钉机器人告警

      # 示例:企业微信机器人API
      import requests
      def send_wechat_alert(message):
          webhook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx"
          requests.post(webhook, json={"text": {"content": message}})
      

      2. 数据库存储历史记录

      # 使用SQLite记录状态变化
      import sqlite3
      conn = sqlite3.connect('monitor.db')
      c = conn.cursor()
      c.execute('''CREATE TABLE IF NOT EXISTS status_log
                   (ip TEXT, port INT, status TEXT, check_time TIMESTAMP)''')
      

      3. 可视化图表展示

      # 使用Matplotlib绘制可用率曲线
      plt.plot(dates, availability_rates)
      plt.title('月度服务可用率')
      plt.ylabel('百分比(%)')
      plt.savefig('report.png')
      

      总结与资源下载

      项目亮点

      • 工业级可靠性:断线重试+智能休眠机制
      • 开箱即用:无需复杂配置,5分钟快速部署
      • 高度可扩展:代码结构清晰,便于二次开发

      完整源码下载

      import tkinter as tk
      from tkinter import ttk, messagebox, scrolledtext
      import threading
      import time
      import smtplib
      import ipaddress
      import re
      import json
      import socket
      from email.message import EmailMessage
      from enum import Enum
      
      
      class IPStatus(Enum):
          ONLINE = 1
          OFFLINE = 2
          UNKNOWN = 3
      
      
      class IPMonitorApp(tk.Tk):
          def __init__(self):
              super().__init__()
              self.title("IP状态监测系统")
              self.geometry("1300x850")
              
              try:
                  self.iconbitmap('monitor.ico')
              except:
                  pass
      
              self.style = ttk.Style()
              self.configure_style()
              
              self.stop_event = threading.Event()
              self.monitor_thread = None
              self.ip_status = {}
              self.next_report_time = self.calculate_next_report_time()
              
              self.create_widgets()
              self.load_config()
      
          def configure_style(self):
              self.style.theme_create('ipmonitor', parent='alt', settings={
                  'TFrame': {'configure': {'background': '#f5f5f5'}},
                  'TLabelFrame': {
                      'configure': {
                          'background': '#f5f5f5',
                          'foreground': '#1e3d59',
                          'font': ('微软雅黑', 10, 'bold')
                      }
                  },
                  'TLabel': {
                      'configure': {
                          'background': '#f5f5f5',
                          'foreground': '#1e3d59',
                          'font': ('微软雅黑', 10)
                      }
                  },
                  'TButton': {
                      'configure': {
                          'background': '#1e3d59',
                          'foreground': 'white',
                          'font': ('微软雅黑', 10),
                          'padding': 5
                      },
                      'map': {
                          'background': [('active', '#3a6ea5')],
                          'foreground': [('disabled', '#888888')]
                      }
                  },
                  'TCheckbutton': {
                      'configure': {
                          'background': '#f5f5f5',
                          'font': ('微软雅黑', 10)
                      }
                  }
              })
              self.style.theme_use('ipmonitor')
      
          def create_widgets(self):
              # 主容器
              main_frame = ttk.Frame(self, padding=10)
              main_frame.grid(row=0, column=0, sticky="nsew")
              self.grid_columnconfigure(0, weight=1)
              self.grid_rowconfigure(0, weight=1)
              
              # 配置区域
              config_frame = ttk.Frame(main_frame)
              config_frame.grid(row=0, column=0, sticky="nsew")
              
              # IP配置区域
              ip_frame = ttk.LabelFrame(config_frame, text="监测目标配置", padding=10)
              ip_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")
              ip_frame.columnconfigure(0, weight=1)
              
              # IP列表Treeview
              self.ip_tree = ttk.Treeview(
                  ip_frame, 
                  columns=("check", "ip", "port", "remark"), 
                  show="headings",
                  height=8,
                  selectmode="browse"
              )
              self.ip_tree.grid(row=0, column=0, sticky="nsew")
              
              # 配置列
              self.ip_tree.heading("check", text="检测")
              self.ip_tree.heading("ip", text="IP地址")
              self.ip_tree.heading("port", text="端口")
              self.ip_tree.heading("remark", text="备注")
              
              self.ip_tree.column("check", width=50, anchor="center")
              self.ip_tree.column("ip", width=150, anchor="w")
              self.ip_tree.column("port", width=80, anchor="center")
              self.ip_tree.column("remark", width=250, anchor="w")
              
              # 添加复选框
              self.ip_tree.tag_configure("checked", background="#e6f7ff")
              self.ip_tree.tag_configure("unchecked", background="#f5f5f5")
              
              # 编辑区域
              edit_frame = ttk.Frame(ip_frame)
              edit_frame.grid(row=1, column=0, sticky="ew", pady=(5, 0))
              
              ttk.Label(edit_frame, text="IP:").grid(row=0, column=0, sticky="e")
              self.ip_entry = ttk.Entry(edit_frame, width=18)
              self.ip_entry.grid(row=0, column=1, padx=2, sticky="w")
              
              ttk.Label(edit_frame, text="端口:").grid(row=0, column=2, sticky="e")
              self.port_entry = ttk.Entry(edit_frame, width=8)
              self.port_entry.grid(row=0, column=3, padx=2, sticky="w")
              
              ttk.Label(edit_frame, text="备注:").grid(row=0, column=4, sticky="e")
              self.remark_entry = ttk.Entry(edit_frame, width=20)
              self.remark_entry.grid(row=0, column=5, padx=2, sticky="w")
              
              # 操作按钮
              btn_frame = ttk.Frame(edit_frame)
              btn_frame.grid(row=0, column=6, padx=5)
              
              ttk.Button(btn_frame, text="添加", command=self.add_ip).grid(row=0, column=0, padx=2)
              ttk.Button(btn_frame, text="更新", command=self.update_ip).grid(row=0, column=1, padx=2)
              ttk.Button(btn_frame, text="删除", command=self.remove_ip).grid(row=0, column=2, padx=2)
              
              # 邮件配置区域
              mail_frame = ttk.LabelFrame(config_frame, text="邮件通知配置", padding=10)
              mail_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
              
              # 邮件配置项
              mail_config = [
                  ("SMTP服务器:端口", "smtp_server", "smtp.163.com:465"),
                  ("邮箱账户", "email_user", "yourname@163.com"),
                  ("邮箱授权码", "email_pass", ""),
                  ("接收邮箱", "email_receiver", "多个邮箱用逗号分隔")
              ]
              
              for i, (label, attr, ph) in enumerate(mail_config):
                  ttk.Label(mail_frame, text=label).grid(row=i, column=0, sticky="e", pady=3)
                  entry = ttk.Entry(mail_frame, width=25)
                  entry.grid(row=i, column=1, padx=5, pady=3, sticky="ew")
                  setattr(self, attr+"_entry", entry)
                  ttk.Label(mail_frame, text=ph, foreground="#888888").grid(row=i, column=2, sticky="w", padx=5)
              
              # 监控参数
              param_frame = ttk.LabelFrame(config_frame, text="监控参数", padding=10)
              param_frame.grid(row=1, column=0, columnspan=2, sticky="ew", pady=(5, 0))
              
              params = [
                  ("监测间隔(秒):", "interval", "10"),
                  ("超时(秒):", "timeout", "2"),
                  ("重试次数:", "retry", "3"),
                  ("重试间隔(秒):", "retry_interval", "5")
              ]
              
              for i, (label, attr, default) in enumerate(params):
                  ttk.Label(param_frame, text=label).grid(row=0, column=i*2, sticky="e")
                  entry = ttk.Entry(param_frame, width=8)
                  entry.insert(0, default)
                  entry.grid(row=0, column=i*2+1, padx=5, sticky="w")
                  setattr(self, attr+"_entry", entry)
              
              # 控制按钮
              ctrl_frame = ttk.Frame(config_frame)
              ctrl_frame.grid(row=2, column=0, columnspan=2, pady=(10, 0), sticky="e")
              
              ttk.Button(ctrl_frame, text="开始监测", command=self.start_monitoring).grid(row=0, column=0, padx=5)
              ttk.Button(ctrl_frame, text="停止监测", command=self.stop_monitoring, state=tk.DISABLED).grid(row=0, column=1, padx=5)
              self.start_btn = ctrl_frame.grid_slaves(row=0, column=0)[0]
              self.stop_btn = ctrl_frame.grid_slaves(row=0, column=1)[0]
              
              ttk.Button(ctrl_frame, text="保存配置", command=self.save_config).grid(row=0, column=2, padx=5)
              ttk.Button(ctrl_frame, text="清空日志", command=self.clear_logs).grid(row=0, column=3, padx=5)
              
              # 状态监控区域
              status_frame = ttk.LabelFrame(main_frame, text="状态监控", padding=10)
              status_frame.grid(row=1, column=0, sticky="nsew", pady=(5, 0))
              
              self.status_tree = ttk.Treeview(
                  status_frame, 
                  columns=("ip", "port", "remark", "status", "last_check"), 
                  show="headings",
                  height=8
              )
              self.status_tree.grid(row=0, column=0, sticky="nsew")
              
              # 状态列配置
              for col, width in [("ip", 150), ("port", 80), ("remark", 200), ("status", 100), ("last_check", 180)]:
                  self.status_tree.column(col, width=width, anchor="center")
                  self.status_tree.heading(col, text=col)
              
              # 日志区域
              log_frame = ttk.LabelFrame(main_frame, text="系统日志", padding=10)
              log_frame.grid(row=2, column=0, sticky="nsew", pady=(5, 0))
              
              self.log_text = scrolledtext.ScrolledText(
                  log_frame, 
                  wrap=tk.WORD, 
                  font=('微软雅黑', 9), 
                  bg='white', 
                  fg='#333333',
                  height=10
              )
              self.log_text.pack(fill=tk.BOTH, expand=True)
              
              # 配置权重
              main_frame.columnconfigure(0, weight=1)
              main_frame.rowconfigure(1, weight=1)
              main_frame.rowconfigure(2, weight=1)
              
              config_frame.columnconfigure(1, weight=1)
              
              ip_frame.rowconfigure(0, weight=1)
              ip_frame.columnconfigure(0, weight=1)
              
              status_frame.rowconfigure(0, weight=1)
              status_frame.columnconfigure(0, weight=1)
              
              # 绑定事件
              self.ip_tree.bind("<Button-1>", self.on_tree_click)
              self.ip_tree.bind("<Double-1>", self.on_tree_double_click)
      
          def on_tree_click(self, event):
              """处理树状图点击事件"""
              region = self.ip_tree.identify("region", event.x, event.y)
              if region == "cell":
                  column = self.ip_tree.identify_column(event.x)
                  item = self.ip_tree.identify_row(event.y)
                  if column == "#1":  # 复选框列
                      current_val = self.ip_tree.item(item, "values")[0]
                      new_val = "✓" if current_val == "" else ""
                      values = list(self.ip_tree.item(item, "values"))
                      values[0] = new_val
                      self.ip_tree.item(item, values=values, tags=("checked" if new_val else "unchecked"))
      
          def on_tree_double_click(self, event):
              """处理树状图双击事件"""
              item = self.ip_tree.selection()
              if item:
                  values = self.ip_tree.item(item, "values")
                  self.ip_entry.delete(0, tk.END)
                  self.ip_entry.insert(0, values[1])
                  self.port_entry.delete(0, tk.END)
                  self.port_entry.insert(0, values[2])
                  self.remark_entry.delete(0, tk.END)
                  self.remark_entry.insert(0, values[3])
      
          def add_ip(self):
              """添加IP到列表"""
              ip = self.ip_entry.get().strip()
              port = self.port_entry.get().strip()
              remark = self.remark_entry.get().strip()
              
              if not ip:
                  messagebox.showwarning("错误", "IP地址不能为空")
                  return
                  
              if not self.is_valid_ip(ip):
                  messagebox.showwarning("错误", "无效的IP地址格式")
                  return
                  
              if port:
                  try:
                      port = int(port)
                      if not (1 <= port <= 65535):
                          messagebox.showwarning("错误", "端口必须在1-65535之间")
                          return
                  except ValueError:
                      messagebox.showwarning("错误", "端口必须是数字")
                      return
              
              self.ip_tree.insert("", tk.END, values=("✓", ip, port, remark), tags=("checked",))
              
              # 清空输入框
              self.ip_entry.delete(0, tk.END)
              self.port_entry.delete(0, tk.END)
              self.remark_entry.delete(0, tk.END)
      
          def update_ip(self):
              """更新选中的IP"""
              item = self.ip_tree.selection()
              if not item:
                  messagebox.showwarning("错误", "请先选择要更新的项")
                  return
                  
              ip = self.ip_entry.get().strip()
              port = self.port_entry.get().strip()
              remark = self.remark_entry.get().strip()
              
              if not ip:
                  messagebox.showwarning("错误", "IP地址不能为空")
                  return
                  
              if not sejslf.is_valid_ip(ip):
                  messagebox.showwarning("错误", "无效的IP地址格式")
                  return
                  
              if port:
                  try:
                      port = int(port)
                      if not (1 <= port <= 65535):
                          messagebox.showwarning("错误", "端口必须在1-65535之间")
                          return
                  except ValueError:
                      messagebox.showwarning("错误", "端口必须是数字")
                      return
              
              # 保留原来的复选框状态
              current_check = self.ip_tree.item(item, "values")[0]
              self.ip_tree.item(item, values=(current_check, ip, port, remark))
      
          def remove_ip(self):
              """删除选中的IP"""
              items = self.ip_tree.selection()
              if not items:
                  messagebox.showwarning("错误", "请先选择要删除的项")
                  return
                  
              for item in items:
                  self.ip_tree.delete(item)
      
          def is_valid_ip(self, ip):
              """验证IP地址格式"""
              try:
                  ipaddress.ip_address(ip)
                  return True
              except ValueError:
                  return False
      
          def start_monitoring(self):
              """开始监控"""
              if not self.validate_inputs():
                  return
                  
              # 获取要监控的目标
              targets = []
              for item in self.ip_tree.get_children():
                  check, ip, port, remark = self.ip_tree.item(item, "values")
                  if check == "✓":  # 只监控选中的项
                      targets.append((ip, int(port) if port else None, remark))
              
              if not targets:
                  messagebox.showwarning("错误", "没有选中要监控的目标")
                  return
                  
              # 初始化状态字典
              self.ip_status = {}
              for ip, port, remark in targets:
                  self.ip_status[(ip, port)] = {
                      "status": IPStatus.UNKNOWN,
                      "last_notified": None,
                      "remark": remark,
                      "retries": 0,
                      "next_retry_time": 0,
                      "last_check": "从未检测"
                  }
              
              # 更新状态显示
              self.update_status_display()
              
              # 更新按钮状态
              self.start_btn.config(state=tk.DISABLED)
              self.stop_btn.config(state=tk.NORMAL)
              self.stop_event.clear()
              
              # 获取监控参数
              interval = int(self.interval_entry.get())
              timeout = int(self.timeout_entry.get())
              max_retries = int(self.retry_entry.get())
              retry_interval = int(self.retry_interval_entry.get())
              
              # 创建监控线程
              self.monitor_thread = threading.Thread(
                  target=self.monitor_targets,
                  args=(interval, timeout, max_retries, retry_interval),
                  daemon=True
              )
              self.monitor_thread.start()
              
              self.log_message("监控已启动")
      
          def stop_monitoring(self):
              """停止监控"""
              self.stop_event.set()
              self.start_btn.config(state=tk.NORMAL)
              self.stop_btn.config(state=tk.DISABLED)
              self.log_message("监控已停止")
      
          def monitor_targets(self, interval, timeout, max_retries, retry_interval):
              """监控主循环"""
              while not self.stop_event.is_set():
                  current_time = time.time()
      
                  # 每日报告检查
                  if current_time >= self.next_report_time:
                      self.send_daily_report()
                      self.next_report_time = self.calculate_next_report_time()
      
                  # 检查所有目标
                  for (ip, port), data in list(self.ip_status.items()):
                      if self.stop_event.is_set():
                          break
      
                      if current_time < data['next_retry_time']:
                          continue
      
                      # 执行检测
                      if port:  # 如果有端口则检测端口
                          current_status = self.check_port(ip, port, timeout)
                      else:  # 否则只ping IP
                          current_status = self.ping_ip(ip, timeout)
                      
                      previous_status = data["status"]
                      data["last_check"] = time.strftime("%Y-%m-%d %H:%M:%S")
                      
                      # 处理状态变化
                      if current_status == IPStatus.OFFLINE:
                          if data['retries'] < max_retries:
                              data['retries'] += 1
                              data['next_retry_time'] = current_time + retry_interval
                              self.log_message(f"{ip}:{port or '无端口'} 检测失败,正在进行第 {data['retries']} 次重试...")
                          else:
                              if previous_status != current_status:
                                  self.send_alert(ip, port, data['remark'], current_status)
                              data["status"] = current_status
                              data['retries'] = 0
                              data['next_retry_time'] = current_time + interval
                      else:
                          if previous_status != current_status:
                              self.send_alert(ip, port, data['remark'], current_status)
                          data["status"] = current_status
                          data['retries'] = 0
                          data['next_retry_time'] = current_time + interval
                      
                      # 更新UI
                      self.after(0, self.update_status_display)
      
                  # 计算睡眠时间
                  next_checks = [data['next_retry_time'] for data in self.ip_status.values()]
                  next_check = min(next_checks) if next_checks else current_time + interval
                  sleep_time = max(min(next_check - time.time(), interval), 0.1)
                  time.sleep(sleep_time)
      
          def ping_ip(self, ip, timeout):
              """Ping IP地址"""
              try:
                  # 使用socket创建ICMP ping
                  sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
                  sock.settimeout(timeout)
                  sock.connect((ip, 1))  # 端口号不重要
                  return IPStatus.ONLINE
              except socket.timeout:
                  return IPStatus.OFFLINE
              except Exception:
                  return IPStatus.UNKNOWN
              finally:
                  try:
                      sock.close()
                  except:
                      pass
      
          def check_port(self, ip, port, timeout):
              """检查端口"""
              try:
                  with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                      s.settimeout(timeout)
                      s.connect((ip, port))
                      return IPStatus.ONLINE
              except socket.timeout:
                  return IPStatus.OFFLINE
              except ConnectionRefusedError:
                  return IPStatus.OFFLINE
              except Exception as e:
                  self.log_message(f"检测异常: {str(e)}")
                  return IPStatus.UNKNOWN
      
          def update_status_display(self):
              """更新状态显示"""
              # 清空现有显示
              for item in self.status_tree.get_children():
                  self.status_tree.delete(item)
              
              # 添加新状态
              for (ip, port), data in self.ip_status.items():
                  status = data["status"]
                  status_text = "在线" if status == IPStatus.ONLINE else "离线" if status == IPStatus.OFFLINE else "未知"
                  status_color = "green" if status == IPStatus.ONLINE else "red" if status == IPStatus.OFFLINE else "orange"
                  
                YnAexkGYwp  item = self.status_tree.insert("", tk.END, values=(
                      ip, 
                      port if port else "无", 
                      data["remark"], 
                      status_text, 
                      data["last_check"]
                  ))
                  self.status_tree.tag_configure(status_color, foreground=status_color)
                  self.status_tree.item(item, tags=(status_color,))
      
          def send_alert(self, ip, port, remark, status):
              """发送状态变更通知"""
              status_text = "在线" if status == IPStatus.ONLINE else "离线"
              subject = f"IP状态变更: {remark or ip}:{port if port else '无端口'} - {status_text}"
              
              content = "\n".join([
                  f"IP地址: {ip}",
                  f"端口: {port if port else '无'}",
                  f"备注: {remark or '无备注信息'}",
                  f"状态变更为: {status_text}",
                  f"检测时间: {time.strftime('%Y-%m-%d %H:%M:%S')}"
              ])
              
              try:
                  self.send_email(subject, content)
                  self.log_message(f"已发送状态变更通知: {ip}:{port if port else '无'} -> {status_text}")
              except Exception as e:
                  self.log_message(f"邮件发送失败: {str(e)}")
      
          def send_email(self, subject, content):
              """发送邮件"""
              smtp_server = self.smtp_server_entry.get().strip()
              user = self.email_user_entry.get().strip()
              password = self.email_pass_entry.get().strip()
              receiver_str = self.email_receiver_entry.get().strip()
              
              if not all([smtp_server, user, password, receiver_str]):
                  raise Exception("邮件配置不完整")
              
              receivers = [addr.strip() for addr in receiver_str.split(',') if addr.strip()]
              
              msg = EmailMessage()
              msg['Subject'] = subject
              msg['From'] = user
              msg['To'] = receivers
              msg.set_content(content)
              
              server, port = smtp_server.split(":")
              port = int(port)
              
              try:
                  if port == 465:
                      with smtplib.SMTP_SSL(server, port) as smtp:
                          smtp.login(user, password)
                          smtp.send_message(msg)
                  else:
                      with smtplib.SMTP(server, port) as smtp:
                          smtp.starttls()
                          smtp.login(user, password)
                          smtp.send_message(msg)
              except Exception as e:
                  raise Exception(f"SMTP错误: {str(e)}")
      
          def send_daily_report(self):
              """发送每日报告"""
              report_lines = []
              for (ip, port), data in self.ip_status.items():
                  status = data["status"]
                  status_text = "在线" if status == IPStatus.ONLINE else "离线" if status == IPStatus.OFFLINE else "未知"
                  report_lines.append(f"IP: {ip}:{port if port else '无'} 备注: {data['remark']} 状态: {status_text}")
              
              current_time = time.strftime("%Y-%m-%d %H:%M:%S")
              content = "每日IP状态报告\n\n" + "\n".join(report_lines) + f"\n\n报告时间: {current_time}"
              subject = "每日IP状态报告"
              
              try:
                  self.send_email(subject, content)
                  self.log_message("已发送每日状态报告")
              except Exception as e:
                  self.log_message(f"发送每日报告失败: {str(e)}")
      
          def calculate_next_report_time(self):
              """计算下次报告时间"""
              now = time.localtime()
              today_10am = time.mktime((now.tm_year, now.tm_mon, now.tm_mday, 10, 0, 0, 0, 0, -1))
              current_time = time.time()
              
              return today_10am + 86400 if current_time >= today_10am else today_10am
      
          def log_message(self, message):
              """记录日志"""
              timesta编程mp = time.strftime("%Y-%m-%d %H:%M:%S")
              log_line = f"[{timestamp}] {message}\n"
              
              self.log_text.insert(tk.END, log_line)
              self.log_text.see(tk.END)
              
              with open("monitor.log", "a", encandroidoding="utf-8") as f:
                  f.write(log_line)
      
          def clear_logs(self):
              """清空日志"""
              self.log_text.delete(1.0, tk.END)
      
          def validate_inputs(self):
              """验证输入"""
              # 检查邮件配置
              smtp_server = self.smtp_server_entry.get().strip()
              user = self.email_user_entry.get().strip()
              password = self.email_pass_entry.get().strip()
              receiver_str = self.email_receiver_entry.get().strip()
              
              if not smtp_server:
                  messagebox.showwarning("错误", "SMTP服务器不能为空")
                  return False
                  
              if ":" not in smtp_server:
                  messagebox.showwarning("错误", "SMTP服务器格式应为 host:port")
                  return False
                  
              if not user:
                  messagebox.showwarning("错误", "邮箱账户不能为空")
                  return False
                  
              if not password:
                  messagebox.showwarning("错误", "邮箱授权码不能为空")
                  return False
                  
              if not receiver_str:
                  messagebox.showwarning("错误", "接收邮箱不能为空")
                  return False
                  
              # 验证监控参数
              try:
                  interval = int(self.interval_entry.get())
                  timeout = int(self.timeout_entry.get())
                  retries = int(self.retry_entry.get())
                  retry_interval = int(self.retry_interval_entry.get())
                  
                  if interval < 5:
                      messagebox.showwarning("错误", "监测间隔不能小于5秒")
                      return False
                      
                  if timeout < 1:
                      messagebox.showwarning("错误", "超时时间不能小于1秒")
                      return False
                      
                  if retries < 0:
                      messagebox.showwarning("错误", "重试次数不能为负数")
                      return False
                      
                  if retry_interval < 1:
                      messagebox.showwarning("错误", "重试间隔不能小于1秒")
                      return False
                      
              except ValueError:
                  messagebox.showwarning("错误", "请输入有效的数字")
                  return False
                  
              return True
      
          def save_config(self):
              """保存配置"""
              config = {
                  "targets": [
                      self.ip_tree.item(item, "values")
                      for item in self.ip_tree.get_children()
                  ],
                  "smtp_server": self.smtp_server_entry.get(),
                  "email_user": self.email_user_entry.get(),
                  "email_pass": self.email_pass_entry.get(),
                  "email_receiver": self.email_receiver_entry.get(),
                  "interval": self.interval_entry.get(),
                  "timeout": self.timeout_entry.get(),
                  "retry": self.retry_entry.get(),
                  "retry_interval": self.retry_interval_entry.get()
              }
              
              try:
                  with open("config.json", "w", encoding="utf-8") as f:
                      json.dump(config, f, indent=2)
                  self.log_message("配置已保存")
                  messagebox.showinfo("成功", "配置已保存到config.json")
              except Exception as e:
                  self.log_message(f"保存配置失败: {str(e)}")
                  messagebox.showerror("错误", f"保存配置失败: {str(e)}")
      
          def load_config(self):
              """加载配置"""
              try:
                  with open("config.json", encoding="utf-8") as f:
                      config = json.load(f)
                  
                  # 加载目标
                  for item in self.ip_tree.get_children():
                      self.ip_tree.delete(item)
                      
                  for values in config.get("targets", []):
                      self.ip_tree.insert("", tk.END, values=values, tags=("checked" if values[0] == "✓" else "unchecked"))
                  
                  # 加载邮件配置
                  self.smtp_server_entry.delete(0, tk.END)
                  self.smtp_server_entry.insert(0, config.get("smtp_server", ""))
                  
                  self.email_user_entry.delete(0, tk.END)
                  self.email_user_entry.insert(0, config.get("email_user", ""))
                  
                  self.email_pass_entry.delete(0, tk.END)
                  self.email_pass_entry.insert(0, config.get("email_pass", ""))
                  
                  self.email_receiver_entry.delete(0, tk.END)
                  self.email_receiver_entry.insert(0, config.get("email_receiver", ""))
                  
                  # 加载监控参数
                  self.interval_entry.delete(0, tk.END)
                  self.interval_entry.insert(0, config.get("interval", "10"))
                  
                  self.timeout_entry.delete(0, tk.END)
                  self.timeout_entry.insert(0, config.get("timeout", "2"))
                  
                  self.retry_entry.delete(0, tk.END)
                  self.retry_entry.insert(0, config.get("retry", "3"))
                  
                  self.retry_interval_entry.delete(0, tk.END)
                  self.retry_interval_entry.insert(0, config.get("retry_interval", "5"))
                  
                  self.log_message("配置已加载")
              except FileNotFoundError:
                  self.log_message("未找到配置文件,使用默认配置")
              except Exception as e:
                  self.log_message(f"加载配置失败: {str(e)}")
      
      
      if __name__ == "__main__":
          app = IPMonitorApp()
          app.mainloop()
      

      适合人群:网络管理员、运维工程师、Python中级开发者

      常见问题解答

      Q:如何修改每日报告发送时间?

      A:修改calculate_next_report_time()方法中的小时数(默认10:00)

      Q:支持监控IPv6地址吗?

      A:当前版本需要稍作修改,建议使用ipaddress库进行验证

      Q:最大支持监控多少个IP?

      A:理论上无限制,但建议不超过100个以保证性能

      到此这篇关于使用Python实现IP地址和端口状态检测与监控的文章就介绍到这了,更多相关Python IP地址检测与监控内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新开发

      开发排行榜