""" 测试代理是否生效 验证Playwright是否真的通过代理IP访问网站 """ import asyncio import sys from playwright.async_api import async_playwright from damai_proxy_config import get_random_proxy, format_proxy_for_playwright async def test_proxy(): """测试代理IP是否生效""" print("=" * 60, file=sys.stderr) print("开始测试代理IP是否生效", file=sys.stderr) print("=" * 60, file=sys.stderr) # 1. 获取本机真实IP(不使用代理) print("\n[步骤1] 获取本机真实IP(不使用代理)", file=sys.stderr) playwright = await async_playwright().start() browser = await playwright.chromium.launch(headless=False) context = await browser.new_context() page = await context.new_page() # 访问IP查询网站 await page.goto('https://httpbin.org/ip', timeout=30000) real_ip_text = await page.locator('body').inner_text() print(f"本机IP信息:\n{real_ip_text}", file=sys.stderr) await browser.close() await playwright.stop() # 2. 使用代理IP访问 print("\n[步骤2] 使用代理IP访问", file=sys.stderr) # 获取随机代理 proxy_config = get_random_proxy() proxy = format_proxy_for_playwright(proxy_config) print(f"使用代理: {proxy_config.get('name', '未知')} - {proxy['server']}", file=sys.stderr) playwright = await async_playwright().start() # 启动浏览器时配置代理 launch_kwargs = { "headless": False, "proxy": proxy, "args": [ '--disable-blink-features=AutomationControlled', ] } browser = await playwright.chromium.launch(**launch_kwargs) context = await browser.new_context() # 注入反检测脚本 await context.add_init_script(""" Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); """) page = await context.new_page() # 访问IP查询网站 print("正在访问 httpbin.org/ip ...", file=sys.stderr) await page.goto('https://httpbin.org/ip', timeout=30000) proxy_ip_text = await page.locator('body').inner_text() print(f"代理IP信息:\n{proxy_ip_text}", file=sys.stderr) # 3. 访问多个IP查询网站进行交叉验证 print("\n[步骤3] 交叉验证 - 访问其他IP查询网站", file=sys.stderr) test_urls = [ 'https://api.ipify.org?format=json', 'https://ifconfig.me/all.json', ] for url in test_urls: try: print(f"\n访问: {url}", file=sys.stderr) await page.goto(url, timeout=30000) content = await page.locator('body').inner_text() print(f"返回内容:\n{content}", file=sys.stderr) except Exception as e: print(f"访问失败: {str(e)}", file=sys.stderr) # 4. 测试访问小红书 print("\n[步骤4] 测试访问小红书创作者中心", file=sys.stderr) try: await page.goto('https://creator.xiaohongshu.com/login', timeout=60000) print(f"小红书页面URL: {page.url}", file=sys.stderr) print(f"页面标题: {await page.title()}", file=sys.stderr) # 检查是否被重定向到验证页面 if 'captcha' in page.url: print("⚠️ 警告: 页面被重定向到验证页面,代理IP可能已被标记", file=sys.stderr) else: print("✅ 成功访问小红书登录页", file=sys.stderr) # 等待5秒观察页面 print("\n等待5秒,您可以观察浏览器窗口...", file=sys.stderr) await asyncio.sleep(5) except Exception as e: print(f"访问小红书失败: {str(e)}", file=sys.stderr) await browser.close() await playwright.stop() print("\n" + "=" * 60, file=sys.stderr) print("测试完成", file=sys.stderr) print("=" * 60, file=sys.stderr) print("\n对比上述IP信息:", file=sys.stderr) print("- 如果本机IP和代理IP不同,说明代理生效", file=sys.stderr) print("- 如果IP相同,说明代理未生效", file=sys.stderr) async def test_all_proxies(): """测试所有启用的代理""" from damai_proxy_config import get_all_enabled_proxies enabled_proxies = get_all_enabled_proxies() print(f"\n找到 {len(enabled_proxies)} 个已启用的代理", file=sys.stderr) for i, proxy_config in enumerate(enabled_proxies, 1): print(f"\n{'=' * 60}", file=sys.stderr) print(f"测试代理 {i}/{len(enabled_proxies)}: {proxy_config.get('name', '未知')}", file=sys.stderr) print(f"{'=' * 60}", file=sys.stderr) try: proxy = format_proxy_for_playwright(proxy_config) playwright = await async_playwright().start() browser = await playwright.chromium.launch( headless=True, proxy=proxy, args=['--disable-blink-features=AutomationControlled'] ) context = await browser.new_context() page = await context.new_page() # 访问IP查询网站 await page.goto('https://httpbin.org/ip', timeout=30000) ip_text = await page.locator('body').inner_text() print(f"代理IP: {ip_text}", file=sys.stderr) # 测试访问小红书 await page.goto('https://creator.xiaohongshu.com/login', timeout=60000) if 'captcha' in page.url: print(f"❌ {proxy_config['name']} - 触发验证页面", file=sys.stderr) else: print(f"✅ {proxy_config['name']} - 可正常访问", file=sys.stderr) await browser.close() await playwright.stop() except Exception as e: print(f"❌ {proxy_config['name']} - 测试失败: {str(e)}", file=sys.stderr) # 间隔2秒 await asyncio.sleep(2) if __name__ == "__main__": import sys if len(sys.argv) > 1 and sys.argv[1] == "all": # 测试所有代理 asyncio.run(test_all_proxies()) else: # 测试单个随机代理(详细模式) asyncio.run(test_proxy())