Files
ai_wht_wechat/backend/test_proxy_connectivity.py

152 lines
4.7 KiB
Python
Raw Normal View History

2026-01-06 19:36:42 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
固定代理IP测试脚本
使用requests请求代理服务器验证代理是否可用
"""
import requests
import json
from damai_proxy_config import get_proxy_config, get_all_enabled_proxies
def test_proxy_requests(proxy_info, target_url="http://httpbin.org/ip"):
"""
使用requests测试代理IP
Args:
proxy_info: 代理信息字典包含server, username, password
target_url: 目标测试URL
"""
print(f"\n{'='*60}")
print(f"🔍 测试代理: {proxy_info.get('name', 'Unknown')}")
print(f" 服务器: {proxy_info['server']}")
print(f" 用户名: {proxy_info['username']}")
print(f" 目标URL: {target_url}")
print(f"{'='*60}")
# 构建代理认证信息
proxy_server = proxy_info['server'].replace('http://', '')
proxy_url = f"http://{proxy_info['username']}:{proxy_info['password']}@{proxy_server}"
proxies = {
"http": proxy_url,
"https": proxy_url
}
try:
# 发送测试请求
print("🚀 发送测试请求...")
response = requests.get(target_url, proxies=proxies, timeout=5) # 减少超时时间到5秒
if response.status_code == 200:
print(f"✅ 代理测试成功!状态码: {response.status_code}")
# 尝试解析IP信息
try:
ip_info = response.json()
print(f"🌐 当前IP信息: {json.dumps(ip_info, indent=2, ensure_ascii=False)}")
except:
print(f"🌐 页面内容 (前500字符): {response.text[:500]}")
return True
else:
print(f"❌ 代理测试失败!状态码: {response.status_code}")
print(f"响应内容: {response.text[:200]}")
return False
except requests.exceptions.ProxyError:
print("❌ 代理连接错误:无法连接到代理服务器")
return False
except requests.exceptions.ConnectTimeout:
print("❌ 连接超时:代理服务器响应超时")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 请求异常: {str(e)}")
return False
def test_all_proxies():
"""测试所有配置的代理"""
print("🎯 开始测试所有代理IP")
proxies = get_all_enabled_proxies()
if not proxies:
print("❌ 没有找到可用的代理配置")
return
print(f"📊 共找到 {len(proxies)} 个代理IP")
results = []
for i, proxy in enumerate(proxies, 1):
print(f"\n\n{'#'*60}")
print(f"# 测试进度: {i}/{len(proxies)}")
print(f"{'#'*60}")
success = test_proxy_requests(proxy)
results.append({
'proxy': proxy['name'],
'server': proxy['server'],
'success': success
})
if i < len(proxies):
print(f"\n⏳ 等待2秒后测试下一个代理...")
import time
time.sleep(2)
# 输出测试结果汇总
print(f"\n{'='*60}")
print("📊 测试结果汇总:")
print(f"{'='*60}")
success_count = 0
for result in results:
status = "✅ 成功" if result['success'] else "❌ 失败"
print(f" {result['proxy']} ({result['server']}) - {status}")
if result['success']:
success_count += 1
print(f"\n📈 总体成功率: {success_count}/{len(results)} ({success_count/len(results)*100:.1f}%)")
# 如果有成功的代理,显示可用于小红书的代理
successful_proxies = [r for r in results if r['success']]
if successful_proxies:
print(f"\n🎉 以下代理可用于小红书登录发文:")
for proxy in successful_proxies:
print(f" - {proxy['proxy']}: {proxy['server']}")
return results
def test_xhs_proxy_format():
"""测试适用于小红书的代理格式"""
print(f"\n{'='*60}")
print("🔧 测试适用于Playwright的代理格式")
print(f"{'='*60}")
proxies = get_all_enabled_proxies()
for proxy in proxies:
server = proxy['server'].replace('http://', '') # 移除http://前缀
proxy_url = f"http://{proxy['username']}:{proxy['password']}@{server}"
print(f" {proxy['name']}:")
print(f" 服务器地址: {proxy['server']}")
print(f" Playwright格式: {proxy_url}")
print()
if __name__ == "__main__":
print("🚀 开始测试固定代理IP")
# 测试代理格式
test_xhs_proxy_format()
# 测试所有代理
test_all_proxies()
print(f"\n{'='*60}")
print("🎉 代理测试完成!")
print(f"{'='*60}")