Files
ai_wht_wechat/backend/test_proxy_verification.py
2026-01-07 22:55:59 +08:00

173 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
测试代理是否生效
验证Playwright是否真的通过代理IP访问网站
"""
import asyncio
import sys
from playwright.async_api import async_playwright
from damai_proxy_config import get_random_proxy, format_proxy_for_playwright
async def test_proxy():
"""测试代理IP是否生效"""
print("=" * 60, file=sys.stderr)
print("开始测试代理IP是否生效", file=sys.stderr)
print("=" * 60, file=sys.stderr)
# 1. 获取本机真实IP不使用代理
print("\n[步骤1] 获取本机真实IP不使用代理", file=sys.stderr)
playwright = await async_playwright().start()
browser = await playwright.chromium.launch(headless=False)
context = await browser.new_context()
page = await context.new_page()
# 访问IP查询网站
await page.goto('https://httpbin.org/ip', timeout=30000)
real_ip_text = await page.locator('body').inner_text()
print(f"本机IP信息:\n{real_ip_text}", file=sys.stderr)
await browser.close()
await playwright.stop()
# 2. 使用代理IP访问
print("\n[步骤2] 使用代理IP访问", file=sys.stderr)
# 获取随机代理
proxy_config = get_random_proxy()
proxy = format_proxy_for_playwright(proxy_config)
print(f"使用代理: {proxy_config.get('name', '未知')} - {proxy['server']}", file=sys.stderr)
playwright = await async_playwright().start()
# 启动浏览器时配置代理
launch_kwargs = {
"headless": False,
"proxy": proxy,
"args": [
'--disable-blink-features=AutomationControlled',
]
}
browser = await playwright.chromium.launch(**launch_kwargs)
context = await browser.new_context()
# 注入反检测脚本
await context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
""")
page = await context.new_page()
# 访问IP查询网站
print("正在访问 httpbin.org/ip ...", file=sys.stderr)
await page.goto('https://httpbin.org/ip', timeout=30000)
proxy_ip_text = await page.locator('body').inner_text()
print(f"代理IP信息:\n{proxy_ip_text}", file=sys.stderr)
# 3. 访问多个IP查询网站进行交叉验证
print("\n[步骤3] 交叉验证 - 访问其他IP查询网站", file=sys.stderr)
test_urls = [
'https://api.ipify.org?format=json',
'https://ifconfig.me/all.json',
]
for url in test_urls:
try:
print(f"\n访问: {url}", file=sys.stderr)
await page.goto(url, timeout=30000)
content = await page.locator('body').inner_text()
print(f"返回内容:\n{content}", file=sys.stderr)
except Exception as e:
print(f"访问失败: {str(e)}", file=sys.stderr)
# 4. 测试访问小红书
print("\n[步骤4] 测试访问小红书创作者中心", file=sys.stderr)
try:
await page.goto('https://creator.xiaohongshu.com/login', timeout=60000)
print(f"小红书页面URL: {page.url}", file=sys.stderr)
print(f"页面标题: {await page.title()}", file=sys.stderr)
# 检查是否被重定向到验证页面
if 'captcha' in page.url:
print("⚠️ 警告: 页面被重定向到验证页面代理IP可能已被标记", file=sys.stderr)
else:
print("✅ 成功访问小红书登录页", file=sys.stderr)
# 等待5秒观察页面
print("\n等待5秒您可以观察浏览器窗口...", file=sys.stderr)
await asyncio.sleep(5)
except Exception as e:
print(f"访问小红书失败: {str(e)}", file=sys.stderr)
await browser.close()
await playwright.stop()
print("\n" + "=" * 60, file=sys.stderr)
print("测试完成", file=sys.stderr)
print("=" * 60, file=sys.stderr)
print("\n对比上述IP信息:", file=sys.stderr)
print("- 如果本机IP和代理IP不同说明代理生效", file=sys.stderr)
print("- 如果IP相同说明代理未生效", file=sys.stderr)
async def test_all_proxies():
"""测试所有启用的代理"""
from damai_proxy_config import get_all_enabled_proxies
enabled_proxies = get_all_enabled_proxies()
print(f"\n找到 {len(enabled_proxies)} 个已启用的代理", file=sys.stderr)
for i, proxy_config in enumerate(enabled_proxies, 1):
print(f"\n{'=' * 60}", file=sys.stderr)
print(f"测试代理 {i}/{len(enabled_proxies)}: {proxy_config.get('name', '未知')}", file=sys.stderr)
print(f"{'=' * 60}", file=sys.stderr)
try:
proxy = format_proxy_for_playwright(proxy_config)
playwright = await async_playwright().start()
browser = await playwright.chromium.launch(
headless=True,
proxy=proxy,
args=['--disable-blink-features=AutomationControlled']
)
context = await browser.new_context()
page = await context.new_page()
# 访问IP查询网站
await page.goto('https://httpbin.org/ip', timeout=30000)
ip_text = await page.locator('body').inner_text()
print(f"代理IP: {ip_text}", file=sys.stderr)
# 测试访问小红书
await page.goto('https://creator.xiaohongshu.com/login', timeout=60000)
if 'captcha' in page.url:
print(f"{proxy_config['name']} - 触发验证页面", file=sys.stderr)
else:
print(f"{proxy_config['name']} - 可正常访问", file=sys.stderr)
await browser.close()
await playwright.stop()
except Exception as e:
print(f"{proxy_config['name']} - 测试失败: {str(e)}", file=sys.stderr)
# 间隔2秒
await asyncio.sleep(2)
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "all":
# 测试所有代理
asyncio.run(test_all_proxies())
else:
# 测试单个随机代理(详细模式)
asyncio.run(test_proxy())