Files
ai_mip/test_adspower_playwright.py
2026-01-16 22:06:46 +08:00

778 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
AdsPower + Playwright CDP 集成测试
演示如何通过 CDP 连接到 AdsPower 指纹浏览器
"""
from loguru import logger
from adspower_client import AdsPowerClient
from config import Config
from db_manager import SiteManager, ClickManager, InteractionManager
import sys
import os
from datetime import datetime
from pathlib import Path
# 配置日志
logger.remove()
logger.add(
sys.stdout,
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>",
level="DEBUG" # 改为DEBUG级别
)
def create_test_folder(test_url: str):
"""为每次测试创建独立的文件夹"""
# 创建 test 目录
test_base_dir = Path("./test")
test_base_dir.mkdir(exist_ok=True)
# 生成文件夹名时间_域名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# 提取域名作为文件夹名称的一部分
from urllib.parse import urlparse
parsed = urlparse(test_url)
domain = parsed.netloc.replace('.', '_').replace(':', '_')
# 创建测试文件夹
test_folder = test_base_dir / f"{timestamp}_{domain}"
test_folder.mkdir(exist_ok=True)
# 在文件夹中创建 info.txt 记录测试信息
info_file = test_folder / "info.txt"
with open(info_file, 'w', encoding='utf-8') as f:
f.write(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"测试URL: {test_url}\n")
f.write(f"测试环境: {Config.ENV}\n")
return test_folder
def test_adspower_connection(use_proxy: bool = False, proxy_info: dict = None, use_api_v1: bool = False):
"""测试 AdsPower + Playwright CDP 连接
Args:
use_proxy: 是否使用大麦IP代理
proxy_info: 已获取的代理信息
use_api_v1: 是否使用API v1方式更新代理
"""
# ==================== 配置区 ====================
# 访问的网页地址,在这里修改
TEST_URL = "https://health.baidu.com/m/detail/ar_2366617956693492811" # IP检测网站可查看代理是否生效
# 其他可选项:
# TEST_URL = "https://www.baidu.com" # 百度
# TEST_URL = "https://www.google.com" # Google
# TEST_URL = "https://你的MIP页面地址" # 你的目标网页
# =====================================================
client = AdsPowerClient()
site_id = None
click_id = None
sent_message = None
# ============ 新增:创建测试文件夹 ============
test_folder = create_test_folder(TEST_URL)
logger.info(f"测试文件夹: {test_folder}")
# 配置日志输出到文件
log_file = test_folder / "test.log"
logger.add(
str(log_file),
format="{time:HH:mm:ss} | {level: <8} | {message}",
level="DEBUG"
)
logger.info("=" * 60)
logger.info("开始测试")
logger.info("=" * 60)
# ============================================
try:
# ============ 新增:初始化数据库站点 ============
logger.info("=" * 60)
logger.info("初始化: 创建或获取测试站点")
logger.info("=" * 60)
site_mgr = SiteManager()
site = site_mgr.get_site_by_url(TEST_URL)
if not site:
site_id = site_mgr.add_site(
site_url=TEST_URL,
site_name="测试站点-Playwright",
site_dimension="医疗健康"
)
logger.info(f"✅ 创建测试站点: site_id={site_id}")
else:
site_id = site['id']
logger.info(f"✅ 使用已存在站点: site_id={site_id}")
logger.info("")
# ============================================
# 0. 先根据环境查询分组,然后查询 Profile 列表
logger.info("=" * 60)
logger.info("步骤 0: 根据环境查询 Profile 列表")
logger.info("=" * 60)
# 获取当前环境对应的分组ID
group_id = client.get_group_by_env()
if group_id:
logger.info(f"当前环境: {Config.ENV}, 分组ID: {group_id}")
else:
logger.warning(f"未找到环境 {Config.ENV} 对应的分组,将查询所有Profile")
# 查询Profile列表(自动使用环境对应的分组)
result = client.list_profiles(group_id=group_id)
if not result:
logger.error("查询 Profile 失败")
return False
profiles = result.get('data', {}).get('list', [])
if not profiles:
logger.error(f"在分组 {group_id} 中没有可用的 Profile")
logger.error("请在 AdsPower 中创建 Profile 并分配到对应分组")
logger.error(f"提示: {Config.ENV} 环境需要创建名为 '{'dev' if Config.ENV == 'development' else 'prod'}' 的分组")
return False
# 使用第一个 Profile
first_profile = profiles[0]
profile_id = first_profile.get('profile_id')
profile_name = first_profile.get('name', 'N/A')
logger.info(f"将使用 Profile: {profile_name} (ID: {profile_id})")
logger.info("")
# 1. 启动 AdsPower 浏览器(可选使用代理)
logger.info("=" * 60)
logger.info(f"步骤 1: {'[使用代理] ' if use_proxy else ''}启动 AdsPower 浏览器")
if use_proxy:
logger.info(f"代理更新方式: {'API v1 (直接传入proxy_config)' if use_api_v1 else 'API v2 (使用proxy_id引用)'}")
logger.info("=" * 60)
if use_proxy and proxy_info:
if use_api_v1:
# 使用 API v1 方式:直接传入 proxy_config
logger.info("使用 API v1 方式更新代理...")
proxy_config_v1 = {
"proxy_type": "http",
"proxy_host": proxy_info["host"],
"proxy_port": proxy_info["port"],
"proxy_user": client.DAMAI_USER,
"proxy_password": client.DAMAI_PASSWORD,
"proxy_soft": "other"
}
# 直接更新 Profile
success = client.update_profile_proxy_v1(profile_id, proxy_config_v1)
if not success:
logger.warning("更新代理失败 (API v1),将不使用代理启动")
else:
# 使用 API v2 方式:先创建代理,再引用
logger.info("使用 API v2 方式更新代理...")
proxy_config = {
"type": "http",
"host": proxy_info["host"],
"port": proxy_info["port"],
"user": client.DAMAI_USER,
"password": client.DAMAI_PASSWORD,
"ipchecker": "ip2location",
"remark": "Damai Auto Proxy"
}
# 创建代理
proxy_id = client.create_proxy(proxy_config)
if proxy_id:
# 更新 Profile
client.update_profile_proxy(profile_id, proxy_id)
else:
logger.warning("创建代理失败,将不使用代理启动")
browser_info = client.start_browser(user_id=profile_id)
else:
browser_info = client.start_browser(user_id=profile_id)
if not browser_info:
logger.error("启动 AdsPower 浏览器失败")
return False
logger.info(f"浏览器信息: {browser_info}")
# 2. 通过 CDP 连接到浏览器
logger.info("")
logger.info("=" * 60)
logger.info("步骤 2: 通过 CDP 连接到浏览器")
logger.info("=" * 60)
browser = client.connect_browser(browser_info)
if not browser:
logger.error("CDP 连接失败")
return False
logger.info(f"浏览器版本: {browser.version}")
logger.info(f"上下文数量: {len(browser.contexts)}")
# 3. 获取页面
logger.info("")
logger.info("=" * 60)
logger.info("步骤 3: 获取浏览器页面")
logger.info("=" * 60)
page = client.get_page(browser)
if not page:
logger.error("获取页面失败")
return False
logger.info(f"页面 URL: {page.url}")
# 3.5. 关闭其他标签页只保留AdsPower启动页
logger.info("")
logger.info("=" * 60)
logger.info("步骤 3.5: 清理多余标签页")
logger.info("=" * 60)
context = browser.contexts[0]
all_pages = context.pages
logger.info(f"当前打开的标签页数: {len(all_pages)}")
# 遍历所有页面,关闭非 AdsPower 启动页
closed_count = 0
for p in all_pages:
try:
page_url = p.url
# 保留 AdsPower 启动页
if 'start.adspower.net' in page_url:
logger.info(f"保留启动页: {page_url}")
else:
logger.info(f"关闭标签页: {page_url}")
p.close()
closed_count += 1
except Exception as e:
logger.warning(f"关闭页面失败: {str(e)}")
logger.info(f"已关闭 {closed_count} 个标签页")
# 重新获取当前页面列表
remaining_pages = context.pages
logger.info(f"剩余标签页数: {len(remaining_pages)}")
# 如果所有页面都被关闭了,创建一个新页面
if len(remaining_pages) == 0:
logger.info("所有页面已关闭,创建新标签页")
page = context.new_page()
else:
# 使用第一个剩余页面
page = remaining_pages[0]
logger.info(f"使用剩余页面: {page.url}")
# 4. 测试页面操作
logger.info("")
logger.info("=" * 60)
logger.info("步骤 4: 测试页面操作")
logger.info("=" * 60)
# 访问配置的网页
logger.info(f"访问测试页面: {TEST_URL}")
page.goto(TEST_URL, wait_until='domcontentloaded', timeout=60000)
# 等待页面完全加载
logger.info("等待页面完全加载...")
import time
time.sleep(3)
try:
page.wait_for_load_state('networkidle', timeout=10000)
except Exception:
logger.warning("网络空闲超时,继续执行")
time.sleep(2)
# 获取页面标题
title = page.title()
logger.info(f"页面标题: {title}")
# 获取页面 URL
current_url = page.url
logger.info(f"当前 URL: {current_url}")
# 截图测试(点击前)
screenshot_path = test_folder / "01_before_click.png"
page.screenshot(path=str(screenshot_path))
logger.info(f"截图已保存: {screenshot_path}")
# 查找并点击广告
logger.info("")
logger.info("-" * 60)
logger.info("开始查找广告元素...")
try:
# 查找所有广告元素
ad_selector = 'span.ec-tuiguang.ecfc-tuiguang.xz81bbe'
ad_elements = page.locator(ad_selector)
ad_count = ad_elements.count()
logger.info(f"找到 {ad_count} 个广告元素")
if ad_count > 0:
# 点击第一个广告
logger.info("准备点击第一个广告...")
# 滚动到元素可见
first_ad = ad_elements.first
first_ad.scroll_into_view_if_needed()
time.sleep(1)
# 记录点击前的URL
old_url = page.url
logger.info(f"点击前URL: {old_url}")
# 点击广告(页面内跳转)
first_ad.click()
logger.info("✅ 已点击第一个广告")
# ============ 记录点击到数据库 ============
click_mgr = ClickManager()
click_id = click_mgr.record_click(
site_id=site_id,
site_url=TEST_URL,
user_ip=None,
device_type='pc'
)
logger.info(f"✅ 已记录点击: click_id={click_id}")
# ============================================
# 等待页面跳转
time.sleep(3)
page.wait_for_load_state('domcontentloaded')
# 获取跳转后的URL
new_url = page.url
new_title = page.title()
logger.info(f"跳转后URL: {new_url}")
logger.info(f"跳转后标题: {new_title}")
# 截图(跳转后)
screenshot_path_after = test_folder / "02_after_click.png"
page.screenshot(path=str(screenshot_path_after))
logger.info(f"跳转后截图已保存: {screenshot_path_after}")
# ============ 新增:发送咨询消息 ============
logger.info("")
logger.info("-" * 60)
logger.info("开始发送咨询消息...")
# 预设消息列表
consultation_messages = [
"我想要预约一个医生,有什么推荐吗?",
"我现在本人不在当地,医生什么时候有空,是随时能去吗?有没有推荐的医生。",
"咱们医院是周六日是否上班,随时去吗?",
"想找医生看看,有没有推荐的医生",
"最近很不舒服,也说不出来全部的症状,能不能直接对话医生?"
]
# 随机选择一条消息
import random
message = random.choice(consultation_messages)
logger.info(f"选择的消息: {message}")
# 等待输入框加载
time.sleep(2)
# 滚动到页面底部,确保输入框可见
page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
time.sleep(1)
# 输出页面HTML用于调试
logger.info("正在分析页面结构...")
html_content = page.content()
html_file = test_folder / "page_html.txt"
with open(html_file, 'w', encoding='utf-8') as f:
f.write(html_content)
logger.info(f"页面HTML已保存: {html_file}")
# 尝试查找输入框(通用策略)
input_selectors = [
# contenteditable 类型
"textarea[contenteditable='true']",
"div[contenteditable='true']",
"*[contenteditable='true']",
# 直接查找textarea
"textarea",
# 常见的 textarea
"textarea[placeholder]",
"textarea.input",
"textarea[class*='input']",
"textarea[class*='text']",
"textarea[class*='box']",
"textarea[class*='chat']",
"textarea[class*='message']",
# 常见的 input
"input[type='text'][placeholder]",
"input[class*='input']",
"input[class*='text']",
"input[class*='chat']",
"input[class*='message']",
# 全局备选
"input[type='text']"
]
input_found = False
for selector in input_selectors:
try:
logger.debug(f"尝试选择器: {selector}")
count = page.locator(selector).count()
logger.debug(f" 找到 {count} 个匹配元素")
if count > 0:
# 遍历所有匹配的元素,找第一个可见的
for i in range(count):
try:
input_elem = page.locator(selector).nth(i)
is_visible = input_elem.is_visible(timeout=1000)
logger.debug(f" 元素 {i}: 可见={is_visible}")
if is_visible:
logger.info(f"✅ 找到可见输入框: {selector} (第{i}个)")
# 滚动到输入框可见区域
input_elem.scroll_into_view_if_needed()
time.sleep(0.5)
# 点击输入框获取焦点
input_elem.click()
time.sleep(0.5)
# 输入消息
input_elem.fill(message)
logger.info("✅ 已输入消息")
time.sleep(1)
# 保存已发送的消息
sent_message = message
input_found = True
break
except Exception as e2:
logger.debug(f" 元素 {i} 失败: {str(e2)}")
continue
if input_found:
break
except Exception as e:
logger.debug(f" 失败: {str(e)}")
continue
if not input_found:
logger.warning("⚠️ 未找到输入框")
debug_screenshot = test_folder / "debug_no_input.png"
page.screenshot(path=str(debug_screenshot))
logger.info(f"已保存调试截图: {debug_screenshot}")
# 兔底方案:点击页面底部上方位置,然后输入
logger.info("尝试兔底方案:点击页面底部区域...")
try:
# 获取页面高度
viewport_height = page.viewport_size['height']
# 点击底部上方10px的位置水平居中
click_x = page.viewport_size['width'] // 2
click_y = viewport_height - 10
logger.debug(f"点击位置: ({click_x}, {click_y})")
page.mouse.click(click_x, click_y)
time.sleep(1)
# 直接输入文本
page.keyboard.type(message, delay=50)
logger.info("✅ 已输入消息(兔底方案)")
time.sleep(1)
sent_message = message
input_found = True
except Exception as e:
logger.error(f"兔底方案失败: {str(e)}")
else:
# 尝试发送消息
message_sent = False
# 方法1: 先尝试按 Enter 键
logger.info("尝试按 Enter 键发送...")
try:
page.keyboard.press('Enter')
logger.info("✅ 已按 Enter 键")
time.sleep(2)
message_sent = True
except Exception as e:
logger.warning(f"按 Enter 键失败: {str(e)}")
# 方法2: 如果 Enter 键失败,查找并点击发送按钮
if not message_sent:
send_button_selectors = [
# 按文本查找
"button:has-text('发送')",
"a:has-text('发送')",
"span:has-text('发送')",
"div:has-text('发送')",
# 按类名查找
"button[class*='send']",
"button[class*='submit']",
"a[class*='send']",
"div[class*='send']",
"span[class*='send']",
# 按类型查找
"button[type='submit']",
# 全局备选
"button"
]
for selector in send_button_selectors:
try:
send_btn = page.locator(selector).first
if send_btn.is_visible() and send_btn.is_enabled():
send_btn.click()
logger.info(f"✅ 已点击发送按钮: {selector}")
message_sent = True
break
except Exception:
continue
if message_sent:
logger.info("✅✅✅ 消息发送成功!")
time.sleep(2)
# ============ 记录互动到数据库 ============
interaction_mgr = InteractionManager()
interaction_id = interaction_mgr.record_interaction(
site_id=site_id,
click_id=click_id,
interaction_type='message', # 修复:使用 'message' 而非 'consultation'
reply_content=sent_message,
is_successful=True,
response_received=False, # 后续可以添加检测逻辑
response_content=None
)
logger.info(f"✅ 已记录互动: interaction_id={interaction_id}")
# ============================================
# 截图(发送后)
screenshot_path_sent = test_folder / "03_after_send.png"
page.screenshot(path=str(screenshot_path_sent))
logger.info(f"发送后截图已保存: {screenshot_path_sent}")
else:
logger.warning("⚠️ 未能发送消息")
# ============================================
else:
logger.warning("⚠ 未找到广告元素")
except Exception as e:
logger.error(f"查找/点击广告或发送消息失败: {str(e)}")
import traceback
traceback.print_exc()
# 保存错误时的截图
try:
error_screenshot = test_folder / "error.png"
page.screenshot(path=str(error_screenshot))
logger.info(f"错误截图已保存: {error_screenshot}")
except:
pass
# 5. 清理资源
logger.info("")
logger.info("=" * 60)
logger.info("步骤 5: 测试完成 - 查询数据库记录")
logger.info("=" * 60)
# ============ 查询数据库记录 ============
if site_id:
logger.info(f"\n站点ID: {site_id}")
logger.info("-" * 60)
# 查询点击记录
click_mgr = ClickManager()
clicks = click_mgr.get_clicks_by_site(site_id, limit=5)
click_count = click_mgr.get_click_count_by_site(site_id)
logger.info(f"总点击次数: {click_count}")
if clicks:
last_click = clicks[0]
logger.info(f"最新点击时间: {last_click['click_time']}")
# 查询互动记录
interaction_mgr = InteractionManager()
interactions = interaction_mgr.get_interactions_by_site(site_id, limit=5)
success_count = interaction_mgr.get_successful_interactions_count(site_id)
logger.info(f"成功互动次数: {success_count}")
if interactions:
last_interaction = interactions[0]
logger.info(f"最新互动内容: {last_interaction['reply_content']}")
logger.info(f"是否收到回复: {'' if last_interaction['response_received'] else ''}")
# ============================================
logger.info("")
# 关闭浏览器前,截图聊天页面最终状态
try:
logger.info("截图聊天页面...")
# 等待可能的回复消息加载
time.sleep(2)
# 滚动到页面顶部,确保看到完整对话
page.evaluate("window.scrollTo(0, 0)")
time.sleep(0.5)
# 截图整个页面
screenshot_path_final = test_folder / "04_final_chat.png"
page.screenshot(path=str(screenshot_path_final), full_page=True)
logger.info(f"✅ 聊天页面截图已保存: {screenshot_path_final}")
except Exception as screenshot_err:
logger.warning(f"截图失败: {str(screenshot_err)}")
logger.info("")
# 优雅关闭 Playwright 连接,避免 CancelledError
try:
if browser:
logger.debug("关闭 Playwright 浏览器连接...")
browser.close()
time.sleep(0.5)
except Exception as close_err:
logger.debug(f"关闭浏览器连接异常: {str(close_err)}")
# 根据配置决定是否关闭浏览器
if Config.AUTO_CLOSE_BROWSER:
logger.info("正在关闭浏览器...")
try:
client.stop_browser(user_id=profile_id)
logger.info("✅ 浏览器已关闭")
except Exception as e:
logger.warning(f"关闭浏览器失败: {str(e)}")
else:
logger.info("浏览器保持运行状态,可继续手动操作")
logger.info("如需停止浏览器请在AdsPower中手动关闭")
logger.info("")
logger.info("="*60)
logger.info("测试完成!")
if Config.AUTO_CLOSE_BROWSER:
logger.info("浏览器已关闭")
else:
logger.info("浏览器未关闭")
logger.info("="*60)
return True
except Exception as e:
logger.error(f"测试异常: {str(e)}")
return False
finally:
# 注意:不自动清理资源,保持浏览器运行
pass
def test_multiple_pages():
"""测试多页面操作"""
client = AdsPowerClient()
profile_id = None
try:
logger.info("=" * 60)
logger.info("测试多页面操作")
logger.info("=" * 60)
# 根据环境查询 Profile
group_id = client.get_group_by_env()
result = client.list_profiles(group_id=group_id)
if not result:
return False
profiles = result.get('data', {}).get('list', [])
if not profiles:
logger.error("没有可用的 Profile")
return False
profile_id = profiles[0].get('profile_id')
# 启动并连接
browser_info = client.start_browser(user_id=profile_id)
if not browser_info:
return False
browser = client.connect_browser(browser_info)
if not browser:
return False
# 获取第一个页面
page1 = client.get_page(browser)
logger.info("访问百度...")
page1.goto("https://www.baidu.com")
logger.info(f"页面1标题: {page1.title()}")
# 创建新页面
context = browser.contexts[0]
page2 = context.new_page()
logger.info("访问必应...")
page2.goto("https://www.bing.com")
logger.info(f"页面2标题: {page2.title()}")
logger.info(f"当前打开的页面数: {len(context.pages)}")
# 关闭页面
page2.close()
logger.info("已关闭页面2")
return True
except Exception as e:
logger.error(f"测试异常: {str(e)}")
return False
finally:
try:
if profile_id:
client.stop_browser(user_id=profile_id)
else:
client.stop_browser()
except:
pass
if __name__ == "__main__":
logger.info("开始测试 AdsPower + Playwright CDP 集成")
logger.info("")
logger.info(f"当前环境: {Config.ENV}")
logger.info(f"AdsPower API: {Config.ADSPOWER_API_URL}")
logger.info("")
# 创建客户端
client = AdsPowerClient()
# ==================== 配置区 ====================
# 默认使用代理,如不需要改为 False
use_proxy = True
# 默认使用 API v2 Direct 方式0=v2 proxy_id, 1=v1, 2=v2 direct
use_api_v1 = True # True=API v1, False=API v2
# =====================================================
proxy_info = None
# 如果使用代理,提前获取
if use_proxy:
logger.info("")
logger.info(f"使用代理模式: {'API v1 (直接传入proxy_config)' if use_api_v1 else 'API v2 (使用proxy_id引用)'}")
logger.info("步骤 0: 提前获取大麦IP代理")
proxy_info = client.get_damai_proxy()
if not proxy_info:
logger.error("获取代理失败,终止测试")
sys.exit(1)
logger.info(f"代理地址: {proxy_info['host']}:{proxy_info['port']}")
logger.info("")
# 测试基本连接
if test_adspower_connection(use_proxy=use_proxy, proxy_info=proxy_info, use_api_v1=use_api_v1):
logger.info("\n基本连接测试通过\n")
else:
logger.error("\n基本连接测试失败\n")
sys.exit(1)
# 测试多页面操作
# if test_multiple_pages():
# logger.info("\n多页面操作测试通过\n")
# else:
# logger.error("\n多页面操作测试失败\n")