Files
ai_wht_wechat/backend/test_headless_mode.py
2026-01-06 19:36:42 +08:00

356 lines
14 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
使用代理并开启有头模式的示例
展示如何在使用代理的同时开启浏览器界面
"""
import asyncio
from playwright.async_api import async_playwright
import sys
async def test_proxy_with_headless_false(proxy_index: int = 0):
"""使用代理并开启有头模式测试"""
print(f"\n{'='*60}")
print(f"🔍 测试代理 + 有头模式")
print(f"{'='*60}")
# 从代理配置获取代理信息
from damai_proxy_config import get_proxy_config
proxy_config = get_proxy_config(proxy_index)
proxy_server = proxy_config['server'].replace('http://', '')
proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_server}"
print(f"✅ 使用代理: 代理{proxy_index + 1}")
print(f" 代理服务器: {proxy_config['server']}")
print(f" 有头模式: 开启")
try:
async with async_playwright() as p:
# 配置代理
proxy_parts = proxy_url.replace('http://', '').replace('https://', '').split('@')
if len(proxy_parts) == 2:
auth_part = proxy_parts[0]
server_part = proxy_parts[1]
username, password = auth_part.split(':')
proxy_config_obj = {
"server": f"http://{server_part}",
"username": username,
"password": password
}
else:
proxy_config_obj = {"server": proxy_url}
print(f" 配置的代理对象: {proxy_config_obj}")
# 启动浏览器 - 使用有头模式
browser = await p.chromium.launch(
headless=False, # 有头模式,可以看到浏览器界面
proxy=proxy_config_obj
)
# 创建上下文
context = await browser.new_context(
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
)
# 创建页面
page = await context.new_page()
print(f"\n🌐 访问百度测试代理连接...")
try:
await page.goto('https://www.baidu.com', wait_until='networkidle', timeout=15000)
await asyncio.sleep(2)
title = await page.title()
url = page.url
print(f" ✅ 百度访问成功")
print(f" 标题: {title}")
print(f" URL: {url}")
except Exception as e:
print(f" ❌ 百度访问失败: {str(e)}")
print(f"\n🌐 访问小红书创作者平台...")
try:
await page.goto('https://creator.xiaohongshu.com/login', wait_until='networkidle', timeout=15000)
await asyncio.sleep(3)
title = await page.title()
url = page.url
content_len = len(await page.content())
print(f" 访问结果:")
print(f" 标题: {title}")
print(f" URL: {url}")
print(f" 内容长度: {content_len} 字符")
if content_len == 0:
print(f" ⚠️ 页面内容为空")
else:
print(f" ✅ 页面加载成功")
except Exception as e:
print(f" ❌ 小红书访问失败: {str(e)}")
print(f"\n⏸️ 浏览器保持打开状态,您可以观察页面")
print(f" 代理正在生效,您可以看到浏览器界面")
print(f" 按 Enter 键关闭浏览器...")
# 等待用户输入
input()
await browser.close()
print(f"✅ 浏览器已关闭")
except Exception as e:
print(f"❌ 测试过程异常: {str(e)}")
import traceback
traceback.print_exc()
async def test_xhs_login_with_headless_false(phone: str, proxy_index: int = 0):
"""
使用有头模式测试小红书登录流程
Args:
phone: 手机号
proxy_index: 代理索引 (0 或 1)
"""
print(f"\n{'='*60}")
print(f"📱 使用有头模式测试小红书登录")
print(f"{'='*60}")
# 从代理配置获取代理信息
from damai_proxy_config import get_proxy_config
proxy_config = get_proxy_config(proxy_index)
proxy_server = proxy_config['server'].replace('http://', '')
proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_server}"
print(f"✅ 使用代理: 代理{proxy_index + 1}")
print(f" 代理服务器: {proxy_config['server']}")
print(f" 手机号: {phone}")
print(f" 有头模式: 开启")
# 创建登录服务,使用有头模式
from xhs_login import XHSLoginService
login_service = XHSLoginService(use_pool=False) # 不使用池,便于调试
try:
# 初始化浏览器(使用代理 + 有头模式)
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
# 注意XHSLoginService 内部使用了浏览器池模式,我们先看看如何修改它来支持有头模式
print(" 正在启动浏览器(使用代理 + 有头模式)...")
# 直接使用Playwright创建有头模式的浏览器
async with async_playwright() as p:
# 配置代理
proxy_parts = proxy_url.replace('http://', '').replace('https://', '').split('@')
if len(proxy_parts) == 2:
auth_part = proxy_parts[0]
server_part = proxy_parts[1]
username, password = auth_part.split(':')
proxy_config_obj = {
"server": f"http://{server_part}",
"username": username,
"password": password
}
else:
proxy_config_obj = {"server": proxy_url}
# 启动浏览器 - 有头模式
browser = await p.chromium.launch(
headless=False, # 有头模式
proxy=proxy_config_obj
)
context = await browser.new_context(
user_agent=user_agent,
viewport={'width': 1280, 'height': 720}
)
page = await context.new_page()
print("✅ 浏览器启动成功(有头模式 + 代理)")
# 访问小红书登录页面
print(f"\n🌐 访问小红书创作者平台登录页...")
await page.goto('https://creator.xiaohongshu.com/login', wait_until='networkidle', timeout=30000)
await asyncio.sleep(2)
print(f"✅ 进入登录页面")
print(f" 当前URL: {page.url}")
# 查找手机号输入框
print(f"\n🔍 查找手机号输入框...")
try:
# 尝试多种选择器
phone_input_selectors = [
'input[placeholder="手机号"]',
'input[placeholder*="手机"]',
'input[type="tel"]',
'input[type="text"]'
]
phone_input = None
for selector in phone_input_selectors:
try:
phone_input = await page.wait_for_selector(selector, timeout=3000)
if phone_input:
print(f" ✅ 找到手机号输入框: {selector}")
break
except:
continue
if phone_input:
# 输入手机号
await phone_input.fill(phone)
print(f" ✅ 已输入手机号: {phone}")
# 等待界面更新
await asyncio.sleep(1)
# 查找发送验证码按钮
print(f"\n🔍 查找发送验证码按钮...")
code_button_selectors = [
'text="发送验证码"',
'text="获取验证码"',
'button:has-text("验证码")',
'button:has-text("发送")',
'div:has-text("验证码")'
]
code_button = None
for selector in code_button_selectors:
try:
code_button = await page.wait_for_selector(selector, timeout=3000)
if code_button:
print(f" ✅ 找到验证码按钮: {selector}")
break
except:
continue
if code_button:
print(f"\n 已找到手机号输入框和验证码按钮")
print(f" 您可以在浏览器中手动点击发送验证码")
print(f" 验证码将发送到: {phone}")
print(f"\n⏸️ 浏览器保持打开状态,您可以手动操作")
print(f" 按 Enter 键关闭浏览器...")
input()
else:
print(f" ❌ 未找到发送验证码按钮")
else:
print(f" ❌ 未找到手机号输入框")
print(f"\n📄 页面上可用的输入框:")
inputs = await page.query_selector_all('input')
for i, inp in enumerate(inputs):
try:
placeholder = await inp.get_attribute('placeholder')
input_type = await inp.get_attribute('type')
print(f" 输入框 {i+1}: type={input_type}, placeholder={placeholder}")
except:
continue
except Exception as e:
print(f" ❌ 操作失败: {str(e)}")
# 保持浏览器打开供用户观察
print(f"\n⏸️ 浏览器保持打开状态,您可以观察页面元素")
print(f" 按 Enter 键关闭浏览器...")
input()
await browser.close()
print(f"✅ 浏览器已关闭")
except Exception as e:
print(f"❌ 测试过程异常: {str(e)}")
import traceback
traceback.print_exc()
def show_headless_comparison():
"""显示有头模式和无头模式的对比"""
print("="*60)
print("💡 有头模式 vs 无头模式对比")
print("="*60)
print("\n有头模式 (headless=False):")
print(" ✅ 优点:")
print(" • 可以看到浏览器界面,便于调试")
print(" • 可以观察页面加载过程")
print(" • 可以手动与页面交互")
print(" • 有助于识别页面元素选择器")
print("")
print(" ❌ 缺点:")
print(" • 占用屏幕空间")
print(" • 可能影响用户其他操作")
print(" • 资源消耗稍大")
print("\n无头模式 (headless=True):")
print(" ✅ 优点:")
print(" • 不显示浏览器界面,后台运行")
print(" • 资源消耗较少")
print(" • 适合自动化任务")
print(" • 可以在服务器环境运行")
print("")
print(" ❌ 缺点:")
print(" • 无法直观看到页面")
print(" • 调试相对困难")
print("\n🎯 使用建议:")
print(" • 开发调试时使用有头模式")
print(" • 生产环境使用无头模式")
print(" • 代理配置在两种模式下都有效")
async def main():
"""主函数"""
show_headless_comparison()
print(f"\n{'='*60}")
print("🎯 选择测试模式")
print("="*60)
print("\n1. 基础代理 + 有头模式测试")
print("2. 小红书登录 + 有头模式测试")
try:
choice = input("\n请选择测试模式 (1-2, 默认为1): ").strip()
if choice not in ['1', '2']:
choice = '1'
proxy_choice = input("请选择代理 (0 或 1, 默认为0): ").strip()
if proxy_choice not in ['0', '1']:
proxy_choice = '0'
proxy_idx = int(proxy_choice)
if choice == '1':
await test_proxy_with_headless_false(proxy_idx)
elif choice == '2':
phone = input("请输入手机号: ").strip()
if not phone:
print("❌ 手机号不能为空")
return
await test_xhs_login_with_headless_false(phone, proxy_idx)
print(f"\n{'='*60}")
print("✅ 测试完成!")
print("="*60)
except KeyboardInterrupt:
print("\n\n⚠️ 测试被用户中断")
except Exception as e:
print(f"\n❌ 测试过程中出现错误: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
# Windows环境下设置事件循环策略
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
# 运行测试
asyncio.run(main())