Files
ai_wht_wechat/backend/test_xhs_proxy.py
2026-01-07 22:55:59 +08:00

144 lines
4.6 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试小红书代理配置
验证代理IP是否可用
"""
import asyncio
from xhs_login import XHSLoginService
from damai_proxy_config import get_all_enabled_proxies, format_proxy_for_playwright
async def test_proxy_access():
"""测试代理访问小红书"""
proxies = get_all_enabled_proxies()
print(f"\n📋 共有 {len(proxies)} 个可用代理\n")
for i, proxy_config in enumerate(proxies):
print(f"\n{'='*60}")
print(f"🔄 测试代理 {i+1}/{len(proxies)}: {proxy_config['name']}")
print(f" 服务器: {proxy_config['server']}")
print(f" 用户名: {proxy_config['username']}")
print(f"{'='*60}\n")
try:
# 格式化代理配置
proxy_dict = format_proxy_for_playwright(proxy_config)
print(f"🌐 代理配置: {proxy_dict}\n")
# 创建登录服务实例,使用指定代理
login_service = XHSLoginService(
use_pool=False, # 测试时不使用浏览器池
headless=False, # 有头模式方便观察
)
# 初始化浏览器,传入代理
print("🚀 正在启动浏览器...")
await login_service.init_browser(
proxy=proxy_dict, # 使用代理字典
use_random_proxy=False # 禁用自动代理,使用我们指定的
)
print("✅ 浏览器启动成功\n")
# 访问小红书首页
print("🌍 正在访问小红书首页...")
await login_service.page.goto('https://www.xiaohongshu.com', timeout=30000)
# 检查当前URL
current_url = login_service.page.url
print(f"📍 当前URL: {current_url}\n")
# 判断是否被风控
if '/website-login/captcha' in current_url or 'verifyUuid=' in current_url:
print("❌ 该代理被风控,需要验证\n")
else:
print("✅ 代理正常,成功访问小红书\n")
# 等待5秒观察页面
print("⏳ 等待5秒观察页面...")
await asyncio.sleep(5)
# 关闭浏览器
await login_service.close()
print("✅ 浏览器已关闭\n")
except Exception as e:
print(f"❌ 测试失败: {str(e)}\n")
try:
await login_service.close()
except:
pass
async def test_random_proxy():
"""测试随机代理"""
print(f"\n{'='*60}")
print("🎲 测试随机代理功能")
print(f"{'='*60}\n")
try:
# 创建登录服务实例
login_service = XHSLoginService(
use_pool=False,
headless=False,
)
# 初始化浏览器,自动选择随机代理
print("🚀 正在启动浏览器(自动选择随机代理)...")
await login_service.init_browser(use_random_proxy=True)
print("✅ 浏览器启动成功\n")
# 访问小红书首页
print("🌍 正在访问小红书首页...")
await login_service.page.goto('https://www.xiaohongshu.com', timeout=30000)
# 检查当前URL
current_url = login_service.page.url
print(f"📍 当前URL: {current_url}\n")
# 判断是否被风控
if '/website-login/captcha' in current_url or 'verifyUuid=' in current_url:
print("❌ 该代理被风控,需要验证\n")
else:
print("✅ 代理正常,成功访问小红书\n")
# 等待10秒观察页面
print("⏳ 等待10秒观察页面...")
await asyncio.sleep(10)
# 关闭浏览器
await login_service.close()
print("✅ 浏览器已关闭\n")
except Exception as e:
print(f"❌ 测试失败: {str(e)}\n")
try:
await login_service.close()
except:
pass
if __name__ == '__main__':
print("\n" + "="*60)
print(" 小红书代理配置测试")
print("="*60)
# 选择测试模式
print("\n请选择测试模式:")
print("1. 测试所有代理")
print("2. 测试随机代理功能")
choice = input("\n请输入选项(1 or 2): ").strip()
if choice == '1':
asyncio.run(test_proxy_access())
elif choice == '2':
asyncio.run(test_random_proxy())
else:
print("无效选项")