#!/usr/bin/env python # -*- coding: utf-8 -*- """ 测试代理IP连接 使用requests验证代理是否可用 """ import requests from damai_proxy_config import get_all_enabled_proxies import time def test_proxy_with_requests(proxy_config: dict, test_url: str = "http://httpbin.org/ip"): """ 使用requests测试代理连接 Args: proxy_config: 代理配置 test_url: 测试URL """ print(f"\n{'='*60}") print(f"🔄 测试代理: {proxy_config['name']}") print(f" 服务器: {proxy_config['server']}") print(f" 用户名: {proxy_config['username']}") print(f"{'='*60}\n") # 格式化代理URL server = proxy_config['server'].replace('http://', '').replace('https://', '') username = proxy_config['username'] password = proxy_config['password'] proxy_url = f"http://{username}:{password}@{server}" proxies = { "http": proxy_url, "https": proxy_url } print(f"📍 代理URL: {proxy_url}") # 测试HTTP print(f"\n✅ 测试HTTP...") try: start_time = time.time() response = requests.get("http://httpbin.org/ip", proxies=proxies, timeout=10) elapsed_time = time.time() - start_time if response.status_code == 200: print(f" ✅ HTTP连接成功 - 响应时间: {elapsed_time:.2f}秒") try: result = response.json() print(f" 🌍 出IP: {result.get('origin', 'N/A')}") except: pass else: print(f" ❌ HTTP失败 - 状态码: {response.status_code}") return False except Exception as e: print(f" ❌ HTTP错误: {str(e)}") return False # 测试HTTPS print(f"\n🔒 测试HTTPS...") try: start_time = time.time() response = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=10) elapsed_time = time.time() - start_time if response.status_code == 200: print(f" ✅ HTTPS连接成功 - 响应时间: {elapsed_time:.2f}秒") try: result = response.json() print(f" 🌍 出IP: {result.get('origin', 'N/A')}") except: pass return True else: print(f" ❌ HTTPS失败 - 状态码: {response.status_code}") return False except Exception as e: print(f" ❌ HTTPS错误: {str(e)}") print(f" ⚠️ 该代理可能不支持HTTPS隧道(CONNECT方法)") return False def test_all_proxies(): """测试所有代理""" proxies = get_all_enabled_proxies() print(f"\n{'='*60}") print(f" 代理连接测试") print(f" 共有 {len(proxies)} 个代理需要测试") print(f"{'='*60}") results = [] for i, proxy_config in enumerate(proxies, 1): print(f"\n[{i}/{len(proxies)}]") success = test_proxy_with_requests(proxy_config) results.append({ 'name': proxy_config['name'], 'server': proxy_config['server'], 'success': success }) if i < len(proxies): print("\n⏳ 等待2秒...") time.sleep(2) # 输出汇总 print(f"\n\n{'='*60}") print(" 测试汇总") print(f"{'='*60}\n") success_count = sum(1 for r in results if r['success']) fail_count = len(results) - success_count print(f"✅ 成功: {success_count}/{len(results)}") print(f"❌ 失败: {fail_count}/{len(results)}") print(f"\n详细结果:") for result in results: status = "✅ 可用" if result['success'] else "❌ 不可用" print(f" {status} - {result['name']} ({result['server']})") return results if __name__ == '__main__': print("\n" + "="*60) print(" 小红书代理IP连接测试") print(" 使用requests库测试代理连通性") print("="*60) results = test_all_proxies() # 给出建议 available_proxies = [r for r in results if r['success']] if available_proxies: print(f"\n\n💡 建议:") print(f" 以下代理可用,可以在damai_proxy_config.py中保留:") for proxy in available_proxies: print(f" ✅ {proxy['name']}") unavailable_proxies = [r for r in results if not r['success']] if unavailable_proxies: print(f"\n 以下代理不可用,建议在damai_proxy_config.py中设置enabled=False:") for proxy in unavailable_proxies: print(f" ❌ {proxy['name']}") else: print(f"\n\n⚠️ 警告: 所有代理均不可用!") print(f" 请检查:") print(f" 1. 代理服务器是否在线") print(f" 2. 用户名和密码是否正确") print(f" 3. 本机网络是否正常")