173 lines
6.2 KiB
Python
173 lines
6.2 KiB
Python
"""
|
||
测试代理是否生效
|
||
验证Playwright是否真的通过代理IP访问网站
|
||
"""
|
||
import asyncio
|
||
import sys
|
||
from playwright.async_api import async_playwright
|
||
from damai_proxy_config import get_random_proxy, format_proxy_for_playwright
|
||
|
||
|
||
async def test_proxy():
|
||
"""测试代理IP是否生效"""
|
||
print("=" * 60, file=sys.stderr)
|
||
print("开始测试代理IP是否生效", file=sys.stderr)
|
||
print("=" * 60, file=sys.stderr)
|
||
|
||
# 1. 获取本机真实IP(不使用代理)
|
||
print("\n[步骤1] 获取本机真实IP(不使用代理)", file=sys.stderr)
|
||
playwright = await async_playwright().start()
|
||
browser = await playwright.chromium.launch(headless=False)
|
||
context = await browser.new_context()
|
||
page = await context.new_page()
|
||
|
||
# 访问IP查询网站
|
||
await page.goto('https://httpbin.org/ip', timeout=30000)
|
||
real_ip_text = await page.locator('body').inner_text()
|
||
print(f"本机IP信息:\n{real_ip_text}", file=sys.stderr)
|
||
|
||
await browser.close()
|
||
await playwright.stop()
|
||
|
||
# 2. 使用代理IP访问
|
||
print("\n[步骤2] 使用代理IP访问", file=sys.stderr)
|
||
|
||
# 获取随机代理
|
||
proxy_config = get_random_proxy()
|
||
proxy = format_proxy_for_playwright(proxy_config)
|
||
print(f"使用代理: {proxy_config.get('name', '未知')} - {proxy['server']}", file=sys.stderr)
|
||
|
||
playwright = await async_playwright().start()
|
||
|
||
# 启动浏览器时配置代理
|
||
launch_kwargs = {
|
||
"headless": False,
|
||
"proxy": proxy,
|
||
"args": [
|
||
'--disable-blink-features=AutomationControlled',
|
||
]
|
||
}
|
||
|
||
browser = await playwright.chromium.launch(**launch_kwargs)
|
||
context = await browser.new_context()
|
||
|
||
# 注入反检测脚本
|
||
await context.add_init_script("""
|
||
Object.defineProperty(navigator, 'webdriver', {
|
||
get: () => undefined
|
||
});
|
||
""")
|
||
|
||
page = await context.new_page()
|
||
|
||
# 访问IP查询网站
|
||
print("正在访问 httpbin.org/ip ...", file=sys.stderr)
|
||
await page.goto('https://httpbin.org/ip', timeout=30000)
|
||
proxy_ip_text = await page.locator('body').inner_text()
|
||
print(f"代理IP信息:\n{proxy_ip_text}", file=sys.stderr)
|
||
|
||
# 3. 访问多个IP查询网站进行交叉验证
|
||
print("\n[步骤3] 交叉验证 - 访问其他IP查询网站", file=sys.stderr)
|
||
|
||
test_urls = [
|
||
'https://api.ipify.org?format=json',
|
||
'https://ifconfig.me/all.json',
|
||
]
|
||
|
||
for url in test_urls:
|
||
try:
|
||
print(f"\n访问: {url}", file=sys.stderr)
|
||
await page.goto(url, timeout=30000)
|
||
content = await page.locator('body').inner_text()
|
||
print(f"返回内容:\n{content}", file=sys.stderr)
|
||
except Exception as e:
|
||
print(f"访问失败: {str(e)}", file=sys.stderr)
|
||
|
||
# 4. 测试访问小红书
|
||
print("\n[步骤4] 测试访问小红书创作者中心", file=sys.stderr)
|
||
try:
|
||
await page.goto('https://creator.xiaohongshu.com/login', timeout=60000)
|
||
print(f"小红书页面URL: {page.url}", file=sys.stderr)
|
||
print(f"页面标题: {await page.title()}", file=sys.stderr)
|
||
|
||
# 检查是否被重定向到验证页面
|
||
if 'captcha' in page.url:
|
||
print("⚠️ 警告: 页面被重定向到验证页面,代理IP可能已被标记", file=sys.stderr)
|
||
else:
|
||
print("✅ 成功访问小红书登录页", file=sys.stderr)
|
||
|
||
# 等待5秒观察页面
|
||
print("\n等待5秒,您可以观察浏览器窗口...", file=sys.stderr)
|
||
await asyncio.sleep(5)
|
||
|
||
except Exception as e:
|
||
print(f"访问小红书失败: {str(e)}", file=sys.stderr)
|
||
|
||
await browser.close()
|
||
await playwright.stop()
|
||
|
||
print("\n" + "=" * 60, file=sys.stderr)
|
||
print("测试完成", file=sys.stderr)
|
||
print("=" * 60, file=sys.stderr)
|
||
print("\n对比上述IP信息:", file=sys.stderr)
|
||
print("- 如果本机IP和代理IP不同,说明代理生效", file=sys.stderr)
|
||
print("- 如果IP相同,说明代理未生效", file=sys.stderr)
|
||
|
||
|
||
async def test_all_proxies():
|
||
"""测试所有启用的代理"""
|
||
from damai_proxy_config import get_all_enabled_proxies
|
||
|
||
enabled_proxies = get_all_enabled_proxies()
|
||
print(f"\n找到 {len(enabled_proxies)} 个已启用的代理", file=sys.stderr)
|
||
|
||
for i, proxy_config in enumerate(enabled_proxies, 1):
|
||
print(f"\n{'=' * 60}", file=sys.stderr)
|
||
print(f"测试代理 {i}/{len(enabled_proxies)}: {proxy_config.get('name', '未知')}", file=sys.stderr)
|
||
print(f"{'=' * 60}", file=sys.stderr)
|
||
|
||
try:
|
||
proxy = format_proxy_for_playwright(proxy_config)
|
||
|
||
playwright = await async_playwright().start()
|
||
browser = await playwright.chromium.launch(
|
||
headless=True,
|
||
proxy=proxy,
|
||
args=['--disable-blink-features=AutomationControlled']
|
||
)
|
||
context = await browser.new_context()
|
||
page = await context.new_page()
|
||
|
||
# 访问IP查询网站
|
||
await page.goto('https://httpbin.org/ip', timeout=30000)
|
||
ip_text = await page.locator('body').inner_text()
|
||
print(f"代理IP: {ip_text}", file=sys.stderr)
|
||
|
||
# 测试访问小红书
|
||
await page.goto('https://creator.xiaohongshu.com/login', timeout=60000)
|
||
|
||
if 'captcha' in page.url:
|
||
print(f"❌ {proxy_config['name']} - 触发验证页面", file=sys.stderr)
|
||
else:
|
||
print(f"✅ {proxy_config['name']} - 可正常访问", file=sys.stderr)
|
||
|
||
await browser.close()
|
||
await playwright.stop()
|
||
|
||
except Exception as e:
|
||
print(f"❌ {proxy_config['name']} - 测试失败: {str(e)}", file=sys.stderr)
|
||
|
||
# 间隔2秒
|
||
await asyncio.sleep(2)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import sys
|
||
|
||
if len(sys.argv) > 1 and sys.argv[1] == "all":
|
||
# 测试所有代理
|
||
asyncio.run(test_all_proxies())
|
||
else:
|
||
# 测试单个随机代理(详细模式)
|
||
asyncio.run(test_proxy())
|