#!/usr/bin/env python3 """ CDN 测速工具 v2.1 - 测试多个服务地址的连接速度 支持: HTTP延迟、TCP Ping、Traceroute、下载速度测试、JSON导出 """ import urllib.request import urllib.error import time import ssl import socket import statistics import subprocess import struct import sys import json import os from datetime import datetime from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor, as_completed # 测试地址列表 ENDPOINTS = [ { "name": "大陆优化CDN", "url": "https://c.cspok.cn", "desc": "针对中国大陆地区优化的后端服务" }, { "name": "主站直连", "url": "https://anyrouter.top", "desc": "主站后端直连服务" }, { "name": "大陆网络优化-宁波", "url": "https://pmpjfbhq.cn-nb1.rainapp.top", "desc": "针对中国大陆地区优化的后端服务" }, { "name": "大陆网络优化-上海", "url": "https://a-ocnfniawgw.cn-shanghai.fcapp.run", "desc": "针对中国大陆地区优化的后端服务" } ] # 测试参数 TEST_COUNT = 3 TCP_PING_COUNT = 5 TIMEOUT = 10 TRACEROUTE_MAX_HOPS = 20 def create_ssl_context(): """创建SSL上下文""" ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return ctx def parse_url(url): """解析URL获取主机名和端口""" parsed = urlparse(url) host = parsed.hostname port = parsed.port or (443 if parsed.scheme == 'https' else 80) return host, port def resolve_host(hostname): """DNS解析""" try: ip = socket.gethostbyname(hostname) return {"success": True, "ip": ip} except socket.gaierror as e: return {"success": False, "error": str(e)} def tcp_ping(host, port, timeout=5): """TCP Ping - 测试TCP连接延迟""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) start = time.time() sock.connect((host, port)) elapsed = (time.time() - start) * 1000 # ms sock.close() return {"success": True, "latency": elapsed} except socket.timeout: return {"success": False, "error": "连接超时"} except socket.error as e: return {"success": False, "error": str(e)} def tcp_ping_test(host, port, count=TCP_PING_COUNT): """执行多次TCP Ping测试""" results = [] for i in range(count): result = tcp_ping(host, port) results.append(result) time.sleep(0.1) # 短暂间隔 return results def 
traceroute_tcp(host, port=443, max_hops=TRACEROUTE_MAX_HOPS, timeout=2): """ TCP Traceroute 实现 使用递增的TTL值来追踪路由路径 """ results = [] # 先解析目标IP try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪路由到 {host} ({dest_ip}), 最大 {max_hops} 跳:") print(f" {'跳数':<4} {'IP地址':<18} {'延迟':<12} {'主机名'}") print(" " + "-" * 55) for ttl in range(1, max_hops + 1): # 创建原始socket(需要root权限)或使用UDP hop_result = {"hop": ttl, "ip": None, "latency": None, "hostname": None} try: # 使用UDP进行traceroute(不需要root) recv_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) recv_socket.settimeout(timeout) send_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) send_socket.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl) start = time.time() send_socket.sendto(b'', (dest_ip, 33434 + ttl)) try: data, addr = recv_socket.recvfrom(1024) elapsed = (time.time() - start) * 1000 hop_ip = addr[0] # 尝试反向DNS解析 try: hostname = socket.gethostbyaddr(hop_ip)[0] except: hostname = hop_ip hop_result["ip"] = hop_ip hop_result["latency"] = elapsed hop_result["hostname"] = hostname print(f" {ttl:<4} {hop_ip:<18} {elapsed:.2f} ms {hostname}") # 如果到达目标 if hop_ip == dest_ip: results.append(hop_result) break except socket.timeout: hop_result["error"] = "超时" print(f" {ttl:<4} {'*':<18} {'*':<12} 请求超时") recv_socket.close() send_socket.close() except PermissionError: # 没有root权限,使用系统traceroute命令 return traceroute_system(host, max_hops) except Exception as e: hop_result["error"] = str(e) # 尝试使用系统命令 return traceroute_system(host, max_hops) results.append(hop_result) return results def traceroute_system(host, max_hops=TRACEROUTE_MAX_HOPS): """使用系统traceroute命令""" results = [] # 检测可用的traceroute命令 traceroute_cmd = None for cmd in ['traceroute', 'tracepath', 'mtr']: try: subprocess.run([cmd, '--version'], capture_output=True, timeout=2) traceroute_cmd = cmd break except: continue if not traceroute_cmd: print(" 
未找到traceroute工具,尝试安装...") return [{"hop": 0, "error": "未找到traceroute工具"}] try: if traceroute_cmd == 'traceroute': cmd = ['traceroute', '-n', '-m', str(max_hops), '-w', '2', host] elif traceroute_cmd == 'tracepath': cmd = ['tracepath', '-n', '-m', str(max_hops), host] else: # mtr cmd = ['mtr', '-n', '-c', '1', '--report', host] print(f" 执行: {' '.join(cmd)}") print() process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # 实时输出 for line in process.stdout: print(f" {line.rstrip()}") # 解析输出 parts = line.split() if parts and parts[0].isdigit(): hop_num = int(parts[0]) results.append({"hop": hop_num, "raw": line.strip()}) process.wait(timeout=60) except subprocess.TimeoutExpired: print(" traceroute 超时") except Exception as e: print(f" traceroute 错误: {e}") return results def traceroute_simple(host, max_hops=TRACEROUTE_MAX_HOPS): """ 简化版TCP traceroute,使用connect超时方式 """ results = [] try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪到 {host} ({dest_ip}):") print(f" {'跳数':<4} {'状态':<15} {'延迟'}") print(" " + "-" * 40) # 尝试直接连接测试 for ttl in range(1, max_hops + 1): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) sock.settimeout(2) start = time.time() try: sock.connect((dest_ip, 443)) elapsed = (time.time() - start) * 1000 print(f" {ttl:<4} {'到达目标':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "到达", "latency": elapsed}) sock.close() break except socket.timeout: print(f" {ttl:<4} {'超时':<15} *") results.append({"hop": ttl, "status": "超时"}) except socket.error as e: elapsed = (time.time() - start) * 1000 if e.errno == 111: # Connection refused - 通常表示到达 print(f" {ttl:<4} {'中间节点':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "中间节点", "latency": elapsed}) else: print(f" {ttl:<4} {'TTL过期':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "TTL过期", "latency": elapsed}) 
sock.close() except Exception as e: print(f" {ttl:<4} 错误: {e}") results.append({"hop": ttl, "error": str(e)}) return results def test_latency(url, timeout=TIMEOUT): """测试HTTP延迟""" ctx = create_ssl_context() start = time.time() try: req = urllib.request.Request(url, method='HEAD') req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') with urllib.request.urlopen(req, timeout=timeout, context=ctx) as response: status = response.status elapsed = (time.time() - start) * 1000 return {"success": True, "latency": elapsed, "status": status} except urllib.error.HTTPError as e: elapsed = (time.time() - start) * 1000 return {"success": True, "latency": elapsed, "status": e.code} except Exception as e: return {"success": False, "error": str(e)} def test_download_speed(url, timeout=TIMEOUT): """测试下载速度""" ctx = create_ssl_context() start = time.time() try: req = urllib.request.Request(url) req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') with urllib.request.urlopen(req, timeout=timeout, context=ctx) as response: data = response.read() size = len(data) elapsed = time.time() - start if elapsed > 0: speed = size / elapsed return {"success": True, "size": size, "time": elapsed, "speed": speed} return {"success": True, "size": size, "time": 0, "speed": 0} except Exception as e: return {"success": False, "error": str(e)} def format_speed(bps): """格式化速度""" if bps >= 1024 * 1024: return f"{bps / (1024 * 1024):.2f} MB/s" elif bps >= 1024: return f"{bps / 1024:.2f} KB/s" else: return f"{bps:.2f} B/s" def format_size(bytes_size): """格式化大小""" if bytes_size >= 1024 * 1024: return f"{bytes_size / (1024 * 1024):.2f} MB" elif bytes_size >= 1024: return f"{bytes_size / 1024:.2f} KB" else: return f"{bytes_size} B" def test_endpoint(endpoint, enable_traceroute=True): """测试单个端点""" name = endpoint["name"] url = endpoint["url"] desc = endpoint["desc"] host, port = parse_url(url) print(f"\n{'='*65}") print(f"📡 {name}") 
print(f" URL: {url}") print(f" 描述: {desc}") print(f"{'='*65}") # 结果数据结构 result_data = { "name": name, "url": url, "description": desc, "host": host, "port": port, "test_time": datetime.now().isoformat(), "dns": {}, "tcp_ping": {}, "traceroute": [], "http": {}, "download": {}, "summary": {} } # DNS 解析 print(f"\n🔍 DNS解析:") dns_result = resolve_host(host) if dns_result["success"]: ip = dns_result["ip"] print(f" {host} -> {ip}") result_data["dns"] = {"success": True, "ip": ip, "hostname": host} else: print(f" ❌ DNS解析失败: {dns_result['error']}") result_data["dns"] = {"success": False, "error": dns_result['error']} result_data["summary"] = { "avg_latency": None, "tcp_latency": None, "speed": None, "status": "DNS解析失败" } return result_data # TCP Ping 测试 print(f"\n🏓 TCP Ping 测试 (端口 {port}, 共{TCP_PING_COUNT}次):") tcp_results = tcp_ping_test(ip, port, TCP_PING_COUNT) tcp_latencies = [] tcp_details = [] for i, result in enumerate(tcp_results, 1): if result["success"]: lat = result["latency"] tcp_latencies.append(lat) tcp_details.append({"seq": i, "latency_ms": round(lat, 2), "success": True}) print(f" 第{i}次: {lat:.2f} ms") else: tcp_details.append({"seq": i, "success": False, "error": result['error']}) print(f" 第{i}次: 失败 - {result['error']}") result_data["tcp_ping"]["details"] = tcp_details if tcp_latencies: tcp_avg = statistics.mean(tcp_latencies) tcp_min = min(tcp_latencies) tcp_max = max(tcp_latencies) tcp_jitter = statistics.stdev(tcp_latencies) if len(tcp_latencies) > 1 else 0 tcp_loss = (TCP_PING_COUNT - len(tcp_latencies)) / TCP_PING_COUNT * 100 print(f"\n 📊 TCP Ping 统计:") print(f" 平均: {tcp_avg:.2f} ms") print(f" 最小: {tcp_min:.2f} ms") print(f" 最大: {tcp_max:.2f} ms") print(f" 抖动: {tcp_jitter:.2f} ms") print(f" 丢包: {tcp_loss:.1f}%") result_data["tcp_ping"]["statistics"] = { "avg_ms": round(tcp_avg, 2), "min_ms": round(tcp_min, 2), "max_ms": round(tcp_max, 2), "jitter_ms": round(tcp_jitter, 2), "packet_loss_percent": round(tcp_loss, 1), "success_count": len(tcp_latencies), 
"total_count": TCP_PING_COUNT } else: tcp_avg = None print(f"\n ❌ TCP Ping 全部失败") result_data["tcp_ping"]["statistics"] = {"success": False, "packet_loss_percent": 100} # Traceroute 测试 if enable_traceroute: print(f"\n🛤️ Traceroute 测试:") trace_results = traceroute_simple_with_data(ip, max_hops=15) result_data["traceroute"] = trace_results # HTTP 延迟测试 print(f"\n⏱️ HTTP 延迟测试 (共{TEST_COUNT}次):") latencies = [] http_details = [] http_status = None for i in range(TEST_COUNT): result = test_latency(url) if result["success"]: latency = result["latency"] latencies.append(latency) http_status = result.get("status", "N/A") http_details.append({"seq": i+1, "latency_ms": round(latency, 2), "status": http_status, "success": True}) print(f" 第{i+1}次: {latency:.2f} ms (HTTP {http_status})") else: http_details.append({"seq": i+1, "success": False, "error": result['error']}) print(f" 第{i+1}次: 失败 - {result['error']}") result_data["http"]["details"] = http_details result_data["http"]["status_code"] = http_status if latencies: avg_latency = statistics.mean(latencies) min_latency = min(latencies) max_latency = max(latencies) print(f"\n 📊 HTTP延迟统计:") print(f" 平均: {avg_latency:.2f} ms") print(f" 最小: {min_latency:.2f} ms") print(f" 最大: {max_latency:.2f} ms") result_data["http"]["statistics"] = { "avg_ms": round(avg_latency, 2), "min_ms": round(min_latency, 2), "max_ms": round(max_latency, 2), "success_count": len(latencies), "total_count": TEST_COUNT } else: avg_latency = None print(f"\n ❌ HTTP延迟测试失败") result_data["http"]["statistics"] = {"success": False} # 下载测试 print(f"\n📥 下载测试:") dl_result = test_download_speed(url) if dl_result["success"]: size = dl_result["size"] dl_time = dl_result["time"] speed = dl_result["speed"] print(f" 下载大小: {format_size(size)}") print(f" 下载用时: {dl_time:.3f} 秒") print(f" 下载速度: {format_speed(speed)}") result_data["download"] = { "success": True, "size_bytes": size, "time_seconds": round(dl_time, 3), "speed_bps": round(speed, 2), "speed_formatted": 
format_speed(speed) } else: speed = None print(f" ❌ 下载失败 - {dl_result['error']}") result_data["download"] = {"success": False, "error": dl_result['error']} # 汇总 result_data["summary"] = { "avg_latency": round(avg_latency, 2) if avg_latency else None, "tcp_latency": round(tcp_avg, 2) if tcp_avg else None, "speed": round(speed, 2) if speed else None, "status": "可用" if tcp_avg else "不可用" } return result_data def traceroute_simple_with_data(host, max_hops=TRACEROUTE_MAX_HOPS): """简化版TCP traceroute,返回数据""" results = [] try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪到 {host} ({dest_ip}):") print(f" {'跳数':<4} {'状态':<15} {'延迟'}") print(" " + "-" * 40) for ttl in range(1, max_hops + 1): hop_data = {"hop": ttl} try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) sock.settimeout(2) start = time.time() try: sock.connect((dest_ip, 443)) elapsed = (time.time() - start) * 1000 print(f" {ttl:<4} {'到达目标':<15} {elapsed:.2f} ms") hop_data.update({"status": "到达目标", "latency_ms": round(elapsed, 2), "reached": True}) results.append(hop_data) sock.close() break except socket.timeout: print(f" {ttl:<4} {'超时':<15} *") hop_data.update({"status": "超时", "timeout": True}) except socket.error as e: elapsed = (time.time() - start) * 1000 if e.errno == 111: print(f" {ttl:<4} {'中间节点':<15} {elapsed:.2f} ms") hop_data.update({"status": "中间节点", "latency_ms": round(elapsed, 2)}) else: print(f" {ttl:<4} {'TTL过期':<15} {elapsed:.2f} ms") hop_data.update({"status": "TTL过期", "latency_ms": round(elapsed, 2)}) sock.close() except Exception as e: print(f" {ttl:<4} 错误: {e}") hop_data.update({"error": str(e)}) results.append(hop_data) return results def print_summary(results): """打印汇总结果""" print("\n\n" + "="*70) print("📈 测速结果汇总") print("="*70) # 按TCP延迟排序 valid_results = [r for r in results if r["summary"].get("tcp_latency") is not None] valid_results.sort(key=lambda x: 
x["summary"]["tcp_latency"]) print(f"\n{'排名':<4} {'名称':<22} {'TCP延迟':<12} {'HTTP延迟':<12} {'速度':<12}") print("-" * 70) for i, r in enumerate(valid_results, 1): tcp_str = f"{r['summary']['tcp_latency']:.2f} ms" if r['summary']['tcp_latency'] else "N/A" http_str = f"{r['summary']['avg_latency']:.2f} ms" if r['summary']['avg_latency'] else "N/A" speed_str = format_speed(r['summary']['speed']) if r['summary']['speed'] else "N/A" print(f"{i:<4} {r['name']:<22} {tcp_str:<12} {http_str:<12} {speed_str:<12}") # 显示失败的 failed = [r for r in results if r["summary"].get("tcp_latency") is None] if failed: print("\n❌ 无法连接的节点:") for r in failed: print(f" - {r['name']} ({r['url']})") # 推荐 if valid_results: best = valid_results[0] print(f"\n" + "="*70) print(f"✅ 推荐使用: {best['name']}") print(f" URL: {best['url']}") if best['summary']['tcp_latency']: print(f" TCP延迟: {best['summary']['tcp_latency']:.2f} ms") if best['summary']['avg_latency']: print(f" HTTP延迟: {best['summary']['avg_latency']:.2f} ms") def export_json(results, output_file=None): """导出测试结果到JSON文件""" if output_file is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f"speedtest_result_{timestamp}.json" # 构建完整报告 report = { "report_info": { "tool": "CDN 测速工具", "version": "2.1", "generated_at": datetime.now().isoformat(), "test_parameters": { "tcp_ping_count": TCP_PING_COUNT, "http_test_count": TEST_COUNT, "timeout_seconds": TIMEOUT, "traceroute_max_hops": TRACEROUTE_MAX_HOPS } }, "endpoints": results, "ranking": [], "recommendation": None } # 生成排名 valid_results = [r for r in results if r["summary"].get("tcp_latency") is not None] valid_results.sort(key=lambda x: x["summary"]["tcp_latency"]) for i, r in enumerate(valid_results, 1): report["ranking"].append({ "rank": i, "name": r["name"], "url": r["url"], "tcp_latency_ms": r["summary"]["tcp_latency"], "http_latency_ms": r["summary"]["avg_latency"], "speed_bps": r["summary"]["speed"] }) # 推荐 if valid_results: best = valid_results[0] 
report["recommendation"] = { "name": best["name"], "url": best["url"], "tcp_latency_ms": best["summary"]["tcp_latency"], "http_latency_ms": best["summary"]["avg_latency"], "reason": "TCP延迟最低" } # 写入文件 with open(output_file, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) return output_file def main(): import argparse parser = argparse.ArgumentParser(description='CDN 测速工具 v2.1') parser.add_argument('--no-traceroute', '-t', action='store_true', help='禁用 traceroute 测试') parser.add_argument('--quick', '-q', action='store_true', help='快速模式(减少测试次数)') parser.add_argument('--json', '-j', nargs='?', const='auto', default=None, metavar='FILE', help='导出结果到JSON文件(不指定文件名则自动生成)') parser.add_argument('--output', '-o', type=str, default=None, help='指定JSON输出文件路径') args = parser.parse_args() global TEST_COUNT, TCP_PING_COUNT if args.quick: TEST_COUNT = 1 TCP_PING_COUNT = 3 print("\n" + "="*70) print(" 🚀 CDN 测速工具 v2.1") print(" 支持: TCP Ping | Traceroute | HTTP延迟 | 下载速度 | JSON导出") print(" 测试中国大陆优化节点") print("="*70) results = [] enable_traceroute = not args.no_traceroute for endpoint in ENDPOINTS: result = test_endpoint(endpoint, enable_traceroute=enable_traceroute) results.append(result) print_summary(results) # 导出JSON json_file = args.output or args.json if json_file: if json_file == 'auto': json_file = None # 让export_json自动生成文件名 output_path = export_json(results, json_file) print(f"\n📄 测试结果已导出到: {output_path}") print("\n" + "="*70) print("测速完成!") print("="*70 + "\n") if __name__ == "__main__": main()anyrouter CDN 测速工具 v2.1 - 测试多个服务地址的连接速度
张小明
前端开发工程师
基于C# WinForm实现的带条码打印的固定资产管理
一、系统架构设计 1. 技术架构 // 技术栈组成 - 开发框架:.NET Framework 4.8 - UI框架:Windows Forms - 条码生成:BarcodeLib(开源库) - 数据库:SQL Server 2019 - 打印组件:Microsoft Print t…
8、Windows应用程序用户体验设计全解析
Windows应用程序用户体验设计全解析 在开发Windows Store应用程序时,列表管理控件起着关键作用。无论采用何种开发方式,列表控件都能以不同形式展示项目列表。Metro风格的列表控件包含在 Windows.UI.Xaml.Control (XAML)或 WinJS.UI (HTML)命名空间中。 1. 列表管理…
免费开源Android视频编辑器 - 移动端专业视频制作完整解决方案
免费开源Android视频编辑器 - 移动端专业视频制作完整解决方案 【免费下载链接】open-video-editor Open source Android video editor, built with Media3 and Jetpack Compose. 项目地址: https://gitcode.com/gh_mirrors/op/open-video-editor 在移动设备上进行专业级…
语雀文档批量迁移大师:高效导出解决方案
语雀文档批量迁移大师:高效导出解决方案 【免费下载链接】yuque-exporter 项目地址: https://gitcode.com/gh_mirrors/yuqu/yuque-exporter 语雀文档批量导出工具yuque-exporter为内容创作者提供了智能迁移和本地归档的完美解决方案。随着语雀平台定位从内容…
Mermaid数据可视化:让图表绘制像写文档一样简单!🎯
Mermaid数据可视化:让图表绘制像写文档一样简单!🎯 【免费下载链接】mermaid mermaid-js/mermaid: 是一个用于生成图表和流程图的 Markdown 渲染器,支持多种图表类型和丰富的样式。适合对 Markdown、图表和流程图以及想要使用 Mar…
Win-PS2EXE:PowerShell脚本一键编译为EXE的终极方案
Win-PS2EXE:PowerShell脚本一键编译为EXE的终极方案 【免费下载链接】Win-PS2EXE Graphical frontend to PS1-to-EXE-compiler PS2EXE.ps1 项目地址: https://gitcode.com/gh_mirrors/wi/Win-PS2EXE 还在为PowerShell脚本的部署分发而烦恼吗?…