环境介绍
开发Python环境 Python 3.9.14
,具体支持到什么版本未测试!
我运行在 PVE 的 CT 里面,CentOS 9
功能
家里的交换机,不晓得什么问题,有的时候就是丢包,正当来测试吧,又好了。不确定什么时候又有问题了。
还有一些个主机时不时的失联,所以就写了一个小工具,不间断的 ping,然后记录日志文件查看 ping 不通的时间,丢包率等等。
使用代码
进入 ping.py
目录,运行
python3 ping.py
日志输出
2024-05-10 10:00:00,000 - INFO - 日志文件名: /root/ping/network_monitor_2024-05-10_10-00-00.log 2024-05-10 10:00:00,000 - INFO - 程序开始运行时间: 2024-05-10 10:00:00 2024-05-10 10:00:05,000 - WARNING - 主机 10.0.0.1 不可达 - 开始时间: 2024-05-10 10:00:05 2024-05-10 10:00:10,000 - WARNING - 主机 10.0.0.1 已恢复 - 不可达持续时间: 5.00 秒 2024-05-10 10:01:00,000 - INFO - 程序结束运行时间: 2024-05-10 10:01:00 2024-05-10 10:01:00,000 - INFO - 程序总共运行时间: 60.00 秒 2024-05-10 10:01:00,000 - INFO - 总共 ping 次数: 12 2024-05-10 10:01:00,000 - INFO - 总共丢包次数: 1 2024-05-10 10:01:00,000 - INFO - 总丢包率: 8.33% 2024-05-10 10:01:00,000 - INFO - 平均响应时间: 0.15 毫秒 2024-05-10 10:01:00,000 - WARNING - 程序异常退出: 收到 SIGINT 信号 (Ctrl+C)
完整 Python 代码:
import subprocess import time import re import logging import atexit import signal import sys import os print("V2RaySSR 综合网原创") print("访问 https://www.v2rayssr.com 获取更多资讯和教程") # 定义主机名和日志文件路径为变量 HOST = "10.0.0.1" LOG_DIR = "/root/ping" # 获取程序开始时间并生成日志文件名 start_time = time.time() start_time_str = time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(start_time)) log_filename = os.path.join(LOG_DIR, f'network_monitor_{start_time_str}.log') # 检查目录是否存在,如果不存在则创建 if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR) # 设置日志文件 logging.basicConfig(filename=log_filename, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 测试日志记录功能 try: logging.info(f"日志文件名: {log_filename}") except Exception as e: print(f"无法写入日志文件: {e}") sys.exit(1) # 初始化统计信息 total_pings = 0 total_lost_pings = 0 total_response_time = 0.0 log_exit_time_called = False # 标志位,确保 log_exit_time 只被调用一次 def ping(host): """ Ping 指定的主机并返回 ping 命令的输出。 """ process = subprocess.Popen(['ping', '-c', '1', host], stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate() if process.returncode == 0: return stdout.decode('utf-8') else: return None def parse_ping_output(output): """ 解析 ping 命令的输出,提取丢包率和响应时间。 """ loss_match = re.search(r'(\d+)% packet loss', output) if loss_match: packet_loss = int(loss_match.group(1)) else: packet_loss = 100 time_match = re.search(r'time=(\d+\.\d+) ms', output) if time_match: ping_time = float(time_match.group(1)) else: ping_time = None return packet_loss, ping_time def monitor_network(host, interval=5): """ 通过定期 ping 指定的主机来持续监控网络。 """ global total_pings, total_lost_pings, total_response_time last_failure_time = None # 记录上一次失败的时间 failure_duration = 0 # 记录持续失败的时间 while True: output = ping(host) total_pings += 1 if output: packet_loss, ping_time = parse_ping_output(output) if packet_loss == 100: total_lost_pings += 1 if last_failure_time is None: last_failure_time = time.time() logging.warning(f"主机 {host} 不可达 - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}") print(f"主机 {host} 不可达 - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}") failure_duration = time.time() - last_failure_time else: total_response_time += ping_time if last_failure_time is not None: logging.warning(f"主机 {host} 已恢复 - 不可达持续时间: {failure_duration:.2f} 秒") print(f"主机 {host} 已恢复 - 不可达持续时间: {failure_duration:.2f} 秒") last_failure_time = None failure_duration = 0 else: total_lost_pings += 1 if last_failure_time is None: last_failure_time = time.time() logging.error(f"无法 ping 通主机 {host} - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}") print(f"无法 ping 通主机 {host} - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}") failure_duration = time.time() - last_failure_time time.sleep(interval) def log_exit_time(exit_reason=None): """ 记录程序退出的时间和统计信息。 """ global log_exit_time_called if log_exit_time_called: return log_exit_time_called = True exit_time = time.time() exit_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(exit_time)) total_run_time = exit_time - start_time total_lost_rate = (total_lost_pings / total_pings) * 100 if total_pings > 0 else 0 avg_response_time = (total_response_time / (total_pings - total_lost_pings)) if (total_pings - total_lost_pings) > 0 else 0 logging.info(f"程序结束运行时间: {exit_time_str}") logging.info(f"程序总共运行时间: {total_run_time:.2f} 秒") logging.info(f"总共 ping 次数: {total_pings}") logging.info(f"总共丢包次数: {total_lost_pings}") logging.info(f"总丢包率: {total_lost_rate:.2f}%") logging.info(f"平均响应时间: {avg_response_time:.2f} 毫秒") if exit_reason: logging.warning(f"程序异常退出: {exit_reason}") print(f"程序结束运行时间: {exit_time_str}") print(f"程序总共运行时间: {total_run_time:.2f} 秒") print(f"总共 ping 次数: {total_pings}") print(f"总共丢包次数: {total_lost_pings}") print(f"总丢包率: {total_lost_rate:.2f}%") print(f"平均响应时间: {avg_response_time:.2f} 毫秒") # 确保所有日志记录被刷新到磁盘 logging.shutdown() # 注册程序正常退出时的回调函数 atexit.register(log_exit_time) # 捕捉终止信号(例如 Ctrl+C 和系统重启) def signal_handler(sig, frame): exit_reasons = { signal.SIGINT: "收到 SIGINT 信号 (Ctrl+C)", signal.SIGTERM: "收到 SIGTERM 信号 (系统关机或重启)", signal.SIGHUP: "收到 SIGHUP 信号 (SSH 会话关闭)" } reason = exit_reasons.get(sig, "收到未知信号") log_exit_time(reason) sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # 捕捉 SIGHUP 信号(SSH 关闭引起的挂起信号) signal.signal(signal.SIGHUP, signal_handler) if __name__ == "__main__": try: monitor_network(HOST) except Exception as e: logging.error(f"程序异常退出: {e}") log_exit_time(f"程序异常退出: {e}") raise