Files
ai_mip/ad_automation.py
2026-01-16 22:06:46 +08:00

577 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple

from loguru import logger
from playwright.sync_api import ElementHandle, Locator, Page

from config import Config
class MIPAdAutomation:
    """Automate commercial-ad interaction on MIP pages.

    The workflow driven by :meth:`check_and_click_ad` is: open a MIP page,
    look for a commercial ad, click it, send one canned consultation message
    in the page the ad navigates to, poll for a reply, and record the click
    and the interaction through ``db_manager``.
    """

    # Canned consultation messages; one is chosen at random per conversation.
    CONSULTATION_MESSAGES = [
        "我想要预约一个医生,有什么推荐吗?",
        "我现在本人不在当地,医生什么时候有空,是随时能去吗?有没有推荐的医生。",
        "咱们医院是周六日是否上班,随时去吗?",
        "想找医生看看,有没有推荐的医生",
        "最近很不舒服,也说不出来全部的症状,能不能直接对话医生?"
    ]

    def __init__(self, page: Page, task_index: Optional[int] = None):
        """Bind the automation to a Playwright page.

        Args:
            page: Page used for every navigation and interaction.
            task_index: When given, a ``./test/task_<index>_<timestamp>``
                directory is created and used for debug screenshots.
        """
        self.page = page
        self.site_id = None  # ID of the site currently being processed
        self.click_id = None  # ID returned by the last recorded click
        self.sent_message = None  # Text of the last message actually sent
        self.task_folder = None  # Per-task log directory (optional)
        # Compare against None so that task index 0 also gets a directory.
        if task_index is not None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            self.task_folder = Path("./test") / f"task_{task_index}_{timestamp}"
            self.task_folder.mkdir(parents=True, exist_ok=True)
            logger.info(f"任务日志目录: {self.task_folder}")

    def check_and_click_ad(self, url: str, site_id: Optional[int] = None) -> Tuple[bool, bool]:
        """Open ``url``, click a commercial ad if present and start a chat.

        Args:
            url: MIP page link.
            site_id: Site ID used for database records; recording is skipped
                when it is falsy.

        Returns:
            ``(clicked, got_reply)``. Both are ``False`` when the page could
            not be loaded, no ad was found, the click failed, or an
            unexpected exception occurred.
        """
        self.site_id = site_id
        # Reset per-link state so a failed recording on this link can never
        # reuse the click ID or message left over from a previous link.
        self.click_id = None
        self.sent_message = None
        try:
            if not self._open_page(url):
                return False, False
            # Give the page some time to finish rendering.
            time.sleep(3)

            has_ad, ad_element = self._detect_commercial_ad()
            if not has_ad:
                logger.info("未检测到商业广告,跳过该链接")
                return False, False

            logger.info("检测到商业广告,准备点击")
            if not self._click_advertisement(ad_element):
                logger.warning("点击广告失败")
                return False, False

            self._record_click(url)
            message_sent = self._send_consultation_message()
            # The reply check runs even if sending failed (original behavior).
            has_reply = self._wait_for_reply()
            if message_sent:
                self._record_interaction(has_reply)
            return True, has_reply
        except Exception as e:
            logger.error(f"处理链接异常: {e}")
            return False, False
        finally:
            # Always try to close the current tab and return to another one.
            self._close_current_tab()

    def _open_page(self, url: str, max_retries: int = 2) -> bool:
        """Navigate to ``url`` with retries; record a failure when giving up.

        Returns:
            ``True`` once the page reached ``domcontentloaded``.
        """
        for attempt in range(max_retries):
            url_before = self.page.url
            try:
                logger.info(f"访问链接: {url} (第{attempt+1}次尝试)")
                self.page.goto(url, wait_until='domcontentloaded', timeout=30000)
                return True
            except Exception as goto_err:
                if attempt >= max_retries - 1:
                    logger.error(f"访问链接失败: {goto_err}")
                    self._record_click_failure(url, f"访问超时: {goto_err}")
                    return False
                # Reloading only makes sense if the navigation actually
                # committed; otherwise reload() would re-load the *previous*
                # page and it would be mistaken for the target.
                if self.page.url != url_before:
                    logger.warning("访问超时,尝试刷新页面...")
                    try:
                        self.page.reload(wait_until='domcontentloaded', timeout=30000)
                        logger.info("✅ 页面刷新成功")
                        return True
                    except Exception:
                        logger.warning("刷新失败等待2秒后重试...")
                else:
                    logger.warning("页面尚未跳转,等待2秒后重试...")
                time.sleep(2)
        # Only reachable when max_retries is 0.
        self._record_click_failure(url, "页面加载失败")
        return False

    @staticmethod
    def _is_commercial_ad_text(text: str) -> bool:
        """Return True if ``text`` carries the ad marker but is not the AI health assistant."""
        lowered = text.lower()
        return '广告' in lowered and 'ai健康' not in lowered

    def _detect_commercial_ad(self) -> Tuple[bool, Optional[Locator]]:
        """Look for a visible commercial ad on the current page.

        Returns:
            ``(found, element)``; ``element`` is ``None`` when nothing matched.
        """
        try:
            # Wait for the comment area to load.
            time.sleep(2)
            # Candidate XPath selectors; adjust to the real page structure.
            ad_selectors = [
                "//div[contains(@class, 'ad') or contains(@class, 'advertisement')]",
                "//div[contains(text(), '广告')]",
                "//*[contains(text(), '广告')]//ancestor::div[contains(@class, 'card')]",
                "//a[contains(@class, 'ad-link')]",
            ]
            for selector in ad_selectors:
                try:
                    for elem in self.page.locator(f"xpath={selector}").all():
                        if not elem.is_visible():
                            continue
                        # Confirm it is a commercial ad, not the AI health assistant.
                        if self._is_commercial_ad_text(elem.inner_text()):
                            logger.info("检测到商业广告")
                            return True, elem
                except Exception:
                    # A failing selector must not stop the remaining ones.
                    continue
            logger.info("未检测到商业广告")
            return False, None
        except Exception as e:
            logger.error(f"检测广告异常: {e}")
            return False, None

    def _click_advertisement(self, ad_element: Locator) -> bool:
        """Click the ad and wait for the current page to navigate away.

        Args:
            ad_element: Ad element returned by :meth:`_detect_commercial_ad`.

        Returns:
            ``True`` if the page URL changed within the wait window.
        """
        try:
            original_url = self.page.url
            ad_element.scroll_into_view_if_needed()
            time.sleep(1)

            logger.info("点击广告...")
            ad_element.click()
            logger.info("已点击广告")

            # Poll the URL once per second (generous for slow machines).
            logger.info("等待页面跳转...")
            max_wait = 10
            check_interval = 1
            for i in range(max_wait):
                time.sleep(check_interval)
                if self.page.url != original_url:
                    logger.info(f"✅ 页面已导航(耗时{i+1}秒): {original_url} -> {self.page.url}")
                    self.page.wait_for_load_state('domcontentloaded')
                    break
            else:
                # Loop finished without a break: the URL never changed.
                logger.error(f"❌ 页面URL未变化等待{max_wait}秒后),广告点击失败: {self.page.url}")
                return False

            # Give the chat page time to load.
            time.sleep(2)
            return True
        except Exception as e:
            logger.error(f"点击广告异常: {e}")
            return False

    def _send_consultation_message(self) -> bool:
        """Send one random consultation message on the chat page.

        Tries a regular input box first (fill + Enter, then a send button);
        if no input box is visible, falls back to clicking a likely input
        area and typing through the keyboard.

        Returns:
            ``True`` if a send action was performed. On success the text is
            stored in ``self.sent_message``.
        """
        try:
            logger.info("准备发送咨询消息...")
            message = random.choice(self.CONSULTATION_MESSAGES)
            logger.info(f"选择的消息: {message}")
            # Wait for the page to finish loading.
            time.sleep(2)
            logger.info(f"当前页面: {self.page.url}")

            input_element = self._find_visible_input()
            if input_element is None:
                logger.warning("❌ 未找到输入框")
                self._save_debug_screenshot("debug_no_input")
                return self._send_via_fallback(message)

            # Normal flow: focus the input box, then fill in the message.
            input_element.click()
            time.sleep(0.5)
            input_element.fill(message)
            logger.info("✅ 已输入消息")
            time.sleep(1)

            # Prefer Enter; fall back to a send button if pressing fails.
            sent = False
            try:
                logger.info("尝试按回车键发送...")
                input_element.press('Enter')
                logger.info("✅ 已按回车键发送")
                sent = True
                time.sleep(1)
            except Exception as e:
                logger.warning(f"❌ 按回车键失败: {e}")
            if not sent:
                sent = self._click_send_button()

            if not sent:
                logger.warning("❌ 未能发送消息")
                self._save_debug_screenshot("debug_send_failed")
                return False

            logger.info("✅ 消息发送成功")
            self.sent_message = message
            time.sleep(2)  # Let the message go out.
            return True
        except Exception as e:
            # logger.exception also records the traceback.
            logger.exception(f"发送消息异常: {e}")
            return False

    def _find_visible_input(self) -> Optional[Locator]:
        """Return the first visible chat input, or ``None`` if there is none."""
        input_selectors = [
            # First choice: match by placeholder.
            "textarea[placeholder*='消息']",
            "textarea[placeholder*='问题']",
            "input[type='text'][placeholder*='消息']",
            "input[type='text'][placeholder*='问题']",
            "textarea[placeholder*='输入']",
            "textarea[placeholder*='发送']",
            "input[type='text'][placeholder*='输入']",
            "input[type='text'][placeholder*='发送']",
            # Second choice: match by class.
            "textarea[class*='input']",
            # Last resort: generic selectors.
            "div[contenteditable='true']",
            "textarea",
            "input[type='text']"
        ]
        logger.info("开始查找输入框...")
        for selector in input_selectors:
            try:
                elements = self.page.locator(selector).all()
                logger.debug(f"选择器 {selector} 找到 {len(elements)} 个元素")
                for elem in elements:
                    if elem.is_visible():
                        logger.info(f"✅ 找到可见输入框: {selector}")
                        return elem
            except Exception as e:
                logger.debug(f"选择器 {selector} 失败: {e}")
        return None

    def _save_debug_screenshot(self, stem: str) -> None:
        """Best-effort screenshot into the task folder, or ``./logs`` without one."""
        try:
            if self.task_folder:
                screenshot_path = self.task_folder / f"{stem}.png"
            else:
                screenshot_path = Path(f"./logs/{stem}_{int(time.time())}.png")
            self.page.screenshot(path=str(screenshot_path))
            logger.info(f"已保存调试截图: {screenshot_path}")
        except Exception as e:
            logger.warning(f"截图失败: {e}")

    def _send_via_fallback(self, message: str) -> bool:
        """Click the first visible input-like area, then type and press Enter."""
        logger.warning("尝试兜底方案:查找所有可能的输入区域...")
        try:
            # Scroll to the bottom of the page first.
            self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            time.sleep(1)
            fallback_selectors = [
                "textarea",
                "input[type='text']",
                "div[contenteditable='true']",
                "div[class*='input']",
                "div[class*='textarea']",
                "div[class*='message']",
                "div[class*='chat']",
                "div[id*='input']",
                "div[id*='message']"
            ]
            clicked = False
            for selector in fallback_selectors:
                try:
                    elements = self.page.locator(selector).all()
                    logger.debug(f"兜底选择器 {selector} 找到 {len(elements)} 个元素")
                    for elem in elements:
                        if elem.is_visible():
                            elem.scroll_into_view_if_needed()
                            time.sleep(0.5)
                            elem.click()
                            time.sleep(1)
                            logger.info(f"已点击元素: {selector}")
                            clicked = True
                            break
                    if clicked:
                        break
                except Exception as e:
                    logger.debug(f"兜底选择器 {selector} 失败: {e}")
                    continue

            if not clicked:
                logger.error("❌ 兜底方案未找到任何可点击的输入区域")
                return False

            # Type into whatever received focus, then submit with Enter.
            self.page.keyboard.type(message, delay=50)
            logger.info("✅ 已输入消息(兜底)")
            self.page.keyboard.press('Enter')
            logger.info("✅ 已按回车键发送(兜底)")
            self.sent_message = message
            time.sleep(2)
            return True
        except Exception as fallback_err:
            logger.error(f"兜底方案失败: {fallback_err}")
            return False

    def _click_send_button(self) -> bool:
        """Click the first visible, enabled send button; return whether one was clicked."""
        send_button_selectors = [
            "button:has-text('发送')",
            "button[class*='send']",
            "button[type='submit']",
            "div[class*='send']",
            "span:has-text('发送')"
        ]
        logger.info("开始查找发送按钮...")
        for selector in send_button_selectors:
            try:
                buttons = self.page.locator(selector).all()
                logger.debug(f"选择器 {selector} 找到 {len(buttons)} 个按钮")
                for btn in buttons:
                    if btn.is_visible() and btn.is_enabled():
                        btn.click()
                        logger.info(f"✅ 已点击发送按钮: {selector}")
                        return True
            except Exception as e:
                logger.debug(f"选择器 {selector} 失败: {e}")
        return False

    def _record_click(self, site_url: str):
        """Record the click in the database and keep the returned ID in ``self.click_id``."""
        try:
            if not self.site_id:
                logger.warning("未设置 site_id跳过点击记录")
                return
            from db_manager import ClickManager
            click_mgr = ClickManager()
            self.click_id = click_mgr.record_click(
                site_id=self.site_id,
                site_url=site_url,
                user_ip=None,  # A proxy IP could be supplied here later.
                device_type='pc'
            )
            logger.info(f"已记录点击: click_id={self.click_id}")
        except Exception as e:
            logger.error(f"记录点击失败: {e}")

    def _record_click_failure(self, site_url: str, error_message: str):
        """Record a failed attempt: a click row plus an unsuccessful interaction.

        Args:
            site_url: Site URL.
            error_message: Error text stored with the interaction.
        """
        try:
            if not self.site_id:
                logger.warning("未设置 site_id跳过失败记录")
                return
            from db_manager import ClickManager, InteractionManager
            # A failed attempt is still counted as a click.
            self.click_id = ClickManager().record_click(
                site_id=self.site_id,
                site_url=site_url,
                user_ip=None,
                device_type='pc'
            )
            InteractionManager().record_interaction(
                site_id=self.site_id,
                click_id=self.click_id,
                interaction_type='reply',
                reply_content=None,
                is_successful=False,
                response_received=False,
                error_message=error_message
            )
            logger.info(f"已记录失败: {error_message}")
        except Exception as e:
            logger.error(f"记录失败异常: {e}")

    def _record_interaction(self, response_received: bool):
        """Record the sent message and whether a reply arrived."""
        try:
            if not self.site_id:
                logger.warning("未设置 site_id跳过互动记录")
                return
            from db_manager import InteractionManager
            interaction_mgr = InteractionManager()
            interaction_id = interaction_mgr.record_interaction(
                site_id=self.site_id,
                click_id=self.click_id,
                # Must be one of the DB ENUM values:
                # reply/comment/message/form_submit/follow/like/share
                interaction_type='message',
                reply_content=self.sent_message,
                is_successful=True,
                response_received=response_received,
                response_content=None  # Reply text extraction could be added later.
            )
            logger.info(f"已记录互动: interaction_id={interaction_id}, response={response_received}")
        except Exception as e:
            logger.error(f"记录互动失败: {e}")

    def _wait_for_reply(self) -> bool:
        """Poll the chat for new messages until ``Config.REPLY_WAIT_TIMEOUT`` seconds pass.

        Returns:
            ``True`` if the message count grew beyond the initial count.
        """
        try:
            logger.info(f"等待广告主回复(最多{Config.REPLY_WAIT_TIMEOUT}秒)")
            time.sleep(2)
            timeout = Config.REPLY_WAIT_TIMEOUT
            # Monotonic clock: elapsed time is unaffected by system clock changes.
            start_time = time.monotonic()
            # NOTE(review): the baseline is taken after the message was sent, so
            # a reply that arrives before this point is not detected.
            initial_msg_count = self._count_messages()
            while time.monotonic() - start_time < timeout:
                time.sleep(2)
                # More messages than at the start is treated as a reply.
                if self._count_messages() > initial_msg_count:
                    logger.info("收到广告主回复")
                    return True
            logger.info("未收到广告主回复(超时)")
            return False
        except Exception as e:
            logger.error(f"等待回复异常: {e}")
            return False

    def _count_messages(self) -> int:
        """Count chat messages using the first selector that matches anything.

        Returns:
            Number of matched elements, or 0 if nothing matched or an error occurred.
        """
        try:
            # Example selectors; adjust to the real page structure.
            message_selectors = [
                "//div[contains(@class, 'message')]",
                "//div[contains(@class, 'chat-message')]",
                "//div[contains(@class, 'msg-item')]",
            ]
            for selector in message_selectors:
                try:
                    messages = self.page.locator(f"xpath={selector}").all()
                    if messages:
                        return len(messages)
                except Exception:
                    continue
            return 0
        except Exception as e:
            logger.error(f"统计消息数量异常: {e}")
            return 0

    def _close_current_tab(self):
        """Close the current tab when others exist and switch to one that is still open."""
        try:
            pages = self.page.context.pages
            if len(pages) > 1:
                self.page.close()
                # `pages` is a snapshot that still contains the page just
                # closed; never keep a reference to a closed page.
                open_pages = [p for p in pages if not p.is_closed()]
                if open_pages:
                    self.page = open_pages[0]
                logger.info("已关闭广告页面")
        except Exception as e:
            logger.error(f"关闭标签页异常: {e}")

    def random_delay(self, min_seconds: float = 2, max_seconds: float = 5):
        """Sleep for a random duration between ``min_seconds`` and ``max_seconds``."""
        delay = random.uniform(min_seconds, max_seconds)
        time.sleep(delay)