import random
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple

from loguru import logger
from playwright.sync_api import ElementHandle, Locator, Page

from config import Config


class MIPAdAutomation:
    """Automates ad interaction on MIP pages.

    For a given URL the workflow is: open the page, look for commercial ads,
    click one, send a canned consultation message in the chat page that
    opens, poll for a reply, and record the outcome through ``db_manager``.
    """

    # Canned consultation messages; one is picked at random per visit.
    # NOTE(review): "区生" in the 4th message looks like a typo for "医生";
    # left untouched because it is runtime text - confirm before changing.
    CONSULTATION_MESSAGES = [
        "我想要预约一个医生,有什么推荐吗?",
        "我现在本人不在当地,医生什么时候有空,是随时能去吗?有没有推荐的医生。",
        "咱们医院是周六日是否上班,随时去吗?",
        "想找医生看看,有没有推荐的区生",
        "最近很不舒服,也说不出来全部的症状,能不能直接对话医生?"
    ]

    def __init__(self, page: Page, task_index: Optional[int] = None):
        """Bind the automation to a page and optionally create a task folder.

        Args:
            page: Playwright page used for all browser operations.
            task_index: When given (including ``0``), a timestamped folder
                ``./test/task_<index>_<timestamp>`` is created for debug
                artifacts such as screenshots.
        """
        self.page = page
        self.site_id = None       # site id of the link being processed
        self.click_id = None      # id returned by the last recorded click
        self.task_folder = None   # per-task directory for debug artifacts
        self.sent_message = None  # text of the last message actually sent

        # ``is not None`` rather than truthiness so that task index 0 also
        # gets its own folder.
        if task_index is not None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            self.task_folder = Path("./test") / f"task_{task_index}_{timestamp}"
            self.task_folder.mkdir(parents=True, exist_ok=True)
            logger.info(f"任务日志目录: {self.task_folder}")

    def check_and_click_ad(self, url: str, site_id: Optional[int] = None) -> Tuple[bool, bool]:
        """Open ``url``, click a commercial ad and try to start a chat.

        Args:
            url: MIP page link.
            site_id: Site id used for database records; when falsy the
                record helpers skip writing.

        Returns:
            ``(clicked, got_reply)``: whether an ad click succeeded and
            whether a reply was detected afterwards.
        """
        self.site_id = site_id
        # Reset per-link state so a failure on this link can never be
        # recorded with the click id / message of a previous link.
        self.click_id = None
        self.sent_message = None

        try:
            if not self._open_url(url):
                return False, False

            # Give the page time to finish rendering.
            time.sleep(3)

            has_ad, ad_elements = self._detect_commercial_ad()
            if not has_ad:
                logger.info("未检测到商业广告,跳过该链接")
                self._record_click_failure(url, "未检测到商业广告")
                return False, False

            # Try the ads one by one until a click succeeds.
            logger.info(f"检测到商业广告,准备点击(共 {len(ad_elements)} 个)")
            click_success = False
            for idx, ad_element in enumerate(ad_elements, 1):
                logger.info(f"尝试点击第 {idx}/{len(ad_elements)} 个广告...")
                if self._click_advertisement(ad_element):
                    logger.info(f"✅ 第 {idx} 个广告点击成功")
                    click_success = True
                    break
                logger.warning(f"❌ 第 {idx} 个广告点击失败,尝试下一个...")
                # Short pause before trying the next ad.
                time.sleep(1)

            if not click_success:
                logger.warning("所有广告均点击失败")
                self._record_click_failure(url, f"所有广告({len(ad_elements)}个)均点击失败")
                return False, False

            self._record_click(url)

            message_sent = self._send_consultation_message()

            # The reply wait runs even when sending failed, as before.
            has_reply = self._wait_for_reply()

            if message_sent:
                self._record_interaction(has_reply)

            return True, has_reply

        except Exception as e:
            logger.error(f"处理链接异常: {str(e)}")
            # Best effort: recording the failure must not mask the result.
            try:
                self._record_click_failure(url, f"异常: {str(e)}")
            except Exception:
                pass
            return False, False

        finally:
            # Always try to close the current tab and fall back to another one.
            self._close_current_tab()

    def _open_url(self, url: str, max_retries: int = 2) -> bool:
        """Navigate to ``url``, retrying with a reload; record failures.

        Returns:
            True when the page loaded (via ``goto`` or ``reload``), False
            otherwise (the failure has already been recorded).
        """
        for attempt in range(max_retries):
            try:
                logger.info(f"访问链接: {url} (第{attempt+1}次尝试)")
                self.page.goto(url, wait_until='domcontentloaded', timeout=30000)
                return True
            except Exception as goto_err:
                if attempt >= max_retries - 1:
                    # Last attempt failed: give up and record it.
                    logger.error(f"访问链接失败: {str(goto_err)}")
                    self._record_click_failure(url, f"访问超时: {str(goto_err)}")
                    return False

                logger.warning("访问超时,尝试刷新页面...")
                try:
                    self.page.reload(wait_until='domcontentloaded', timeout=30000)
                    logger.info("✅ 页面刷新成功")
                    return True
                except Exception:
                    logger.warning("刷新失败,等待2秒后重试...")
                    time.sleep(2)

        # Only reachable when max_retries <= 0.
        self._record_click_failure(url, "页面加载失败")
        return False

    def _detect_commercial_ad(self) -> Tuple[bool, List[Locator]]:
        """Look for visible commercial ads on the current page.

        An element counts as a commercial ad when it matches one of the ad
        selectors, is visible, its text contains "广告" and does not contain
        "ai健康" (case-insensitive).

        Returns:
            ``(found, ad_locators)``; ``(False, [])`` on error or no match.
        """
        try:
            # Wait for the lazily loaded section that holds the ads.
            time.sleep(2)

            ad_selectors = [
                "//div[contains(@class, 'ad') or contains(@class, 'advertisement')]",
                "//div[contains(text(), '广告')]",
                "//*[contains(text(), '广告')]//ancestor::div[contains(@class, 'card')]",
                "//a[contains(@class, 'ad-link')]",
            ]

            ad_elements = []
            for selector in ad_selectors:
                try:
                    for elem in self.page.locator(f"xpath={selector}").all():
                        if not elem.is_visible():
                            continue
                        # Keep real commercial ads only (not the AI health
                        # assistant card).
                        try:
                            elem_text = elem.inner_text().lower()
                        except Exception:
                            continue
                        if '广告' in elem_text and 'ai健康' not in elem_text:
                            ad_elements.append(elem)
                except Exception:
                    # A broken selector must not stop the remaining ones.
                    continue

            if ad_elements:
                logger.info(f"检测到 {len(ad_elements)} 个商业广告")
                return True, ad_elements

            logger.info("未检测到商业广告")
            return False, []

        except Exception as e:
            logger.error(f"检测广告异常: {str(e)}")
            return False, []

    def _click_advertisement(self, ad_element: Locator) -> bool:
        """Click an ad and wait for the current page to navigate.

        Args:
            ad_element: Locator of the ad to click.

        Returns:
            True when the page URL changed within 10 seconds and the new
            page loaded (or was reloaded successfully), otherwise False.
        """
        try:
            original_url = self.page.url

            ad_element.scroll_into_view_if_needed()
            time.sleep(1)

            # The ad is expected to navigate in the current page.
            logger.info("点击广告...")
            ad_element.click()
            logger.info("已点击广告")

            logger.info("等待页面跳转...")
            max_wait = 10       # seconds to wait for the URL to change
            check_interval = 1  # polling interval in seconds

            for i in range(max_wait):
                time.sleep(check_interval)
                if self.page.url != original_url:
                    logger.info(f"✅ 页面已导航(耗时{i+1}秒): {original_url} -> {self.page.url}")

                    try:
                        logger.info("等待页面加载完成...")
                        self.page.wait_for_load_state('domcontentloaded', timeout=15000)
                        logger.info("✅ 页面加载完成")
                    except Exception:
                        logger.warning("⚠️ 页面加载超时,尝试刷新页面...")
                        try:
                            self.page.reload(wait_until='domcontentloaded', timeout=15000)
                            logger.info("✅ 页面刷新成功")
                        except Exception as refresh_err:
                            logger.error(f"❌ 页面刷新失败: {str(refresh_err)}")
                            return False
                    break
            else:
                # Loop finished without a break: the URL never changed.
                logger.error(f"❌ 页面URL未变化(等待{max_wait}秒后),广告点击失败: {self.page.url}")
                return False

            # Let the chat page settle.
            time.sleep(2)
            return True

        except Exception as e:
            logger.error(f"点击广告异常: {str(e)}")
            return False

    def _send_consultation_message(self) -> bool:
        """Send a random consultation message on the chat page.

        Looks for a visible input box, fills it and submits with Enter,
        falling back to a send button. When no input box is found, a
        screenshot is saved and a click-and-type fallback is attempted.

        Returns:
            True when the message is believed to have been sent.
        """
        try:
            logger.info("准备发送咨询消息...")

            message = random.choice(self.CONSULTATION_MESSAGES)
            logger.info(f"选择的消息: {message}")

            time.sleep(2)
            logger.info(f"当前页面: {self.page.url}")

            input_element = self._find_input_element()
            if input_element is None:
                logger.warning("❌ 未找到输入框")
                self._save_debug_screenshot("debug_no_input")
                return self._send_via_fallback(message)

            # Normal flow: focus the input box, then fill it.
            input_element.click()
            time.sleep(0.5)

            input_element.fill(message)
            logger.info("✅ 已输入消息")
            time.sleep(1)

            # Prefer Enter; fall back to a send button.
            sent = False
            try:
                logger.info("尝试按回车键发送...")
                input_element.press('Enter')
                logger.info("✅ 已按回车键发送")
                sent = True
                time.sleep(1)
            except Exception as e:
                logger.warning(f"❌ 按回车键失败: {str(e)}")

            if not sent:
                sent = self._click_send_button()

            if sent:
                logger.info("✅ 消息发送成功")
                # Remember what was sent so it can be recorded later.
                self.sent_message = message
                time.sleep(2)  # let the message go out
                return True

            logger.warning("❌ 未能发送消息")
            self._save_debug_screenshot("debug_send_failed")
            return False

        except Exception as e:
            logger.error(f"发送消息异常: {str(e)}")
            import traceback
            traceback.print_exc()
            return False

    def _find_input_element(self) -> Optional[Locator]:
        """Return the first visible chat input box, or None."""
        input_selectors = [
            # Preferred: match by placeholder text.
            "textarea[placeholder*='消息']",
            "textarea[placeholder*='问题']",
            "input[type='text'][placeholder*='消息']",
            "input[type='text'][placeholder*='问题']",
            "textarea[placeholder*='输入']",
            "textarea[placeholder*='发送']",
            "input[type='text'][placeholder*='输入']",
            "input[type='text'][placeholder*='发送']",
            # Next: match by class name.
            "textarea[class*='input']",
            # Last resort: generic selectors.
            "div[contenteditable='true']",
            "textarea",
            "input[type='text']"
        ]

        logger.info("开始查找输入框...")
        for selector in input_selectors:
            try:
                elements = self.page.locator(selector).all()
                logger.debug(f"选择器 {selector} 找到 {len(elements)} 个元素")
                for elem in elements:
                    if elem.is_visible():
                        logger.info(f"✅ 找到可见输入框: {selector}")
                        return elem
            except Exception as e:
                logger.debug(f"选择器 {selector} 失败: {str(e)}")
                continue
        return None

    def _send_via_fallback(self, message: str) -> bool:
        """Click any plausible input area, then type and press Enter."""
        logger.warning("尝试兜底方案:查找所有可能的输入区域...")
        try:
            # Chat inputs usually sit at the bottom of the page.
            self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            time.sleep(1)

            fallback_selectors = [
                "textarea",
                "input[type='text']",
                "div[contenteditable='true']",
                "div[class*='input']",
                "div[class*='textarea']",
                "div[class*='message']",
                "div[class*='chat']",
                "div[id*='input']",
                "div[id*='message']"
            ]

            clicked = False
            for selector in fallback_selectors:
                try:
                    elements = self.page.locator(selector).all()
                    logger.debug(f"兜底选择器 {selector} 找到 {len(elements)} 个元素")
                    for elem in elements:
                        if elem.is_visible():
                            elem.scroll_into_view_if_needed()
                            time.sleep(0.5)
                            elem.click()
                            time.sleep(1)
                            logger.info(f"已点击元素: {selector}")
                            clicked = True
                            break
                    if clicked:
                        break
                except Exception as e:
                    logger.debug(f"兜底选择器 {selector} 失败: {str(e)}")
                    continue

            if not clicked:
                logger.error("❌ 兜底方案未找到任何可点击的输入区域")
                return False

            # Type into whatever element received focus from the click.
            self.page.keyboard.type(message, delay=50)
            logger.info("✅ 已输入消息(兜底)")

            self.page.keyboard.press('Enter')
            logger.info("✅ 已按回车键发送(兜底)")

            self.sent_message = message
            time.sleep(2)
            return True

        except Exception as fallback_err:
            logger.error(f"兜底方案失败: {str(fallback_err)}")
            return False

    def _click_send_button(self) -> bool:
        """Click the first visible, enabled send button; True on success."""
        send_button_selectors = [
            "button:has-text('发送')",
            "button[class*='send']",
            "button[type='submit']",
            "div[class*='send']",
            "span:has-text('发送')"
        ]

        logger.info("开始查找发送按钮...")
        for selector in send_button_selectors:
            try:
                buttons = self.page.locator(selector).all()
                logger.debug(f"选择器 {selector} 找到 {len(buttons)} 个按钮")
                for btn in buttons:
                    if btn.is_visible() and btn.is_enabled():
                        btn.click()
                        logger.info(f"✅ 已点击发送按钮: {selector}")
                        return True
            except Exception as e:
                logger.debug(f"选择器 {selector} 失败: {str(e)}")
                continue
        return False

    def _save_debug_screenshot(self, name: str) -> None:
        """Save a screenshot named ``name`` for debugging; never raises.

        The file goes to the task folder when one exists, otherwise to
        ``./logs`` with a timestamp suffix.
        """
        try:
            if self.task_folder:
                screenshot_path = self.task_folder / f"{name}.png"
            else:
                screenshot_path = Path(f"./logs/{name}_{int(time.time())}.png")
            self.page.screenshot(path=str(screenshot_path))
            logger.info(f"已保存调试截图: {screenshot_path}")
        except Exception as e:
            logger.warning(f"截图失败: {str(e)}")

    def _record_click(self, site_url: str):
        """Record a successful click via ``ClickManager``; never raises."""
        try:
            if not self.site_id:
                logger.warning("未设置 site_id,跳过点击记录")
                return

            from db_manager import ClickManager
            click_mgr = ClickManager()

            self.click_id = click_mgr.record_click(
                site_id=self.site_id,
                site_url=site_url,
                user_ip=None,  # proxy IP could be added later
                device_type='pc'
            )
            logger.info(f"已记录点击: click_id={self.click_id}")
        except Exception as e:
            logger.error(f"记录点击失败: {str(e)}")

    def _record_click_failure(self, site_url: str, error_message: str):
        """Record a failed attempt: a click row plus a failed interaction.

        Args:
            site_url: Site URL.
            error_message: Human-readable failure reason.
        """
        try:
            if not self.site_id:
                logger.warning("未设置 site_id,跳过失败记录")
                return

            from db_manager import ClickManager
            click_mgr = ClickManager()

            # Failed attempts are counted as clicks too.
            self.click_id = click_mgr.record_click(
                site_id=self.site_id,
                site_url=site_url,
                user_ip=None,
                device_type='pc'
            )

            from db_manager import InteractionManager
            interaction_mgr = InteractionManager()

            interaction_mgr.record_interaction(
                site_id=self.site_id,
                click_id=self.click_id,
                interaction_type='reply',
                reply_content=None,
                is_successful=False,
                response_received=False,
                error_message=error_message
            )

            logger.info(f"已记录失败: {error_message}")
        except Exception as e:
            logger.error(f"记录失败异常: {str(e)}")

    def _record_interaction(self, response_received: bool):
        """Record the sent message and whether a reply was received."""
        try:
            if not self.site_id:
                logger.warning("未设置 site_id,跳过互动记录")
                return

            from db_manager import InteractionManager
            interaction_mgr = InteractionManager()

            interaction_id = interaction_mgr.record_interaction(
                site_id=self.site_id,
                click_id=self.click_id,
                # Must be one of the DB ENUM values:
                # reply/comment/message/form_submit/follow/like/share
                interaction_type='message',
                reply_content=self.sent_message,
                is_successful=True,
                response_received=response_received,
                response_content=None  # reply text extraction could be added later
            )
            logger.info(f"已记录互动: interaction_id={interaction_id}, response={response_received}")
        except Exception as e:
            logger.error(f"记录互动失败: {str(e)}")

    def _wait_for_reply(self) -> bool:
        """Poll the page until the message count grows or the timeout hits.

        The baseline count is taken after an initial 2 second pause; a
        reply is assumed as soon as the count exceeds that baseline. The
        timeout is ``Config.REPLY_WAIT_TIMEOUT`` seconds.

        Returns:
            True when a new message appeared in time, otherwise False.
        """
        try:
            logger.info(f"等待广告主回复(最多{Config.REPLY_WAIT_TIMEOUT}秒)")

            time.sleep(2)

            timeout = Config.REPLY_WAIT_TIMEOUT
            # Monotonic clock: immune to wall-clock adjustments mid-wait.
            start_time = time.monotonic()

            initial_msg_count = self._count_messages()

            while time.monotonic() - start_time < timeout:
                time.sleep(2)

                current_msg_count = self._count_messages()
                if current_msg_count > initial_msg_count:
                    logger.info("收到广告主回复")
                    return True

            logger.info("未收到广告主回复(超时)")
            return False

        except Exception as e:
            logger.error(f"等待回复异常: {str(e)}")
            return False

    def _count_messages(self) -> int:
        """Count chat messages using the first selector that matches.

        Returns:
            Number of elements matched by the first selector with any
            match, or 0 when nothing matches or an error occurs.
        """
        try:
            # Example selectors; adjust to the real chat page structure.
            message_selectors = [
                "//div[contains(@class, 'message')]",
                "//div[contains(@class, 'chat-message')]",
                "//div[contains(@class, 'msg-item')]",
            ]

            for selector in message_selectors:
                try:
                    messages = self.page.locator(f"xpath={selector}").all()
                    if messages:
                        return len(messages)
                except Exception:
                    continue

            return 0

        except Exception as e:
            logger.error(f"统计消息数量异常: {str(e)}")
            return 0

    def _close_current_tab(self):
        """Close the current tab when others exist and switch to an open one.

        Nothing happens when the context holds a single page.
        """
        try:
            pages = self.page.context.pages
            if len(pages) > 1:
                self.page.close()
                # Pick a page that is still open: the snapshot taken above
                # may list the page that was just closed first.
                remaining = [p for p in pages if not p.is_closed()]
                if remaining:
                    self.page = remaining[0]
                logger.info("已关闭广告页面")
        except Exception as e:
            logger.error(f"关闭标签页异常: {str(e)}")

    def random_delay(self, min_seconds: int = 2, max_seconds: int = 5):
        """Sleep for a random duration in ``[min_seconds, max_seconds]``."""
        delay = random.uniform(min_seconds, max_seconds)
        time.sleep(delay)