Files
ai_wht_wechat/backend/test_proxy_connection.py
2026-01-07 22:55:59 +08:00

157 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试代理IP连接
使用requests验证代理是否可用
"""
import requests
from damai_proxy_config import get_all_enabled_proxies
import time
def test_proxy_with_requests(proxy_config: dict, test_url: str = "http://httpbin.org/ip"):
"""
使用requests测试代理连接
Args:
proxy_config: 代理配置
test_url: 测试URL
"""
print(f"\n{'='*60}")
print(f"🔄 测试代理: {proxy_config['name']}")
print(f" 服务器: {proxy_config['server']}")
print(f" 用户名: {proxy_config['username']}")
print(f"{'='*60}\n")
# 格式化代理URL
server = proxy_config['server'].replace('http://', '').replace('https://', '')
username = proxy_config['username']
password = proxy_config['password']
proxy_url = f"http://{username}:{password}@{server}"
proxies = {
"http": proxy_url,
"https": proxy_url
}
print(f"📍 代理URL: {proxy_url}")
# 测试HTTP
print(f"\n✅ 测试HTTP...")
try:
start_time = time.time()
response = requests.get("http://httpbin.org/ip", proxies=proxies, timeout=10)
elapsed_time = time.time() - start_time
if response.status_code == 200:
print(f" ✅ HTTP连接成功 - 响应时间: {elapsed_time:.2f}")
try:
result = response.json()
print(f" 🌍 出IP: {result.get('origin', 'N/A')}")
except:
pass
else:
print(f" ❌ HTTP失败 - 状态码: {response.status_code}")
return False
except Exception as e:
print(f" ❌ HTTP错误: {str(e)}")
return False
# 测试HTTPS
print(f"\n🔒 测试HTTPS...")
try:
start_time = time.time()
response = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=10)
elapsed_time = time.time() - start_time
if response.status_code == 200:
print(f" ✅ HTTPS连接成功 - 响应时间: {elapsed_time:.2f}")
try:
result = response.json()
print(f" 🌍 出IP: {result.get('origin', 'N/A')}")
except:
pass
return True
else:
print(f" ❌ HTTPS失败 - 状态码: {response.status_code}")
return False
except Exception as e:
print(f" ❌ HTTPS错误: {str(e)}")
print(f" ⚠️ 该代理可能不支持HTTPS隧道(CONNECT方法)")
return False
def test_all_proxies():
"""测试所有代理"""
proxies = get_all_enabled_proxies()
print(f"\n{'='*60}")
print(f" 代理连接测试")
print(f" 共有 {len(proxies)} 个代理需要测试")
print(f"{'='*60}")
results = []
for i, proxy_config in enumerate(proxies, 1):
print(f"\n[{i}/{len(proxies)}]")
success = test_proxy_with_requests(proxy_config)
results.append({
'name': proxy_config['name'],
'server': proxy_config['server'],
'success': success
})
if i < len(proxies):
print("\n⏳ 等待2秒...")
time.sleep(2)
# 输出汇总
print(f"\n\n{'='*60}")
print(" 测试汇总")
print(f"{'='*60}\n")
success_count = sum(1 for r in results if r['success'])
fail_count = len(results) - success_count
print(f"✅ 成功: {success_count}/{len(results)}")
print(f"❌ 失败: {fail_count}/{len(results)}")
print(f"\n详细结果:")
for result in results:
status = "✅ 可用" if result['success'] else "❌ 不可用"
print(f" {status} - {result['name']} ({result['server']})")
return results
if __name__ == '__main__':
print("\n" + "="*60)
print(" 小红书代理IP连接测试")
print(" 使用requests库测试代理连通性")
print("="*60)
results = test_all_proxies()
# 给出建议
available_proxies = [r for r in results if r['success']]
if available_proxies:
print(f"\n\n💡 建议:")
print(f" 以下代理可用可以在damai_proxy_config.py中保留:")
for proxy in available_proxies:
print(f"{proxy['name']}")
unavailable_proxies = [r for r in results if not r['success']]
if unavailable_proxies:
print(f"\n 以下代理不可用建议在damai_proxy_config.py中设置enabled=False:")
for proxy in unavailable_proxies:
print(f"{proxy['name']}")
else:
print(f"\n\n⚠️ 警告: 所有代理均不可用!")
print(f" 请检查:")
print(f" 1. 代理服务器是否在线")
print(f" 2. 用户名和密码是否正确")
print(f" 3. 本机网络是否正常")