commit
This commit is contained in:
143
backend/test_xhs_proxy.py
Normal file
143
backend/test_xhs_proxy.py
Normal file
@@ -0,0 +1,143 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
测试小红书代理配置
|
||||
验证代理IP是否可用
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from xhs_login import XHSLoginService
|
||||
from damai_proxy_config import get_all_enabled_proxies, format_proxy_for_playwright
|
||||
|
||||
|
||||
async def test_proxy_access():
|
||||
"""测试代理访问小红书"""
|
||||
proxies = get_all_enabled_proxies()
|
||||
|
||||
print(f"\n📋 共有 {len(proxies)} 个可用代理\n")
|
||||
|
||||
for i, proxy_config in enumerate(proxies):
|
||||
print(f"\n{'='*60}")
|
||||
print(f"🔄 测试代理 {i+1}/{len(proxies)}: {proxy_config['name']}")
|
||||
print(f" 服务器: {proxy_config['server']}")
|
||||
print(f" 用户名: {proxy_config['username']}")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
try:
|
||||
# 格式化代理配置
|
||||
proxy_dict = format_proxy_for_playwright(proxy_config)
|
||||
print(f"🌐 代理配置: {proxy_dict}\n")
|
||||
|
||||
# 创建登录服务实例,使用指定代理
|
||||
login_service = XHSLoginService(
|
||||
use_pool=False, # 测试时不使用浏览器池
|
||||
headless=False, # 有头模式方便观察
|
||||
)
|
||||
|
||||
# 初始化浏览器,传入代理
|
||||
print("🚀 正在启动浏览器...")
|
||||
await login_service.init_browser(
|
||||
proxy=proxy_dict, # 使用代理字典
|
||||
use_random_proxy=False # 禁用自动代理,使用我们指定的
|
||||
)
|
||||
|
||||
print("✅ 浏览器启动成功\n")
|
||||
|
||||
# 访问小红书首页
|
||||
print("🌍 正在访问小红书首页...")
|
||||
await login_service.page.goto('https://www.xiaohongshu.com', timeout=30000)
|
||||
|
||||
# 检查当前URL
|
||||
current_url = login_service.page.url
|
||||
print(f"📍 当前URL: {current_url}\n")
|
||||
|
||||
# 判断是否被风控
|
||||
if '/website-login/captcha' in current_url or 'verifyUuid=' in current_url:
|
||||
print("❌ 该代理被风控,需要验证\n")
|
||||
else:
|
||||
print("✅ 代理正常,成功访问小红书\n")
|
||||
|
||||
# 等待5秒观察页面
|
||||
print("⏳ 等待5秒观察页面...")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
# 关闭浏览器
|
||||
await login_service.close()
|
||||
print("✅ 浏览器已关闭\n")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 测试失败: {str(e)}\n")
|
||||
try:
|
||||
await login_service.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
async def test_random_proxy():
|
||||
"""测试随机代理"""
|
||||
print(f"\n{'='*60}")
|
||||
print("🎲 测试随机代理功能")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
try:
|
||||
# 创建登录服务实例
|
||||
login_service = XHSLoginService(
|
||||
use_pool=False,
|
||||
headless=False,
|
||||
)
|
||||
|
||||
# 初始化浏览器,自动选择随机代理
|
||||
print("🚀 正在启动浏览器(自动选择随机代理)...")
|
||||
await login_service.init_browser(use_random_proxy=True)
|
||||
|
||||
print("✅ 浏览器启动成功\n")
|
||||
|
||||
# 访问小红书首页
|
||||
print("🌍 正在访问小红书首页...")
|
||||
await login_service.page.goto('https://www.xiaohongshu.com', timeout=30000)
|
||||
|
||||
# 检查当前URL
|
||||
current_url = login_service.page.url
|
||||
print(f"📍 当前URL: {current_url}\n")
|
||||
|
||||
# 判断是否被风控
|
||||
if '/website-login/captcha' in current_url or 'verifyUuid=' in current_url:
|
||||
print("❌ 该代理被风控,需要验证\n")
|
||||
else:
|
||||
print("✅ 代理正常,成功访问小红书\n")
|
||||
|
||||
# 等待10秒观察页面
|
||||
print("⏳ 等待10秒观察页面...")
|
||||
await asyncio.sleep(10)
|
||||
|
||||
# 关闭浏览器
|
||||
await login_service.close()
|
||||
print("✅ 浏览器已关闭\n")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 测试失败: {str(e)}\n")
|
||||
try:
|
||||
await login_service.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("\n" + "="*60)
|
||||
print(" 小红书代理配置测试")
|
||||
print("="*60)
|
||||
|
||||
# 选择测试模式
|
||||
print("\n请选择测试模式:")
|
||||
print("1. 测试所有代理")
|
||||
print("2. 测试随机代理功能")
|
||||
|
||||
choice = input("\n请输入选项(1 or 2): ").strip()
|
||||
|
||||
if choice == '1':
|
||||
asyncio.run(test_proxy_access())
|
||||
elif choice == '2':
|
||||
asyncio.run(test_random_proxy())
|
||||
else:
|
||||
print("无效选项")
|
||||
Reference in New Issue
Block a user