Files
ai_wht_wechat/backend/verify_proxy_usage.py

230 lines
8.1 KiB
Python
Raw Permalink Normal View History

2026-01-06 19:36:42 +08:00
"""
Playwright代理IP验证脚本
验证Playwright浏览器是否使用了代理IP而不是本机IP
"""
import asyncio
from playwright.async_api import async_playwright
import requests
import json
async def get_my_ip_requests():
"""使用requests获取当前IP不使用代理"""
try:
response = requests.get('http://httpbin.org/ip', timeout=10)
if response.status_code == 200:
data = response.json()
return data.get('origin', 'Unknown')
except Exception as e:
print(f"获取本机IP失败: {str(e)}")
return None
async def get_browser_ip_via_playwright(proxy_url=None):
"""使用Playwright获取IP可选择是否使用代理"""
try:
async with async_playwright() as p:
# 启动浏览器
launch_kwargs = {
"headless": True, # 无头模式
}
# 如果提供了代理,则使用代理
if proxy_url:
launch_kwargs["proxy"] = {"server": proxy_url}
browser = await p.chromium.launch(**launch_kwargs)
context = await browser.new_context()
page = await context.new_page()
# 访问IP检测网站
await page.goto('http://httpbin.org/ip', wait_until='networkidle', timeout=10000)
# 获取页面内容
content = await page.content()
# 关闭浏览器
await browser.close()
# 解析IP信息
try:
import re
import json
# 查找JSON内容
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
ip_data = json.loads(json_match.group())
return ip_data.get('origin', 'Unknown')
except:
pass
return 'Parse Error'
except Exception as e:
print(f"通过Playwright获取IP失败: {str(e)}")
return None
async def verify_proxy_usage():
"""验证代理IP使用情况"""
print("="*60)
print("🔍 Playwright代理IP使用验证")
print("="*60)
# 1. 获取本机IP
print("\n1⃣ 获取本机IP地址...")
local_ip = await get_my_ip_requests()
if local_ip:
print(f" ✅ 本机IP: {local_ip}")
else:
print(" ❌ 无法获取本机IP")
return
# 2. 测试不使用代理时的IP
print("\n2⃣ 测试不使用代理时的IP...")
browser_ip_no_proxy = await get_browser_ip_via_playwright()
print(f" 🌐 Playwright无代理IP: {browser_ip_no_proxy}")
# 3. 测试使用代理时的IP
print("\n3⃣ 测试使用代理时的IP...")
# 从代理配置中获取代理信息
from damai_proxy_config import get_proxy_config
for i in range(2):
try:
proxy_config = get_proxy_config(i)
proxy_server = proxy_config['server'].replace('http://', '')
proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_server}"
print(f" 代理{i+1}: {proxy_config['server']}")
# 获取使用代理时的IP
browser_ip_with_proxy = await get_browser_ip_via_playwright(proxy_url)
print(f" 🌐 Playwright使用代理{i+1}的IP: {browser_ip_with_proxy}")
# 比较IP地址
if browser_ip_with_proxy == local_ip:
print(f" ❌ 代理{i+1}测试失败: IP与本机IP相同代理未生效")
elif browser_ip_with_proxy == proxy_server.split(':')[0]: # 检查是否是代理服务器IP
print(f" ✅ 代理{i+1}测试成功: 使用了代理IP")
elif browser_ip_with_proxy != 'Parse Error' and browser_ip_with_proxy != local_ip:
print(f" ✅ 代理{i+1}测试成功: IP已改变代理生效")
else:
print(f" ⚠️ 代理{i+1}测试结果不确定: {browser_ip_with_proxy}")
except Exception as e:
print(f" ❌ 代理{i+1}测试出错: {str(e)}")
print() # 空行分隔
async def advanced_proxy_verification():
"""高级代理验证 - 使用多个IP检测服务"""
print("="*60)
print("🔬 高级代理IP验证")
print("="*60)
# IP检测服务列表
ip_services = [
'http://httpbin.org/ip',
'https://api.ipify.org?format=json',
'https://jsonip.com',
'https://httpbin.org/ip'
]
from damai_proxy_config import get_proxy_config
for i in range(2):
try:
proxy_config = get_proxy_config(i)
proxy_server = proxy_config['server'].replace('http://', '')
proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_server}"
print(f"\n📊 验证代理 {i+1}: {proxy_config['server']}")
print("-" * 50)
async with async_playwright() as p:
launch_kwargs = {"headless": True, "proxy": {"server": proxy_url}}
browser = await p.chromium.launch(**launch_kwargs)
context = await browser.new_context()
page = await context.new_page()
for service in ip_services:
try:
print(f" 正在测试: {service}")
await page.goto(service, wait_until='networkidle', timeout=10000)
content = await page.content()
# 尝试解析IP
import re
import json
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group())
ip = data.get('origin') or data.get('ip') or 'Unknown'
print(f"{service}: {ip}")
except:
print(f"{service}: JSON解析失败")
else:
print(f"{service}: 未找到JSON数据")
except Exception as e:
print(f"{service}: {str(e)}")
await browser.close()
except Exception as e:
print(f"❌ 代理{i+1}高级验证失败: {str(e)}")
def show_proxy_format():
"""显示代理格式"""
print("="*60)
print("🔧 Playwright代理格式参考")
print("="*60)
from damai_proxy_config import get_proxy_config
for i in range(2):
proxy_config = get_proxy_config(i)
proxy_server = proxy_config['server'].replace('http://', '')
proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_server}"
print(f"\n代理{i+1}:")
print(f" 原始地址: {proxy_config['server']}")
print(f" 用户名: {proxy_config['username']}")
print(f" 密码: {proxy_config['password']}")
print(f" Playwright格式: {proxy_url}")
print(f" 使用示例:")
print(f" browser = await playwright.chromium.launch(")
print(f" proxy={{'server': '{proxy_url}'}}")
print(f" )")
async def main():
"""主函数"""
# 显示代理格式
show_proxy_format()
print("\n" + "="*60)
# 基础验证
await verify_proxy_usage()
# 高级验证(可选,可能会比较耗时)
user_input = input("\n是否进行高级验证? 这将测试多个IP服务 (y/N): ")
if user_input.lower() == 'y':
await advanced_proxy_verification()
print(f"\n{'='*60}")
print("✅ 验证完成!")
print("="*60)
if __name__ == "__main__":
import sys
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
asyncio.run(main())