""" AdsPower + Playwright CDP 集成测试 演示如何通过 CDP 连接到 AdsPower 指纹浏览器 """ from loguru import logger from adspower_client import AdsPowerClient from config import Config from db_manager import SiteManager, ClickManager, InteractionManager import sys import os from datetime import datetime from pathlib import Path # 配置日志 logger.remove() logger.add( sys.stdout, format="{time:HH:mm:ss} | {level: <8} | {message}", level="DEBUG" # 改为DEBUG级别 ) def create_test_folder(test_url: str): """为每次测试创建独立的文件夹""" # 创建 test 目录 test_base_dir = Path("./test") test_base_dir.mkdir(exist_ok=True) # 生成文件夹名:时间_域名 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # 提取域名作为文件夹名称的一部分 from urllib.parse import urlparse parsed = urlparse(test_url) domain = parsed.netloc.replace('.', '_').replace(':', '_') # 创建测试文件夹 test_folder = test_base_dir / f"{timestamp}_{domain}" test_folder.mkdir(exist_ok=True) # 在文件夹中创建 info.txt 记录测试信息 info_file = test_folder / "info.txt" with open(info_file, 'w', encoding='utf-8') as f: f.write(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"测试URL: {test_url}\n") f.write(f"测试环境: {Config.ENV}\n") return test_folder def test_adspower_connection(use_proxy: bool = False, proxy_info: dict = None, use_api_v1: bool = False): """测试 AdsPower + Playwright CDP 连接 Args: use_proxy: 是否使用大麦IP代理 proxy_info: 已获取的代理信息 use_api_v1: 是否使用API v1方式更新代理 """ # ==================== 配置区 ==================== # 访问的网页地址,在这里修改 TEST_URL = "https://health.baidu.com/m/detail/ar_2366617956693492811" # IP检测网站,可查看代理是否生效 # 其他可选项: # TEST_URL = "https://www.baidu.com" # 百度 # TEST_URL = "https://www.google.com" # Google # TEST_URL = "https://你的MIP页面地址" # 你的目标网页 # ===================================================== client = AdsPowerClient() site_id = None click_id = None sent_message = None # ============ 新增:创建测试文件夹 ============ test_folder = create_test_folder(TEST_URL) logger.info(f"测试文件夹: {test_folder}") # 配置日志输出到文件 log_file = test_folder / "test.log" logger.add( str(log_file), format="{time:HH:mm:ss} | {level: <8} | {message}", level="DEBUG" ) logger.info("=" * 60) logger.info("开始测试") logger.info("=" * 60) # ============================================ try: # ============ 新增:初始化数据库站点 ============ logger.info("=" * 60) logger.info("初始化: 创建或获取测试站点") logger.info("=" * 60) site_mgr = SiteManager() site = site_mgr.get_site_by_url(TEST_URL) if not site: site_id = site_mgr.add_site( site_url=TEST_URL, site_name="测试站点-Playwright", site_dimension="医疗健康" ) logger.info(f"✅ 创建测试站点: site_id={site_id}") else: site_id = site['id'] logger.info(f"✅ 使用已存在站点: site_id={site_id}") logger.info("") # ============================================ # 0. 先根据环境查询分组,然后查询 Profile 列表 logger.info("=" * 60) logger.info("步骤 0: 根据环境查询 Profile 列表") logger.info("=" * 60) # 获取当前环境对应的分组ID group_id = client.get_group_by_env() if group_id: logger.info(f"当前环境: {Config.ENV}, 分组ID: {group_id}") else: logger.warning(f"未找到环境 {Config.ENV} 对应的分组,将查询所有Profile") # 查询Profile列表(自动使用环境对应的分组) result = client.list_profiles(group_id=group_id) if not result: logger.error("查询 Profile 失败") return False profiles = result.get('data', {}).get('list', []) if not profiles: logger.error(f"在分组 {group_id} 中没有可用的 Profile") logger.error("请在 AdsPower 中创建 Profile 并分配到对应分组") logger.error(f"提示: {Config.ENV} 环境需要创建名为 '{'dev' if Config.ENV == 'development' else 'prod'}' 的分组") return False # 使用第一个 Profile first_profile = profiles[0] profile_id = first_profile.get('profile_id') profile_name = first_profile.get('name', 'N/A') logger.info(f"将使用 Profile: {profile_name} (ID: {profile_id})") logger.info("") # 1. 启动 AdsPower 浏览器(可选使用代理) logger.info("=" * 60) logger.info(f"步骤 1: {'[使用代理] ' if use_proxy else ''}启动 AdsPower 浏览器") if use_proxy: logger.info(f"代理更新方式: {'API v1 (直接传入proxy_config)' if use_api_v1 else 'API v2 (使用proxy_id引用)'}") logger.info("=" * 60) if use_proxy and proxy_info: if use_api_v1: # 使用 API v1 方式:直接传入 proxy_config logger.info("使用 API v1 方式更新代理...") proxy_config_v1 = { "proxy_type": "http", "proxy_host": proxy_info["host"], "proxy_port": proxy_info["port"], "proxy_user": client.DAMAI_USER, "proxy_password": client.DAMAI_PASSWORD, "proxy_soft": "other" } # 直接更新 Profile success = client.update_profile_proxy_v1(profile_id, proxy_config_v1) if not success: logger.warning("更新代理失败 (API v1),将不使用代理启动") else: # 使用 API v2 方式:先创建代理,再引用 logger.info("使用 API v2 方式更新代理...") proxy_config = { "type": "http", "host": proxy_info["host"], "port": proxy_info["port"], "user": client.DAMAI_USER, "password": client.DAMAI_PASSWORD, "ipchecker": "ip2location", "remark": "Damai Auto Proxy" } # 创建代理 proxy_id = client.create_proxy(proxy_config) if proxy_id: # 更新 Profile client.update_profile_proxy(profile_id, proxy_id) else: logger.warning("创建代理失败,将不使用代理启动") browser_info = client.start_browser(user_id=profile_id) else: browser_info = client.start_browser(user_id=profile_id) if not browser_info: logger.error("启动 AdsPower 浏览器失败") return False logger.info(f"浏览器信息: {browser_info}") # 2. 通过 CDP 连接到浏览器 logger.info("") logger.info("=" * 60) logger.info("步骤 2: 通过 CDP 连接到浏览器") logger.info("=" * 60) browser = client.connect_browser(browser_info) if not browser: logger.error("CDP 连接失败") return False logger.info(f"浏览器版本: {browser.version}") logger.info(f"上下文数量: {len(browser.contexts)}") # 3. 获取页面 logger.info("") logger.info("=" * 60) logger.info("步骤 3: 获取浏览器页面") logger.info("=" * 60) page = client.get_page(browser) if not page: logger.error("获取页面失败") return False logger.info(f"页面 URL: {page.url}") # 3.5. 关闭其他标签页,只保留AdsPower启动页 logger.info("") logger.info("=" * 60) logger.info("步骤 3.5: 清理多余标签页") logger.info("=" * 60) context = browser.contexts[0] all_pages = context.pages logger.info(f"当前打开的标签页数: {len(all_pages)}") # 遍历所有页面,关闭非 AdsPower 启动页 closed_count = 0 for p in all_pages: try: page_url = p.url # 保留 AdsPower 启动页 if 'start.adspower.net' in page_url: logger.info(f"保留启动页: {page_url}") else: logger.info(f"关闭标签页: {page_url}") p.close() closed_count += 1 except Exception as e: logger.warning(f"关闭页面失败: {str(e)}") logger.info(f"已关闭 {closed_count} 个标签页") # 重新获取当前页面列表 remaining_pages = context.pages logger.info(f"剩余标签页数: {len(remaining_pages)}") # 如果所有页面都被关闭了,创建一个新页面 if len(remaining_pages) == 0: logger.info("所有页面已关闭,创建新标签页") page = context.new_page() else: # 使用第一个剩余页面 page = remaining_pages[0] logger.info(f"使用剩余页面: {page.url}") # 4. 测试页面操作 logger.info("") logger.info("=" * 60) logger.info("步骤 4: 测试页面操作") logger.info("=" * 60) # 访问配置的网页 logger.info(f"访问测试页面: {TEST_URL}") page.goto(TEST_URL, wait_until='domcontentloaded', timeout=60000) # 等待页面完全加载 logger.info("等待页面完全加载...") import time time.sleep(3) try: page.wait_for_load_state('networkidle', timeout=10000) except Exception: logger.warning("网络空闲超时,继续执行") time.sleep(2) # 获取页面标题 title = page.title() logger.info(f"页面标题: {title}") # 获取页面 URL current_url = page.url logger.info(f"当前 URL: {current_url}") # 截图测试(点击前) screenshot_path = test_folder / "01_before_click.png" page.screenshot(path=str(screenshot_path)) logger.info(f"截图已保存: {screenshot_path}") # 查找并点击广告 logger.info("") logger.info("-" * 60) logger.info("开始查找广告元素...") try: # 查找所有广告元素 ad_selector = 'span.ec-tuiguang.ecfc-tuiguang.xz81bbe' ad_elements = page.locator(ad_selector) ad_count = ad_elements.count() logger.info(f"找到 {ad_count} 个广告元素") if ad_count > 0: # 点击第一个广告 logger.info("准备点击第一个广告...") # 滚动到元素可见 first_ad = ad_elements.first first_ad.scroll_into_view_if_needed() time.sleep(1) # 记录点击前的URL old_url = page.url logger.info(f"点击前URL: {old_url}") # 点击广告(页面内跳转) first_ad.click() logger.info("✅ 已点击第一个广告") # ============ 记录点击到数据库 ============ click_mgr = ClickManager() click_id = click_mgr.record_click( site_id=site_id, site_url=TEST_URL, user_ip=None, device_type='pc' ) logger.info(f"✅ 已记录点击: click_id={click_id}") # ============================================ # 等待页面跳转 time.sleep(3) page.wait_for_load_state('domcontentloaded') # 获取跳转后的URL new_url = page.url new_title = page.title() logger.info(f"跳转后URL: {new_url}") logger.info(f"跳转后标题: {new_title}") # 截图(跳转后) screenshot_path_after = test_folder / "02_after_click.png" page.screenshot(path=str(screenshot_path_after)) logger.info(f"跳转后截图已保存: {screenshot_path_after}") # ============ 新增:发送咨询消息 ============ logger.info("") logger.info("-" * 60) logger.info("开始发送咨询消息...") # 预设消息列表 consultation_messages = [ "我想要预约一个医生,有什么推荐吗?", "我现在本人不在当地,医生什么时候有空,是随时能去吗?有没有推荐的医生。", "咱们医院是周六日是否上班,随时去吗?", "想找医生看看,有没有推荐的医生", "最近很不舒服,也说不出来全部的症状,能不能直接对话医生?" ] # 随机选择一条消息 import random message = random.choice(consultation_messages) logger.info(f"选择的消息: {message}") # 等待输入框加载 time.sleep(2) # 滚动到页面底部,确保输入框可见 page.evaluate("window.scrollTo(0, document.body.scrollHeight)") time.sleep(1) # 输出页面HTML用于调试 logger.info("正在分析页面结构...") html_content = page.content() html_file = test_folder / "page_html.txt" with open(html_file, 'w', encoding='utf-8') as f: f.write(html_content) logger.info(f"页面HTML已保存: {html_file}") # 尝试查找输入框(通用策略) input_selectors = [ # contenteditable 类型 "textarea[contenteditable='true']", "div[contenteditable='true']", "*[contenteditable='true']", # 直接查找textarea "textarea", # 常见的 textarea "textarea[placeholder]", "textarea.input", "textarea[class*='input']", "textarea[class*='text']", "textarea[class*='box']", "textarea[class*='chat']", "textarea[class*='message']", # 常见的 input "input[type='text'][placeholder]", "input[class*='input']", "input[class*='text']", "input[class*='chat']", "input[class*='message']", # 全局备选 "input[type='text']" ] input_found = False for selector in input_selectors: try: logger.debug(f"尝试选择器: {selector}") count = page.locator(selector).count() logger.debug(f" 找到 {count} 个匹配元素") if count > 0: # 遍历所有匹配的元素,找第一个可见的 for i in range(count): try: input_elem = page.locator(selector).nth(i) is_visible = input_elem.is_visible(timeout=1000) logger.debug(f" 元素 {i}: 可见={is_visible}") if is_visible: logger.info(f"✅ 找到可见输入框: {selector} (第{i}个)") # 滚动到输入框可见区域 input_elem.scroll_into_view_if_needed() time.sleep(0.5) # 点击输入框获取焦点 input_elem.click() time.sleep(0.5) # 输入消息 input_elem.fill(message) logger.info("✅ 已输入消息") time.sleep(1) # 保存已发送的消息 sent_message = message input_found = True break except Exception as e2: logger.debug(f" 元素 {i} 失败: {str(e2)}") continue if input_found: break except Exception as e: logger.debug(f" 失败: {str(e)}") continue if not input_found: logger.warning("⚠️ 未找到输入框") debug_screenshot = test_folder / "debug_no_input.png" page.screenshot(path=str(debug_screenshot)) logger.info(f"已保存调试截图: {debug_screenshot}") # 兔底方案:点击页面底部上方位置,然后输入 logger.info("尝试兔底方案:点击页面底部区域...") try: # 获取页面高度 viewport_height = page.viewport_size['height'] # 点击底部上方10px的位置(水平居中) click_x = page.viewport_size['width'] // 2 click_y = viewport_height - 10 logger.debug(f"点击位置: ({click_x}, {click_y})") page.mouse.click(click_x, click_y) time.sleep(1) # 直接输入文本 page.keyboard.type(message, delay=50) logger.info("✅ 已输入消息(兔底方案)") time.sleep(1) sent_message = message input_found = True except Exception as e: logger.error(f"兔底方案失败: {str(e)}") else: # 尝试发送消息 message_sent = False # 方法1: 先尝试按 Enter 键 logger.info("尝试按 Enter 键发送...") try: page.keyboard.press('Enter') logger.info("✅ 已按 Enter 键") time.sleep(2) message_sent = True except Exception as e: logger.warning(f"按 Enter 键失败: {str(e)}") # 方法2: 如果 Enter 键失败,查找并点击发送按钮 if not message_sent: send_button_selectors = [ # 按文本查找 "button:has-text('发送')", "a:has-text('发送')", "span:has-text('发送')", "div:has-text('发送')", # 按类名查找 "button[class*='send']", "button[class*='submit']", "a[class*='send']", "div[class*='send']", "span[class*='send']", # 按类型查找 "button[type='submit']", # 全局备选 "button" ] for selector in send_button_selectors: try: send_btn = page.locator(selector).first if send_btn.is_visible() and send_btn.is_enabled(): send_btn.click() logger.info(f"✅ 已点击发送按钮: {selector}") message_sent = True break except Exception: continue if message_sent: logger.info("✅✅✅ 消息发送成功!") time.sleep(2) # ============ 记录互动到数据库 ============ interaction_mgr = InteractionManager() interaction_id = interaction_mgr.record_interaction( site_id=site_id, click_id=click_id, interaction_type='message', # 修复:使用 'message' 而非 'consultation' reply_content=sent_message, is_successful=True, response_received=False, # 后续可以添加检测逻辑 response_content=None ) logger.info(f"✅ 已记录互动: interaction_id={interaction_id}") # ============================================ # 截图(发送后) screenshot_path_sent = test_folder / "03_after_send.png" page.screenshot(path=str(screenshot_path_sent)) logger.info(f"发送后截图已保存: {screenshot_path_sent}") else: logger.warning("⚠️ 未能发送消息") # ============================================ else: logger.warning("⚠ 未找到广告元素") except Exception as e: logger.error(f"查找/点击广告或发送消息失败: {str(e)}") import traceback traceback.print_exc() # 保存错误时的截图 try: error_screenshot = test_folder / "error.png" page.screenshot(path=str(error_screenshot)) logger.info(f"错误截图已保存: {error_screenshot}") except: pass # 5. 清理资源 logger.info("") logger.info("=" * 60) logger.info("步骤 5: 测试完成 - 查询数据库记录") logger.info("=" * 60) # ============ 查询数据库记录 ============ if site_id: logger.info(f"\n站点ID: {site_id}") logger.info("-" * 60) # 查询点击记录 click_mgr = ClickManager() clicks = click_mgr.get_clicks_by_site(site_id, limit=5) click_count = click_mgr.get_click_count_by_site(site_id) logger.info(f"总点击次数: {click_count}") if clicks: last_click = clicks[0] logger.info(f"最新点击时间: {last_click['click_time']}") # 查询互动记录 interaction_mgr = InteractionManager() interactions = interaction_mgr.get_interactions_by_site(site_id, limit=5) success_count = interaction_mgr.get_successful_interactions_count(site_id) logger.info(f"成功互动次数: {success_count}") if interactions: last_interaction = interactions[0] logger.info(f"最新互动内容: {last_interaction['reply_content']}") logger.info(f"是否收到回复: {'是' if last_interaction['response_received'] else '否'}") # ============================================ logger.info("") # 关闭浏览器前,截图聊天页面最终状态 try: logger.info("截图聊天页面...") # 等待可能的回复消息加载 time.sleep(2) # 滚动到页面顶部,确保看到完整对话 page.evaluate("window.scrollTo(0, 0)") time.sleep(0.5) # 截图整个页面 screenshot_path_final = test_folder / "04_final_chat.png" page.screenshot(path=str(screenshot_path_final), full_page=True) logger.info(f"✅ 聊天页面截图已保存: {screenshot_path_final}") except Exception as screenshot_err: logger.warning(f"截图失败: {str(screenshot_err)}") logger.info("") # 优雅关闭 Playwright 连接,避免 CancelledError try: if browser: logger.debug("关闭 Playwright 浏览器连接...") browser.close() time.sleep(0.5) except Exception as close_err: logger.debug(f"关闭浏览器连接异常: {str(close_err)}") # 根据配置决定是否关闭浏览器 if Config.AUTO_CLOSE_BROWSER: logger.info("正在关闭浏览器...") try: client.stop_browser(user_id=profile_id) logger.info("✅ 浏览器已关闭") except Exception as e: logger.warning(f"关闭浏览器失败: {str(e)}") else: logger.info("浏览器保持运行状态,可继续手动操作") logger.info("如需停止浏览器,请在AdsPower中手动关闭") logger.info("") logger.info("="*60) logger.info("测试完成!") if Config.AUTO_CLOSE_BROWSER: logger.info("浏览器已关闭") else: logger.info("浏览器未关闭") logger.info("="*60) return True except Exception as e: logger.error(f"测试异常: {str(e)}") return False finally: # 注意:不自动清理资源,保持浏览器运行 pass def test_multiple_pages(): """测试多页面操作""" client = AdsPowerClient() profile_id = None try: logger.info("=" * 60) logger.info("测试多页面操作") logger.info("=" * 60) # 根据环境查询 Profile group_id = client.get_group_by_env() result = client.list_profiles(group_id=group_id) if not result: return False profiles = result.get('data', {}).get('list', []) if not profiles: logger.error("没有可用的 Profile") return False profile_id = profiles[0].get('profile_id') # 启动并连接 browser_info = client.start_browser(user_id=profile_id) if not browser_info: return False browser = client.connect_browser(browser_info) if not browser: return False # 获取第一个页面 page1 = client.get_page(browser) logger.info("访问百度...") page1.goto("https://www.baidu.com") logger.info(f"页面1标题: {page1.title()}") # 创建新页面 context = browser.contexts[0] page2 = context.new_page() logger.info("访问必应...") page2.goto("https://www.bing.com") logger.info(f"页面2标题: {page2.title()}") logger.info(f"当前打开的页面数: {len(context.pages)}") # 关闭页面 page2.close() logger.info("已关闭页面2") return True except Exception as e: logger.error(f"测试异常: {str(e)}") return False finally: try: if profile_id: client.stop_browser(user_id=profile_id) else: client.stop_browser() except: pass if __name__ == "__main__": logger.info("开始测试 AdsPower + Playwright CDP 集成") logger.info("") logger.info(f"当前环境: {Config.ENV}") logger.info(f"AdsPower API: {Config.ADSPOWER_API_URL}") logger.info("") # 创建客户端 client = AdsPowerClient() # ==================== 配置区 ==================== # 默认使用代理,如不需要改为 False use_proxy = True # 默认使用 API v2 Direct 方式(0=v2 proxy_id, 1=v1, 2=v2 direct) use_api_v1 = True # True=API v1, False=API v2 # ===================================================== proxy_info = None # 如果使用代理,提前获取 if use_proxy: logger.info("") logger.info(f"使用代理模式: {'API v1 (直接传入proxy_config)' if use_api_v1 else 'API v2 (使用proxy_id引用)'}") logger.info("步骤 0: 提前获取大麦IP代理") proxy_info = client.get_damai_proxy() if not proxy_info: logger.error("获取代理失败,终止测试") sys.exit(1) logger.info(f"代理地址: {proxy_info['host']}:{proxy_info['port']}") logger.info("") # 测试基本连接 if test_adspower_connection(use_proxy=use_proxy, proxy_info=proxy_info, use_api_v1=use_api_v1): logger.info("\n基本连接测试通过\n") else: logger.error("\n基本连接测试失败\n") sys.exit(1) # 测试多页面操作 # if test_multiple_pages(): # logger.info("\n多页面操作测试通过\n") # else: # logger.error("\n多页面操作测试失败\n")