1941 lines
85 KiB
Python
1941 lines
85 KiB
Python
import time
|
||
import random
|
||
import json
|
||
import re
|
||
import requests
|
||
from typing import Optional, Tuple, List, Dict
|
||
from playwright.sync_api import Page, ElementHandle, Response
|
||
from loguru import logger
|
||
from config import Config
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
|
||
|
||
class MIPAdAutomation:
|
||
"""MIP页面广告自动化操作"""
|
||
|
||
# 预设的咨询语句
|
||
CONSULTATION_MESSAGES = [
|
||
"我想要预约一个医生,有什么推荐吗?",
|
||
"我现在本人不在当地,医生什么时候有空,是随时能去吗?有没有推荐的医生。",
|
||
"咱们医院是周六日是否上班,随时去吗?",
|
||
"想找医生看看,有没有推荐的区生",
|
||
"最近很不舒服,也说不出来全部的症状,能不能直接对话医生?"
|
||
]
|
||
|
||
# Ada平台API端点
|
||
ADA_HEARTBEAT_API = 'ada.baidu.com/gateway/message/heartbeat'
|
||
ADA_RECOMMEND_API = 'ada.baidu.com/imlp-extend/agent/getRecommendContent'
|
||
|
||
def __init__(self, page: Page, task_index: int = None):
|
||
self.page = page
|
||
self.site_id = None # 当前站点ID
|
||
self.click_id = None # 当前点击ID
|
||
self.task_folder = None # 任务日志目录
|
||
|
||
# 医生回复相关
|
||
self.doctor_replies: List[Dict] = [] # 存储医生回复
|
||
self.recommend_replies: List[Dict] = [] # 存储推荐回复
|
||
self._response_listener_active = False # 响应监听器状态
|
||
|
||
# 聊天历史(用于AI对话上下文)
|
||
self.chat_history: List[Dict] = []
|
||
|
||
# 浮窗状态
|
||
self._overlay_injected = False
|
||
self.task_index = task_index
|
||
|
||
# 创建任务日志目录
|
||
if task_index:
|
||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||
self.task_folder = Path("./test") / f"task_{task_index}_{timestamp}"
|
||
self.task_folder.mkdir(parents=True, exist_ok=True)
|
||
logger.info(f"任务日志目录: {self.task_folder}")
|
||
|
||
def _inject_overlay(self):
|
||
"""注入浮窗到页面"""
|
||
if self._overlay_injected:
|
||
return
|
||
|
||
try:
|
||
js_code = """
|
||
(function() {
|
||
// 检查是否已存在
|
||
if (document.getElementById('mip-progress-overlay')) return;
|
||
|
||
// 创建浮窗容器
|
||
var overlay = document.createElement('div');
|
||
overlay.id = 'mip-progress-overlay';
|
||
overlay.style.cssText = `
|
||
position: fixed;
|
||
top: 10px;
|
||
right: 10px;
|
||
width: 280px;
|
||
background: rgba(0, 0, 0, 0.85);
|
||
color: #fff;
|
||
padding: 12px 15px;
|
||
border-radius: 8px;
|
||
font-family: 'Microsoft YaHei', sans-serif;
|
||
font-size: 13px;
|
||
z-index: 999999;
|
||
box-shadow: 0 4px 12px rgba(0,0,0,0.3);
|
||
line-height: 1.6;
|
||
`;
|
||
|
||
// 标题
|
||
var title = document.createElement('div');
|
||
title.style.cssText = 'font-weight: bold; font-size: 14px; margin-bottom: 8px; color: #4CAF50; border-bottom: 1px solid #444; padding-bottom: 6px;';
|
||
title.innerHTML = '🤖 自动化进度';
|
||
overlay.appendChild(title);
|
||
|
||
// 状态内容
|
||
var content = document.createElement('div');
|
||
content.id = 'mip-progress-content';
|
||
content.innerHTML = '初始化中...';
|
||
overlay.appendChild(content);
|
||
|
||
document.body.appendChild(overlay);
|
||
})();
|
||
"""
|
||
self.page.evaluate(js_code)
|
||
self._overlay_injected = True
|
||
logger.debug("浮窗已注入页面")
|
||
except Exception as e:
|
||
logger.debug(f"注入浮窗失败: {str(e)}")
|
||
|
||
def _update_overlay(self, status: str, details: str = ""):
|
||
"""
|
||
更新浮窗显示内容
|
||
|
||
Args:
|
||
status: 当前状态
|
||
details: 详细信息
|
||
"""
|
||
try:
|
||
self._inject_overlay()
|
||
|
||
task_info = f"任务 #{self.task_index}" if self.task_index else "任务"
|
||
doctor_count = len(self.doctor_replies)
|
||
recommend_count = len(self.recommend_replies)
|
||
|
||
html = f"""
|
||
<div style='margin-bottom: 6px;'><b>{task_info}</b></div>
|
||
<div style='color: #4CAF50;'>📍 {status}</div>
|
||
<div style='color: #aaa; font-size: 12px; margin-top: 4px;'>{details}</div>
|
||
<div style='margin-top: 8px; padding-top: 6px; border-top: 1px solid #444; font-size: 11px; color: #888;'>
|
||
医生消息: {doctor_count} | 推荐回复: {recommend_count}
|
||
</div>
|
||
"""
|
||
|
||
js_code = f"""
|
||
(function() {{
|
||
var content = document.getElementById('mip-progress-content');
|
||
if (content) {{
|
||
content.innerHTML = `{html}`;
|
||
}}
|
||
}})();
|
||
"""
|
||
self.page.evaluate(js_code)
|
||
except Exception as e:
|
||
logger.debug(f"更新浮窗失败: {str(e)}")
|
||
|
||
def _remove_overlay(self):
|
||
"""移除浮窗"""
|
||
try:
|
||
js_code = """
|
||
(function() {
|
||
var overlay = document.getElementById('mip-progress-overlay');
|
||
if (overlay) overlay.remove();
|
||
})();
|
||
"""
|
||
self.page.evaluate(js_code)
|
||
self._overlay_injected = False
|
||
except:
|
||
pass
|
||
|
||
def _human_click(self, element, description: str = ""):
|
||
"""
|
||
模拟真人点击(随机偏移、随机延迟)
|
||
|
||
Args:
|
||
element: 要点击的元素
|
||
description: 描述信息
|
||
"""
|
||
try:
|
||
box = element.bounding_box()
|
||
if not box:
|
||
element.click()
|
||
return
|
||
|
||
# 随机偏移(不要点在正中心)
|
||
offset_x = random.uniform(-box['width'] * 0.3, box['width'] * 0.3)
|
||
offset_y = random.uniform(-box['height'] * 0.3, box['height'] * 0.3)
|
||
|
||
click_x = box['x'] + box['width'] / 2 + offset_x
|
||
click_y = box['y'] + box['height'] / 2 + offset_y
|
||
|
||
# 先移动鼠标到目标位置(模拟真人移动)
|
||
self.page.mouse.move(click_x, click_y, steps=random.randint(5, 15))
|
||
|
||
# 随机延迟后点击
|
||
time.sleep(random.uniform(0.1, 0.3))
|
||
self.page.mouse.click(click_x, click_y)
|
||
|
||
if description:
|
||
logger.debug(f"真人点击: {description} ({click_x:.0f}, {click_y:.0f})")
|
||
|
||
except Exception as e:
|
||
logger.debug(f"真人点击失败,使用普通点击: {str(e)}")
|
||
element.click()
|
||
|
||
def _human_type(self, message: str):
|
||
"""
|
||
模拟真人输入(逐字符输入,随机延迟)
|
||
|
||
Args:
|
||
message: 要输入的消息
|
||
"""
|
||
try:
|
||
for char in message:
|
||
# 随机输入延迟(50-200ms)
|
||
delay = random.uniform(0.05, 0.2)
|
||
self.page.keyboard.type(char, delay=0)
|
||
time.sleep(delay)
|
||
|
||
# 偶尔停顿一下(模拟思考)
|
||
if random.random() < 0.05:
|
||
time.sleep(random.uniform(0.3, 0.8))
|
||
|
||
except Exception as e:
|
||
logger.debug(f"真人输入失败,使用普通输入: {str(e)}")
|
||
self.page.keyboard.type(message, delay=30)
|
||
|
||
def _clean_message_content(self, content: str) -> Optional[str]:
|
||
"""
|
||
清理消息内容,过滤HTML标签和JSON命令
|
||
|
||
Args:
|
||
content: 原始消息内容
|
||
|
||
Returns:
|
||
清理后的内容,如果是无效消息返回None
|
||
"""
|
||
if not content:
|
||
return None
|
||
|
||
# 过滤JSON命令消息
|
||
if content.strip().startswith('{') and '"type":"cmd"' in content:
|
||
return None
|
||
|
||
# 移除HTML标签
|
||
clean_content = re.sub(r'<[^>]+>', '', content)
|
||
|
||
# 去除多余空白
|
||
clean_content = clean_content.strip()
|
||
|
||
# 如果清理后为空,返回None
|
||
if not clean_content:
|
||
return None
|
||
|
||
return clean_content
|
||
|
||
def _setup_response_listener(self):
|
||
"""设置API响应监听器,监听heartbeat和recommend接口"""
|
||
if self._response_listener_active:
|
||
return
|
||
|
||
def handle_response(response: Response):
|
||
try:
|
||
url = response.url
|
||
|
||
# 调试:打印所有 ada.baidu.com 的请求
|
||
if 'ada.baidu.com' in url:
|
||
logger.debug(f"[API请求] {url[:100]}...")
|
||
|
||
# 监听heartbeat API - 获取医生回复
|
||
if self.ADA_HEARTBEAT_API in url and response.status == 200:
|
||
try:
|
||
data = response.json()
|
||
logger.debug(f"[Heartbeat] 响应: {json.dumps(data, ensure_ascii=False)[:500]}")
|
||
|
||
if data.get('status') == 0 and data.get('data', {}).get('talk'):
|
||
talks = data['data']['talk']
|
||
for talk in talks:
|
||
# 检查是否是医生/客服消息
|
||
if talk.get('source') == 'service' or talk.get('msgFrom') == 'service':
|
||
raw_content = talk.get('content', '')
|
||
msg_id = talk.get('messageId', '')
|
||
msg_time = talk.get('messageTime', '')
|
||
|
||
# 清理消息内容(过滤HTML和JSON命令)
|
||
content = self._clean_message_content(raw_content)
|
||
if not content:
|
||
logger.debug(f"[Heartbeat] 跳过无效消息: {raw_content[:50]}...")
|
||
continue
|
||
|
||
# 检查是否是新消息(避免重复)
|
||
if not any(r.get('messageId') == msg_id for r in self.doctor_replies):
|
||
reply_info = {
|
||
'content': content,
|
||
'messageId': msg_id,
|
||
'messageTime': msg_time,
|
||
'source': 'service',
|
||
'received_at': datetime.now().isoformat()
|
||
}
|
||
self.doctor_replies.append(reply_info)
|
||
logger.info(f"[医生回复] 收到新消息: {content[:100]}...")
|
||
except Exception as e:
|
||
logger.debug(f"[Heartbeat] 解析响应失败: {str(e)}")
|
||
|
||
# 监听recommend API - 获取推荐回复
|
||
elif self.ADA_RECOMMEND_API in url and response.status == 200:
|
||
try:
|
||
data = response.json()
|
||
logger.debug(f"[Recommend] 响应: {json.dumps(data, ensure_ascii=False)[:500]}")
|
||
|
||
if data.get('status') == 200 and data.get('data', {}).get('suggestInfo'):
|
||
suggest_info = data['data']['suggestInfo']
|
||
for suggest in suggest_info:
|
||
replies = suggest.get('suggestReply', [])
|
||
for reply in replies:
|
||
reply_id = reply.get('replyId', '')
|
||
# 避免重复
|
||
if not any(r.get('replyId') == reply_id for r in self.recommend_replies):
|
||
self.recommend_replies.append({
|
||
'replyId': reply_id,
|
||
'text': reply.get('text', ''),
|
||
'value': reply.get('value', ''),
|
||
'type': reply.get('type', 'text'),
|
||
'received_at': datetime.now().isoformat()
|
||
})
|
||
if self.recommend_replies:
|
||
logger.info(f"[推荐回复] 收到 {len(replies)} 条推荐: {[r['text'] for r in self.recommend_replies[-len(replies):]]}")
|
||
except Exception as e:
|
||
logger.debug(f"[Recommend] 解析响应失败: {str(e)}")
|
||
|
||
except Exception as e:
|
||
logger.debug(f"处理响应异常: {str(e)}")
|
||
|
||
self.page.on('response', handle_response)
|
||
self._response_listener_active = True
|
||
logger.info("已启用Ada平台API响应监听")
|
||
|
||
def _clear_reply_data(self):
|
||
"""清空回复数据"""
|
||
self.doctor_replies = []
|
||
self.recommend_replies = []
|
||
self.chat_history = []
|
||
self.sent_messages = [] # 记录所有发送的消息
|
||
|
||
def _call_qwen_api(self, doctor_message: str) -> Optional[str]:
|
||
"""
|
||
调用千问大模型生成回复
|
||
|
||
Args:
|
||
doctor_message: 医生/客服的消息
|
||
|
||
Returns:
|
||
生成的回复内容,失败返回None
|
||
"""
|
||
try:
|
||
# 构建系统提示词
|
||
system_prompt = """你是一个正在咨询医疗问题的普通患者。请根据对方(医生/客服)的回复,以患者身份自然地继续对话。
|
||
|
||
要求:
|
||
1. 回复要简短自然,像真实患者一样说话
|
||
2. 可以询问价格、时间、医生资质等问题
|
||
3. 表现出对治疗的关心和一些担忧
|
||
4. 不要暴露自己是AI
|
||
5. 回复长度控制在20-50字以内
|
||
6. 使用口语化的表达
|
||
7. 绝对不要提及任何联系方式,包括:微信、电话、QQ、手机号、邮箱等
|
||
8. 如果对方索要联系方式,委婉拒绝或转移话题"""
|
||
|
||
# 添加医生消息到历史
|
||
self.chat_history.append({
|
||
"role": "user", # 在API调用中,医生消息作为user角色
|
||
"content": f"[医生/客服说]: {doctor_message}"
|
||
})
|
||
|
||
# 构建消息列表
|
||
messages = [{"role": "system", "content": system_prompt}]
|
||
messages.extend(self.chat_history)
|
||
|
||
# 调用API
|
||
headers = {
|
||
"Authorization": f"Bearer {Config.QWEN_API_KEY}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
payload = {
|
||
"model": "qwen-turbo",
|
||
"messages": messages,
|
||
"temperature": 0.8,
|
||
"max_tokens": 150
|
||
}
|
||
|
||
logger.info(f"调用千问API,医生消息: {doctor_message[:50]}...")
|
||
response = requests.post(
|
||
Config.QWEN_API_URL,
|
||
headers=headers,
|
||
json=payload,
|
||
timeout=30
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
reply = result.get('choices', [{}])[0].get('message', {}).get('content', '')
|
||
if reply:
|
||
# 检查回复是否包含敏感词(联系方式相关)
|
||
sensitive_words = ['微信', 'wx', 'WeChat', '电话', '手机', 'QQ', '邮箱', '@', '加我', '联系方式']
|
||
if any(word.lower() in reply.lower() for word in sensitive_words):
|
||
logger.warning(f"AI回复包含敏感词,已过滤: {reply}")
|
||
# 返回一个安全的默认回复
|
||
reply = "好的,我了解了,还想问一下治疗大概需要多长时间呢?"
|
||
|
||
# 添加AI回复到历史
|
||
self.chat_history.append({
|
||
"role": "assistant",
|
||
"content": reply
|
||
})
|
||
logger.info(f"千问API回复: {reply}")
|
||
return reply
|
||
else:
|
||
logger.warning("千问API返回空内容")
|
||
return None
|
||
else:
|
||
logger.error(f"千问API调用失败: {response.status_code} - {response.text}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"调用千问API异常: {str(e)}")
|
||
return None
|
||
|
||
def check_and_click_ad(self, url: str, site_id: int = None) -> Tuple[bool, bool]:
|
||
"""
|
||
检查并点击广告
|
||
|
||
Args:
|
||
url: MIP页面链接
|
||
site_id: 站点ID(用于数据库记录)
|
||
|
||
Returns:
|
||
(是否点击成功, 是否获得回复)
|
||
"""
|
||
self.site_id = site_id
|
||
|
||
# 清空之前的回复数据
|
||
self._clear_reply_data()
|
||
|
||
# 启用API响应监听
|
||
self._setup_response_listener()
|
||
|
||
try:
|
||
# 访问链接(带重试机制)
|
||
max_retries = 2
|
||
page_loaded = False
|
||
self._update_overlay("访问页面", url[:50] + "...")
|
||
for attempt in range(max_retries):
|
||
try:
|
||
logger.info(f"访问链接: {url} (第{attempt+1}次尝试)")
|
||
self.page.goto(url, wait_until='domcontentloaded', timeout=30000)
|
||
page_loaded = True
|
||
break
|
||
except Exception as goto_err:
|
||
if attempt < max_retries - 1:
|
||
logger.warning(f"访问超时,尝试刷新页面...")
|
||
try:
|
||
self.page.reload(wait_until='domcontentloaded', timeout=30000)
|
||
logger.info("✅ 页面刷新成功")
|
||
page_loaded = True
|
||
break
|
||
except:
|
||
logger.warning(f"刷新失败,等待2秒后重试...")
|
||
time.sleep(2)
|
||
else:
|
||
logger.error(f"访问链接失败: {str(goto_err)}")
|
||
# 记录访问失败
|
||
self._record_click_failure(url, f"访问超时: {str(goto_err)}")
|
||
return False, False
|
||
|
||
if not page_loaded:
|
||
self._record_click_failure(url, "页面加载失败")
|
||
return False, False
|
||
|
||
# 等待页面加载
|
||
time.sleep(3)
|
||
|
||
# 检查是否存在商业广告
|
||
self._update_overlay("检测广告", "扫描页面中...")
|
||
has_ad, ad_elements = self._detect_commercial_ad()
|
||
|
||
if not has_ad:
|
||
logger.info("未检测到商业广告,跳过该链接")
|
||
self._update_overlay("未检测到广告", "跳过该链接")
|
||
# 记录无广告
|
||
self._record_click_failure(url, "未检测到商业广告")
|
||
return False, False
|
||
|
||
# 逐个尝试点击广告,直到成功
|
||
self._update_overlay("点击广告", f"检测到 {len(ad_elements)} 个广告")
|
||
logger.info(f"检测到商业广告,准备点击(共 {len(ad_elements)} 个)")
|
||
click_success = False
|
||
|
||
for idx, ad_element in enumerate(ad_elements, 1):
|
||
logger.info(f"尝试点击第 {idx}/{len(ad_elements)} 个广告...")
|
||
self._update_overlay("点击广告", f"尝试第 {idx}/{len(ad_elements)} 个")
|
||
if self._click_advertisement(ad_element):
|
||
logger.info(f"✅ 第 {idx} 个广告点击成功")
|
||
click_success = True
|
||
break
|
||
else:
|
||
logger.warning(f"❌ 第 {idx} 个广告点击失败,尝试下一个...")
|
||
# 等待一下再点下一个
|
||
time.sleep(1)
|
||
|
||
if not click_success:
|
||
logger.warning("所有广告均点击失败")
|
||
self._update_overlay("点击失败", "所有广告均点击失败")
|
||
# 记录点击失败
|
||
self._record_click_failure(url, f"所有广告({len(ad_elements)}个)均点击失败")
|
||
return False, False
|
||
|
||
# 记录点击到数据库
|
||
self._record_click(url)
|
||
|
||
# 等待聊天页面加载
|
||
self._update_overlay("进入聊天", "等待页面加载...")
|
||
logger.info("等待聊天页面加载...")
|
||
time.sleep(3)
|
||
|
||
# 检查是否跳转到非聊天页面
|
||
non_chat_domains = [
|
||
'sp.vejianzhan.com', # 微建站落地页
|
||
# 可以在这里添加更多需要跳过的域名
|
||
]
|
||
current_url = self.page.url.lower()
|
||
for domain in non_chat_domains:
|
||
if domain in current_url:
|
||
logger.info(f"检测到非聊天页面({domain}),判定为点击失败")
|
||
self._update_overlay("非聊天页面", "非聊天页面,跳过")
|
||
self._record_click_failure(url, f"跳转到非聊天页面({domain})")
|
||
return False, False
|
||
|
||
# 直接开始交互(API响应需要页面交互才会触发)
|
||
# 优先尝试点击页面上可见的推荐按钮,没有则发送初始消息
|
||
logger.info("开始首次交互...")
|
||
if not self._try_click_visible_recommend():
|
||
self._send_initial_message()
|
||
|
||
# 执行自动聊天交互(3-5轮,每轮间隔30-90秒)
|
||
self._update_overlay("自动聊天", "开始交互...")
|
||
logger.info("开始自动聊天交互流程...")
|
||
interaction_rounds = self._auto_chat_interaction()
|
||
|
||
# 检查是否收到回复
|
||
has_reply = len(self.doctor_replies) > 0
|
||
|
||
# 记录互动到数据库
|
||
self._record_interaction(has_reply)
|
||
|
||
self._update_overlay("完成", f"交互 {interaction_rounds} 轮,回复: {has_reply}")
|
||
logger.info(f"交互结束,完成 {interaction_rounds} 轮,收到回复: {has_reply}")
|
||
return True, has_reply
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理链接异常: {str(e)}")
|
||
self._update_overlay("异常", str(e)[:30])
|
||
# 记录异常
|
||
try:
|
||
self._record_click_failure(url, f"异常: {str(e)}")
|
||
except:
|
||
pass
|
||
return False, False
|
||
finally:
|
||
# 等待一会再移除浮窗
|
||
time.sleep(2)
|
||
self._remove_overlay()
|
||
# 尝试关闭当前标签页,返回主窗口
|
||
self._close_current_tab()
|
||
|
||
def _detect_commercial_ad(self) -> Tuple[bool, List[ElementHandle]]:
|
||
"""
|
||
检测页面是否存在商业广告
|
||
|
||
Returns:
|
||
(是否存在商业广告, 广告元素列表)
|
||
"""
|
||
try:
|
||
# 等待评论区加载
|
||
time.sleep(2)
|
||
|
||
# 查找包含“广告”标识的元素
|
||
ad_selectors = [
|
||
"//div[contains(@class, 'ad') or contains(@class, 'advertisement')]",
|
||
"//div[contains(text(), '广告')]",
|
||
"//*[contains(text(), '广告')]//ancestor::div[contains(@class, 'card')]",
|
||
"//a[contains(@class, 'ad-link')]",
|
||
]
|
||
|
||
ad_elements = []
|
||
for selector in ad_selectors:
|
||
try:
|
||
elements = self.page.locator(f"xpath={selector}").all()
|
||
if elements:
|
||
# 检查元素是否可见
|
||
for elem in elements:
|
||
if elem.is_visible():
|
||
# 进一步验证是否是商业广告(非AI健康管家)
|
||
try:
|
||
elem_text = elem.inner_text().lower()
|
||
if '广告' in elem_text and 'ai健康' not in elem_text:
|
||
ad_elements.append(elem)
|
||
except:
|
||
continue
|
||
except Exception:
|
||
continue
|
||
|
||
if ad_elements:
|
||
logger.info(f"检测到 {len(ad_elements)} 个商业广告")
|
||
return True, ad_elements
|
||
|
||
logger.info("未检测到商业广告")
|
||
return False, []
|
||
|
||
except Exception as e:
|
||
logger.error(f"检测广告异常: {str(e)}")
|
||
return False, []
|
||
|
||
def _get_ad_info(self, ad_element) -> str:
|
||
"""
|
||
获取广告元素的详细信息
|
||
|
||
Args:
|
||
ad_element: 广告元素
|
||
|
||
Returns:
|
||
广告信息字符串
|
||
"""
|
||
try:
|
||
info_parts = []
|
||
|
||
# 获取广告文本内容
|
||
try:
|
||
text = ad_element.inner_text()
|
||
if text:
|
||
# 清理文本,只取前100字符
|
||
text = text.strip().replace('\n', ' ')[:100]
|
||
info_parts.append(f"文本: {text}")
|
||
except:
|
||
pass
|
||
|
||
# 获取广告链接
|
||
try:
|
||
href = ad_element.get_attribute('href')
|
||
if href:
|
||
info_parts.append(f"链接: {href[:80]}")
|
||
except:
|
||
pass
|
||
|
||
# 获取广告标题
|
||
try:
|
||
title = ad_element.get_attribute('title')
|
||
if title:
|
||
info_parts.append(f"标题: {title}")
|
||
except:
|
||
pass
|
||
|
||
# 尝试获取内部链接
|
||
if not any('链接' in p for p in info_parts):
|
||
try:
|
||
link = ad_element.locator('a').first
|
||
if link:
|
||
href = link.get_attribute('href')
|
||
if href:
|
||
info_parts.append(f"内链: {href[:80]}")
|
||
except:
|
||
pass
|
||
|
||
return ' | '.join(info_parts) if info_parts else "无详细信息"
|
||
|
||
except Exception as e:
|
||
return f"获取信息失败: {str(e)}"
|
||
|
||
def _click_advertisement(self, ad_element: ElementHandle) -> bool:
|
||
"""
|
||
点击广告元素(当前页面导航)
|
||
|
||
Args:
|
||
ad_element: 广告元素
|
||
|
||
Returns:
|
||
是否点击成功
|
||
"""
|
||
try:
|
||
original_url = self.page.url
|
||
|
||
# 获取广告详细信息
|
||
ad_info = self._get_ad_info(ad_element)
|
||
logger.info(f"广告信息: {ad_info}")
|
||
|
||
# 滚动到广告元素可见
|
||
ad_element.scroll_into_view_if_needed()
|
||
time.sleep(1)
|
||
|
||
# 直接点击广告(当前页面导航)
|
||
logger.info("点击广告...")
|
||
ad_element.click()
|
||
logger.info("已点击广告")
|
||
|
||
# 等待页面导航(增加等待时间,支持慢速电脑)
|
||
logger.info("等待页面跳转...")
|
||
max_wait = 10 # 最多等待10秒
|
||
check_interval = 1 # 每秒检查一次
|
||
|
||
for i in range(max_wait):
|
||
time.sleep(check_interval)
|
||
if self.page.url != original_url:
|
||
logger.info(f"✅ 页面已导航(耗时{i+1}秒): {original_url} -> {self.page.url}")
|
||
|
||
# 尝试等待页面加载完成,但不强制要求
|
||
try:
|
||
logger.info("等待页面加载...")
|
||
self.page.wait_for_load_state('domcontentloaded', timeout=10000)
|
||
logger.info("✅ 页面加载完成")
|
||
except Exception as load_err:
|
||
# 页面加载超时不判定为失败,继续执行
|
||
# 因为聊天页面可能已经可用(API响应已经在接收)
|
||
logger.warning(f"⚠️ 页面加载超时,但URL已跳转,继续执行...")
|
||
|
||
break
|
||
else:
|
||
# 循环正常结束(未跳转)
|
||
logger.error(f"❌ 页面URL未变化(等待{max_wait}秒后),广告点击失败: {self.page.url}")
|
||
return False
|
||
|
||
# 等待聊天页面加载
|
||
time.sleep(2)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"点击广告异常: {str(e)}")
|
||
return False
|
||
|
||
def _send_consultation_message(self) -> bool:
|
||
"""
|
||
在聊天页面发送随机咨询消息
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
try:
|
||
logger.info("准备发送咨询消息...")
|
||
|
||
# 随机选择一条消息
|
||
message = random.choice(self.CONSULTATION_MESSAGES)
|
||
logger.info(f"选择的消息: {message}")
|
||
|
||
# 等待页面加载完成
|
||
time.sleep(2)
|
||
|
||
# 打印当前页面URL
|
||
logger.info(f"当前页面: {self.page.url}")
|
||
|
||
# 常见的输入框选择器(优先通过placeholder查找)
|
||
input_selectors = [
|
||
# 优先:通过placeholder查找
|
||
"textarea[placeholder*='消息']",
|
||
"textarea[placeholder*='问题']",
|
||
"input[type='text'][placeholder*='消息']",
|
||
"input[type='text'][placeholder*='问题']",
|
||
"textarea[placeholder*='输入']",
|
||
"textarea[placeholder*='发送']",
|
||
"input[type='text'][placeholder*='输入']",
|
||
"input[type='text'][placeholder*='发送']",
|
||
# 次选:通过class查找
|
||
"textarea[class*='input']",
|
||
# 兜底:通用选择器
|
||
"div[contenteditable='true']",
|
||
"textarea",
|
||
"input[type='text']"
|
||
]
|
||
|
||
input_element = None
|
||
logger.info("开始查找输入框...")
|
||
for selector in input_selectors:
|
||
try:
|
||
elements = self.page.locator(selector).all()
|
||
logger.debug(f"选择器 {selector} 找到 {len(elements)} 个元素")
|
||
for elem in elements:
|
||
if elem.is_visible():
|
||
input_element = elem
|
||
logger.info(f"✅ 找到可见输入框: {selector}")
|
||
break
|
||
if input_element:
|
||
break
|
||
except Exception as e:
|
||
logger.debug(f"选择器 {selector} 失败: {str(e)}")
|
||
continue
|
||
|
||
if not input_element:
|
||
logger.warning("❌ 未找到输入框")
|
||
# 尝试截图便于调试
|
||
try:
|
||
if self.task_folder:
|
||
screenshot_path = self.task_folder / "debug_no_input.png"
|
||
else:
|
||
screenshot_path = Path(f"./logs/debug_no_input_{int(time.time())}.png")
|
||
self.page.screenshot(path=str(screenshot_path))
|
||
logger.info(f"已保存调试截图: {screenshot_path}")
|
||
except Exception as e:
|
||
logger.warning(f"截图失败: {str(e)}")
|
||
|
||
# 兜底方案:尝试查找并点击任何可能的输入区域
|
||
logger.warning("尝试兜底方案:查找所有可能的输入区域...")
|
||
try:
|
||
# 先滚动到页面最底部
|
||
self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
|
||
time.sleep(1)
|
||
|
||
# 尝试查找所有可能的输入相关元素并点击
|
||
fallback_selectors = [
|
||
"textarea",
|
||
"input[type='text']",
|
||
"div[contenteditable='true']",
|
||
"div[class*='input']",
|
||
"div[class*='textarea']",
|
||
"div[class*='message']",
|
||
"div[class*='chat']",
|
||
"div[id*='input']",
|
||
"div[id*='message']"
|
||
]
|
||
|
||
clicked = False
|
||
for selector in fallback_selectors:
|
||
try:
|
||
elements = self.page.locator(selector).all()
|
||
logger.debug(f"兜底选择器 {selector} 找到 {len(elements)} 个元素")
|
||
for elem in elements:
|
||
if elem.is_visible():
|
||
# 滚动到元素位置
|
||
elem.scroll_into_view_if_needed()
|
||
time.sleep(0.5)
|
||
# 点击元素
|
||
elem.click()
|
||
time.sleep(1)
|
||
logger.info(f"已点击元素: {selector}")
|
||
clicked = True
|
||
break
|
||
if clicked:
|
||
break
|
||
except Exception as e:
|
||
logger.debug(f"兜底选择器 {selector} 失败: {str(e)}")
|
||
continue
|
||
|
||
if clicked:
|
||
# 使用真人模拟输入
|
||
self._human_type(message)
|
||
logger.info("✅ 已输入消息(兜底)")
|
||
|
||
# 直接按回车发送
|
||
self.page.keyboard.press('Enter')
|
||
logger.info("✅ 已按回车键发送(兜底)")
|
||
|
||
# 保存已发送的消息内容
|
||
self.sent_message = message
|
||
time.sleep(2)
|
||
return True
|
||
else:
|
||
logger.error("❌ 兜底方案未找到任何可点击的输入区域")
|
||
return False
|
||
|
||
except Exception as fallback_err:
|
||
logger.error(f"兜底方案失败: {str(fallback_err)}")
|
||
return False
|
||
|
||
# 正常流程:点击输入框获取焦点
|
||
input_element.click()
|
||
time.sleep(0.5)
|
||
|
||
# 使用真人模拟输入
|
||
self._human_type(message)
|
||
logger.info("✅ 已输入消息")
|
||
time.sleep(1)
|
||
|
||
# 尝试发送消息(优先回车,再尝试按钮)
|
||
sent = False
|
||
|
||
# 方法1(优先):按回车键发送
|
||
try:
|
||
logger.info("尝试按回车键发送...")
|
||
input_element.press('Enter')
|
||
logger.info("✅ 已按回车键发送")
|
||
sent = True
|
||
time.sleep(1)
|
||
except Exception as e:
|
||
logger.warning(f"❌ 按回车键失败: {str(e)}")
|
||
|
||
# 方法2(兜底): 尝试找到发送按钮并点击
|
||
if not sent:
|
||
send_button_selectors = [
|
||
"button:has-text('发送')",
|
||
"button[class*='send']",
|
||
"button[type='submit']",
|
||
"div[class*='send']",
|
||
"span:has-text('发送')"
|
||
]
|
||
|
||
logger.info("开始查找发送按钮...")
|
||
for selector in send_button_selectors:
|
||
try:
|
||
buttons = self.page.locator(selector).all()
|
||
logger.debug(f"选择器 {selector} 找到 {len(buttons)} 个按钮")
|
||
for btn in buttons:
|
||
if btn.is_visible() and btn.is_enabled():
|
||
btn.click()
|
||
logger.info(f"✅ 已点击发送按钮: {selector}")
|
||
sent = True
|
||
break
|
||
if sent:
|
||
break
|
||
except Exception as e:
|
||
logger.debug(f"选择器 {selector} 失败: {str(e)}")
|
||
continue
|
||
|
||
if sent:
|
||
logger.info("✅ 消息发送成功")
|
||
# 保存已发送的消息内容
|
||
self.sent_message = message
|
||
time.sleep(2) # 等待消息发送完成
|
||
return True
|
||
else:
|
||
logger.warning("❌ 未能发送消息")
|
||
# 截图调试
|
||
try:
|
||
if self.task_folder:
|
||
screenshot_path = self.task_folder / "debug_send_failed.png"
|
||
else:
|
||
screenshot_path = Path(f"./logs/debug_send_failed_{int(time.time())}.png")
|
||
self.page.screenshot(path=str(screenshot_path))
|
||
logger.info(f"已保存调试截图: {screenshot_path}")
|
||
except:
|
||
pass
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"发送消息异常: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def _record_click(self, site_url: str):
|
||
"""记录点击到数据库"""
|
||
try:
|
||
if not self.site_id:
|
||
logger.warning("未设置 site_id,跳过点击记录")
|
||
return
|
||
|
||
from db_manager import ClickManager
|
||
click_mgr = ClickManager()
|
||
self.click_id = click_mgr.record_click(
|
||
site_id=self.site_id,
|
||
site_url=site_url,
|
||
user_ip=None, # 可以后续添加代理IP
|
||
device_type='pc'
|
||
)
|
||
logger.info(f"已记录点击: click_id={self.click_id}")
|
||
except Exception as e:
|
||
logger.error(f"记录点击失败: {str(e)}")
|
||
|
||
def _record_click_failure(self, site_url: str, error_message: str):
|
||
"""
|
||
记录点击失败到数据库
|
||
|
||
Args:
|
||
site_url: 站点URL
|
||
error_message: 错误信息
|
||
"""
|
||
try:
|
||
if not self.site_id:
|
||
logger.warning("未设置 site_id,跳过失败记录")
|
||
return
|
||
|
||
from db_manager import ClickManager
|
||
click_mgr = ClickManager()
|
||
# 记录点击(失败也计数)
|
||
self.click_id = click_mgr.record_click(
|
||
site_id=self.site_id,
|
||
site_url=site_url,
|
||
user_ip=None,
|
||
device_type='pc'
|
||
)
|
||
|
||
# 记录互动失败
|
||
from db_manager import InteractionManager
|
||
interaction_mgr = InteractionManager()
|
||
interaction_mgr.record_interaction(
|
||
site_id=self.site_id,
|
||
click_id=self.click_id,
|
||
interaction_type='reply',
|
||
reply_content=None,
|
||
is_successful=False,
|
||
response_received=False,
|
||
error_message=error_message
|
||
)
|
||
logger.info(f"已记录失败: {error_message}")
|
||
except Exception as e:
|
||
logger.error(f"记录失败异常: {str(e)}")
|
||
|
||
def _record_interaction(self, response_received: bool):
|
||
"""记录互动到数据库(包含医生回复内容)"""
|
||
try:
|
||
if not self.site_id:
|
||
logger.warning("未设置 site_id,跳过互动记录")
|
||
return
|
||
|
||
from db_manager import InteractionManager
|
||
interaction_mgr = InteractionManager()
|
||
|
||
# 获取完整聊天记录
|
||
full_chat_log = self._get_full_chat_log()
|
||
if full_chat_log:
|
||
logger.info(f"完整聊天记录 ({len(self.doctor_replies)}条医生消息):")
|
||
logger.debug(full_chat_log[:500])
|
||
|
||
interaction_id = interaction_mgr.record_interaction(
|
||
site_id=self.site_id,
|
||
click_id=self.click_id,
|
||
interaction_type='message', # 符合数据库ENUM定义:reply/comment/message/form_submit/follow/like/share
|
||
reply_content=getattr(self, 'sent_message', None),
|
||
is_successful=True,
|
||
response_received=response_received,
|
||
response_content=full_chat_log # 保存完整聊天记录
|
||
)
|
||
logger.info(f"已记录互动: interaction_id={interaction_id}, response={response_received}")
|
||
|
||
# 记录详细的回复信息到日志
|
||
if self.doctor_replies:
|
||
logger.info(f"本次共收到 {len(self.doctor_replies)} 条医生回复:")
|
||
for idx, reply in enumerate(self.doctor_replies, 1):
|
||
logger.info(f" [{idx}] {reply.get('content', '')[:100]}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"记录互动失败: {str(e)}")
|
||
|
||
def _wait_for_reply(self) -> bool:
|
||
"""
|
||
等待广告主回复(通过监听heartbeat API)
|
||
|
||
Returns:
|
||
是否收到回复
|
||
"""
|
||
try:
|
||
logger.info(f"等待广告主回复(最多{Config.REPLY_WAIT_TIMEOUT}秒)")
|
||
|
||
# 记录等待开始时的回复数量
|
||
initial_reply_count = len(self.doctor_replies)
|
||
|
||
# 等待并检查回复
|
||
start_time = time.time()
|
||
timeout = Config.REPLY_WAIT_TIMEOUT
|
||
check_interval = 2 # 每2秒检查一次
|
||
|
||
while time.time() - start_time < timeout:
|
||
time.sleep(check_interval)
|
||
|
||
# 检查是否有新的医生回复(通过heartbeat API监听获取)
|
||
if len(self.doctor_replies) > initial_reply_count:
|
||
new_replies = self.doctor_replies[initial_reply_count:]
|
||
logger.info(f"收到 {len(new_replies)} 条医生回复")
|
||
for reply in new_replies:
|
||
logger.info(f" - {reply.get('content', '')[:100]}")
|
||
|
||
# 尝试发送推荐回复进行二次互动
|
||
self._try_click_recommend_reply()
|
||
|
||
return True
|
||
|
||
# 打印等待进度
|
||
elapsed = int(time.time() - start_time)
|
||
if elapsed % 10 == 0 and elapsed > 0:
|
||
logger.info(f"等待中... ({elapsed}/{timeout}秒)")
|
||
|
||
logger.info("未收到广告主回复(超时)")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"等待回复异常: {str(e)}")
|
||
return False
|
||
|
||
def _try_click_recommend_reply(self) -> bool:
|
||
"""
|
||
尝试点击推荐回复按钮
|
||
|
||
Returns:
|
||
是否点击成功
|
||
"""
|
||
try:
|
||
# 检查是否有推荐回复
|
||
if not self.recommend_replies:
|
||
logger.info("暂无推荐回复")
|
||
return False
|
||
|
||
# 需要过滤的关键词(电话相关)
|
||
filter_keywords = ['电话', '拨打', '致电', '来电', '通话', '微信', '加微', 'wx', 'WeChat', '满意度', '评价', '好评', '差评']
|
||
|
||
# 获取推荐回复文本列表(过滤电话相关)
|
||
recommend_texts = []
|
||
for recommend in self.recommend_replies:
|
||
text = recommend.get('text', '') or recommend.get('value', '')
|
||
if text:
|
||
# 过滤电话相关
|
||
if any(kw in text for kw in filter_keywords):
|
||
logger.debug(f"跳过电话相关推荐: {text}")
|
||
continue
|
||
recommend_texts.append(text)
|
||
|
||
if not recommend_texts:
|
||
logger.warning("推荐回复内容为空(或全是电话相关)")
|
||
return False
|
||
|
||
logger.info(f"查找推荐回复按钮: {recommend_texts}")
|
||
|
||
# 推荐回复按钮的选择器
|
||
button_selectors = [
|
||
# 常见的推荐回复按钮选择器
|
||
"div[class*='suggest'] button",
|
||
"div[class*='suggest'] div[class*='item']",
|
||
"div[class*='recommend'] button",
|
||
"div[class*='recommend'] div[class*='item']",
|
||
"div[class*='quick'] button",
|
||
"div[class*='quick'] div[class*='reply']",
|
||
"button[class*='suggest']",
|
||
"button[class*='recommend']",
|
||
"div[class*='bubble'] span",
|
||
"div[class*='reply-item']",
|
||
"span[class*='suggest']",
|
||
]
|
||
|
||
# 遍历选择器查找按钮
|
||
for selector in button_selectors:
|
||
try:
|
||
elements = self.page.locator(selector).all()
|
||
for elem in elements:
|
||
if elem.is_visible():
|
||
try:
|
||
elem_text = elem.inner_text().strip()
|
||
# 检查按钮文本是否匹配推荐回复
|
||
for recommend_text in recommend_texts:
|
||
if recommend_text in elem_text or elem_text in recommend_text:
|
||
logger.info(f"找到推荐回复按钮: {elem_text}")
|
||
elem.click()
|
||
logger.info(f"✅ 已点击推荐回复: {elem_text}")
|
||
self.sent_recommend_reply = elem_text
|
||
self.sent_message = f"[推荐回复] {elem_text}"
|
||
if not hasattr(self, 'sent_messages'):
|
||
self.sent_messages = []
|
||
self.sent_messages.append({'role': '我方(推荐回复)', 'content': elem_text})
|
||
time.sleep(1)
|
||
return True
|
||
except:
|
||
continue
|
||
except Exception as e:
|
||
logger.debug(f"选择器 {selector} 失败: {str(e)}")
|
||
continue
|
||
|
||
# 如果没有找到匹配的按钮,尝试通过文本内容直接查找
|
||
logger.info("尝试通过文本内容查找推荐回复按钮...")
|
||
for recommend_text in recommend_texts:
|
||
try:
|
||
# 使用XPath通过文本内容查找
|
||
xpath_selectors = [
|
||
f"//*[contains(text(), '{recommend_text[:10]}')]",
|
||
f"//button[contains(text(), '{recommend_text[:10]}')]",
|
||
f"//span[contains(text(), '{recommend_text[:10]}')]",
|
||
f"//div[contains(text(), '{recommend_text[:10]}')]",
|
||
]
|
||
|
||
for xpath in xpath_selectors:
|
||
try:
|
||
elements = self.page.locator(f"xpath={xpath}").all()
|
||
for elem in elements:
|
||
if elem.is_visible():
|
||
# 检查元素是否可点击(不是整个容器)
|
||
box = elem.bounding_box()
|
||
if box and box['width'] < 500 and box['height'] < 100:
|
||
logger.info(f"找到推荐回复元素: {recommend_text[:20]}")
|
||
elem.click()
|
||
logger.info(f"✅ 已点击推荐回复: {recommend_text}")
|
||
self.sent_recommend_reply = recommend_text
|
||
self.sent_message = f"[推荐回复] {recommend_text}"
|
||
if not hasattr(self, 'sent_messages'):
|
||
self.sent_messages = []
|
||
self.sent_messages.append({'role': '我方(推荐回复)', 'content': recommend_text})
|
||
time.sleep(1)
|
||
return True
|
||
except:
|
||
continue
|
||
except Exception as e:
|
||
logger.debug(f"文本查找失败: {str(e)}")
|
||
continue
|
||
|
||
logger.warning("未找到可点击的推荐回复按钮")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"点击推荐回复异常: {str(e)}")
|
||
return False
|
||
|
||
def _try_click_visible_recommend(self) -> bool:
|
||
"""
|
||
遍历每条带推荐回复的消息,从中随机选一个点击
|
||
|
||
Returns:
|
||
是否点击成功
|
||
"""
|
||
try:
|
||
# 需要过滤的关键词
|
||
filter_keywords = ['电话', '拨打', '致电', '来电', '通话', '微信', '加微', 'wx', 'WeChat', '满意度', '评价', '好评', '差评']
|
||
|
||
# 查找所有推荐回复组(每组对应一条消息的推荐)
|
||
recommend_group_selectors = [
|
||
"div.gt-jmy-h5-c-msg-tag",
|
||
"div[class*='msg-tag']",
|
||
"div[class*='suggest-reply']",
|
||
"div[class*='quick-reply']",
|
||
"div[class*='recommend-reply']",
|
||
]
|
||
|
||
clicked_count = 0
|
||
|
||
for group_selector in recommend_group_selectors:
|
||
try:
|
||
groups = self.page.locator(group_selector).all()
|
||
for group in groups:
|
||
if not group.is_visible():
|
||
continue
|
||
|
||
# 获取该组内的所有推荐选项
|
||
options = group.locator("span.content-text").all()
|
||
if not options:
|
||
options = group.locator("span").all()
|
||
if not options:
|
||
options = group.locator("button").all()
|
||
if not options:
|
||
options = group.locator("div[class*='item']").all()
|
||
|
||
# 收集该组内可用的选项
|
||
available_options = []
|
||
for opt in options:
|
||
try:
|
||
if opt.is_visible():
|
||
text = opt.inner_text().strip()
|
||
# 过滤敏感词
|
||
if any(kw in text for kw in filter_keywords):
|
||
continue
|
||
if text and len(text) < 30:
|
||
available_options.append({'elem': opt, 'text': text})
|
||
except:
|
||
continue
|
||
|
||
# 从该组中随机选一个点击
|
||
if available_options:
|
||
selected = random.choice(available_options)
|
||
logger.info(f"推荐选项: {[o['text'] for o in available_options]},选择: {selected['text']}")
|
||
self._human_click(selected['elem'], f"推荐回复: {selected['text']}")
|
||
logger.info(f"✅ 已点击推荐: {selected['text']}")
|
||
self.sent_recommend_reply = selected['text']
|
||
self.sent_message = f"[推荐回复] {selected['text']}" # 记录到sent_message
|
||
if not hasattr(self, 'sent_messages'):
|
||
self.sent_messages = []
|
||
self.sent_messages.append({'role': '我方(推荐回复)', 'content': selected['text']})
|
||
clicked_count += 1
|
||
# 随机延迟2-5秒,模拟真人操作
|
||
delay = random.uniform(2, 5)
|
||
time.sleep(delay)
|
||
except:
|
||
continue
|
||
|
||
if clicked_count > 0:
|
||
logger.info(f"共点击 {clicked_count} 个推荐回复")
|
||
return True
|
||
|
||
# 兜底:使用通用选择器查找
|
||
button_selectors = [
|
||
"div[class*='suggest'] span",
|
||
"div[class*='recommend'] span",
|
||
"div[class*='quick-reply'] span",
|
||
"button[class*='suggest']",
|
||
"div[class*='reply-item']",
|
||
"span[class*='reply']",
|
||
]
|
||
|
||
available_buttons = []
|
||
for selector in button_selectors:
|
||
try:
|
||
elements = self.page.locator(selector).all()
|
||
for elem in elements:
|
||
if elem.is_visible():
|
||
try:
|
||
text = elem.inner_text().strip()
|
||
if any(kw in text for kw in filter_keywords):
|
||
continue
|
||
if text and len(text) < 30:
|
||
box = elem.bounding_box()
|
||
if box and box['width'] < 200 and box['height'] < 60:
|
||
if not any(b['text'] == text for b in available_buttons):
|
||
available_buttons.append({'elem': elem, 'text': text})
|
||
except:
|
||
continue
|
||
except:
|
||
continue
|
||
|
||
if available_buttons:
|
||
selected = random.choice(available_buttons)
|
||
logger.info(f"找到 {len(available_buttons)} 个推荐按钮,选择: {selected['text']}")
|
||
# 随机延迟1-3秒后点击
|
||
time.sleep(random.uniform(1, 3))
|
||
self._human_click(selected['elem'], f"推荐按钮: {selected['text']}")
|
||
logger.info(f"✅ 已点击推荐按钮: {selected['text']}")
|
||
self.sent_recommend_reply = selected['text']
|
||
if not hasattr(self, 'sent_messages'):
|
||
self.sent_messages = []
|
||
self.sent_messages.append({'role': '我方(推荐回复)', 'content': selected['text']})
|
||
# 点击后随机延迟2-4秒
|
||
time.sleep(random.uniform(2, 4))
|
||
return True
|
||
|
||
logger.debug("未找到可见的推荐按钮")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"查找推荐按钮异常: {str(e)}")
|
||
return False
|
||
|
||
def _count_dom_recommend_buttons(self) -> int:
|
||
"""
|
||
统计页面DOM中的推荐按钮数量
|
||
|
||
Returns:
|
||
推荐按钮数量
|
||
"""
|
||
try:
|
||
count = 0
|
||
# 基于实际页面结构的选择器
|
||
selectors = [
|
||
"div.gt-jmy-h5-c-msg-tag span.content-text",
|
||
"div[class*='msg-tag'] span.content-text",
|
||
]
|
||
|
||
for selector in selectors:
|
||
try:
|
||
elements = self.page.locator(selector).all()
|
||
for elem in elements:
|
||
if elem.is_visible():
|
||
count += 1
|
||
except:
|
||
continue
|
||
|
||
return count
|
||
except:
|
||
return 0
|
||
|
||
def _get_latest_doctor_message_from_dom(self) -> Optional[str]:
|
||
"""
|
||
从DOM获取最新的医生消息,并添加到doctor_replies
|
||
|
||
Returns:
|
||
最新医生消息内容
|
||
"""
|
||
try:
|
||
# 基于实际页面结构的选择器
|
||
selectors = [
|
||
"div.msg-container-normal div.mip-sjh-text",
|
||
"div[class*='msg-container'] div.mip-sjh-text",
|
||
"div[class*='bot-msg'] div.mip-sjh-text",
|
||
]
|
||
|
||
for selector in selectors:
|
||
try:
|
||
elements = self.page.locator(selector).all()
|
||
if elements:
|
||
# 获取最后一个(最新的)消息
|
||
last_elem = elements[-1]
|
||
if last_elem.is_visible():
|
||
text = last_elem.inner_text().strip()
|
||
if text:
|
||
# 检查是否已存在(避免重复)
|
||
if not any(r.get('content') == text for r in self.doctor_replies):
|
||
self.doctor_replies.append({
|
||
'content': text,
|
||
'messageId': f'dom_{len(self.doctor_replies)}',
|
||
'source': 'dom',
|
||
'received_at': datetime.now().isoformat()
|
||
})
|
||
logger.debug(f"[DOM] 添加医生消息: {text[:50]}...")
|
||
return text
|
||
except:
|
||
continue
|
||
|
||
return None
|
||
except:
|
||
return None
|
||
|
||
def _send_initial_message(self):
|
||
"""发送初始咨询消息"""
|
||
initial_message = random.choice(self.CONSULTATION_MESSAGES)
|
||
if self._send_message_to_chat(initial_message):
|
||
logger.info(f"✅ 已发送初始消息: {initial_message}")
|
||
self.sent_message = initial_message
|
||
else:
|
||
logger.warning("初始消息发送失败")
|
||
|
||
def _send_message_to_chat(self, message: str) -> bool:
|
||
"""
|
||
在聊天页面发送消息
|
||
|
||
Args:
|
||
message: 要发送的消息
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
try:
|
||
# 查找输入框(多种选择器,按优先级排列)
|
||
input_selectors = [
|
||
# 基于实际页面结构(自定义输入框组件)
|
||
"div.gt-jmy-h5-bot-text-input",
|
||
"div.text-input",
|
||
"div.input-area",
|
||
"div.fake-input",
|
||
# 基于class名称
|
||
"textarea.chat-input",
|
||
"textarea[class*='input']",
|
||
"textarea[class*='textarea']",
|
||
"div[class*='input'] textarea",
|
||
"div[class*='chat'] textarea",
|
||
# 基于placeholder
|
||
"textarea[placeholder*='消息']",
|
||
"textarea[placeholder*='问题']",
|
||
"textarea[placeholder*='输入']",
|
||
"textarea[placeholder*='说点']",
|
||
"textarea[placeholder*='描述']",
|
||
"input[type='text'][placeholder*='消息']",
|
||
"input[type='text'][placeholder*='输入']",
|
||
# 基于contenteditable
|
||
"div[contenteditable='true']",
|
||
# 通用兜底
|
||
"textarea",
|
||
"input[type='text']"
|
||
]
|
||
|
||
# 最多重试3次
|
||
for retry in range(3):
|
||
input_element = None
|
||
is_custom_input = False # 标记是否是自定义输入框
|
||
|
||
for selector in input_selectors:
|
||
try:
|
||
elements = self.page.locator(selector).all()
|
||
for elem in elements:
|
||
if elem.is_visible():
|
||
box = elem.bounding_box()
|
||
if box and box['height'] > 20:
|
||
input_element = elem
|
||
# 检查是否是自定义输入框
|
||
if 'gt-jmy' in selector or 'fake-input' in selector or 'text-input' in selector:
|
||
is_custom_input = True
|
||
logger.debug(f"找到输入框: {selector}, 自定义: {is_custom_input}")
|
||
break
|
||
if input_element:
|
||
break
|
||
except:
|
||
continue
|
||
|
||
if input_element:
|
||
break
|
||
|
||
# 没找到,等待后重试
|
||
if retry < 2:
|
||
logger.info(f"未找到输入框,等待2秒后重试... ({retry+1}/3)")
|
||
time.sleep(2)
|
||
try:
|
||
self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
|
||
except:
|
||
pass
|
||
|
||
# 兜底方案:点击页面底部中心位置激活输入框
|
||
if not input_element:
|
||
logger.info("尝试兜底方案:点击页面底部中心位置...")
|
||
try:
|
||
# 获取页面尺寸
|
||
viewport = self.page.viewport_size
|
||
if viewport:
|
||
# 点击底部中心上方一点的位置(大约底部往上100px)
|
||
click_x = viewport['width'] // 2
|
||
click_y = viewport['height'] - 100
|
||
self.page.mouse.click(click_x, click_y)
|
||
logger.info(f"点击位置: ({click_x}, {click_y})")
|
||
time.sleep(0.5)
|
||
|
||
# 使用真人模拟输入
|
||
self._human_type(message)
|
||
time.sleep(0.3)
|
||
self.page.keyboard.press('Enter')
|
||
self.sent_message = message # 记录发送内容
|
||
if not hasattr(self, 'sent_messages'):
|
||
self.sent_messages = []
|
||
self.sent_messages.append({'role': '我方', 'content': message})
|
||
logger.info(f"✅ 已发送消息(兜底方案): {message[:50]}...")
|
||
time.sleep(1)
|
||
return True
|
||
except Exception as e:
|
||
logger.warning(f"兜底方案失败: {str(e)}")
|
||
|
||
logger.warning("所有方案均失败")
|
||
return False
|
||
|
||
# 点击输入框获取焦点
|
||
input_element.click()
|
||
time.sleep(0.5)
|
||
|
||
# 使用真人模拟输入
|
||
logger.debug("使用真人模拟输入...")
|
||
self._human_type(message)
|
||
|
||
time.sleep(0.5)
|
||
|
||
# 发送消息:先尝试点击发送按钮,再尝试按回车
|
||
sent = False
|
||
|
||
# 方法1:点击发送按钮
|
||
send_btn_selectors = [
|
||
"div.send-btn",
|
||
"div.icon.send-btn",
|
||
"button.send-btn",
|
||
"span.send-btn",
|
||
"div[class*='send']",
|
||
"button[class*='send']",
|
||
]
|
||
|
||
for btn_selector in send_btn_selectors:
|
||
try:
|
||
btn = self.page.locator(btn_selector).first
|
||
if btn and btn.is_visible():
|
||
btn.click()
|
||
logger.debug(f"点击发送按钮: {btn_selector}")
|
||
sent = True
|
||
break
|
||
except:
|
||
continue
|
||
|
||
# 方法2:按回车键
|
||
if not sent:
|
||
try:
|
||
self.page.keyboard.press('Enter')
|
||
sent = True
|
||
except:
|
||
pass
|
||
|
||
if sent:
|
||
self.sent_message = message # 记录发送内容
|
||
if not hasattr(self, 'sent_messages'):
|
||
self.sent_messages = []
|
||
self.sent_messages.append({'role': '我方', 'content': message})
|
||
logger.info(f"✅ 已发送消息: {message[:50]}...")
|
||
time.sleep(1)
|
||
return True
|
||
else:
|
||
logger.warning("发送消息失败")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"发送消息异常: {str(e)}")
|
||
return False
|
||
|
||
def _wait_for_new_doctor_reply(self, timeout: int = 60) -> Optional[str]:
|
||
"""
|
||
等待新的医生回复
|
||
|
||
Args:
|
||
timeout: 等待超时时间(秒)
|
||
|
||
Returns:
|
||
新的医生回复内容,超时返回None
|
||
"""
|
||
try:
|
||
initial_count = len(self.doctor_replies)
|
||
start_time = time.time()
|
||
|
||
while time.time() - start_time < timeout:
|
||
time.sleep(2)
|
||
|
||
# 检查是否有新回复
|
||
if len(self.doctor_replies) > initial_count:
|
||
# 获取最新的回复
|
||
new_reply = self.doctor_replies[-1]
|
||
content = new_reply.get('content', '')
|
||
if content:
|
||
logger.info(f"收到新医生回复: {content[:50]}...")
|
||
return content
|
||
|
||
# 打印等待进度
|
||
elapsed = int(time.time() - start_time)
|
||
if elapsed % 15 == 0 and elapsed > 0:
|
||
logger.info(f"等待医生回复... ({elapsed}/{timeout}秒)")
|
||
|
||
logger.info("等待医生回复超时")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"等待医生回复异常: {str(e)}")
|
||
return None
|
||
|
||
def _auto_chat_interaction(self) -> int:
|
||
"""
|
||
自动聊天交互(3-5轮)
|
||
|
||
流程:
|
||
1. 持续监控API响应,收到推荐回复立即点击
|
||
2. 如果没有推荐回复但有医生回复,使用AI生成回复
|
||
3. 重复3-5轮,每轮间隔30-90秒
|
||
|
||
Returns:
|
||
实际完成的交互轮数
|
||
"""
|
||
try:
|
||
# 随机决定交互轮数(3-5轮)
|
||
target_rounds = random.randint(3, 5)
|
||
completed_rounds = 0
|
||
|
||
logger.info(f"开始自动聊天交互,目标轮数: {target_rounds}")
|
||
|
||
for round_num in range(1, target_rounds + 1):
|
||
logger.info(f"=== 第 {round_num}/{target_rounds} 轮交互 ===")
|
||
self._update_overlay("聊天交互", f"第 {round_num}/{target_rounds} 轮")
|
||
|
||
# 记录本轮开始时的状态
|
||
initial_recommend_count = len(self.recommend_replies)
|
||
initial_doctor_count = len(self.doctor_replies)
|
||
initial_dom_button_count = self._count_dom_recommend_buttons()
|
||
round_completed = False
|
||
last_doctor_reply = None
|
||
no_recommend_count = 0 # 连续找不到推荐按钮的次数
|
||
|
||
# 持续监控,最多等待60秒
|
||
start_time = time.time()
|
||
timeout = 60
|
||
|
||
while time.time() - start_time < timeout and not round_completed:
|
||
time.sleep(1) # 每秒检查一次
|
||
|
||
# 每次循环都尝试点击可见的推荐按钮(遍历所有消息)
|
||
clicked = self._try_click_visible_recommend()
|
||
if clicked:
|
||
logger.info(f"✅ 第 {round_num} 轮点击了推荐回复")
|
||
completed_rounds += 1
|
||
round_completed = True
|
||
no_recommend_count = 0
|
||
# 更新计数
|
||
initial_recommend_count = len(self.recommend_replies)
|
||
initial_dom_button_count = self._count_dom_recommend_buttons()
|
||
continue
|
||
else:
|
||
no_recommend_count += 1
|
||
|
||
# 检查API是否有新的推荐回复
|
||
if len(self.recommend_replies) > initial_recommend_count:
|
||
logger.info("[API] 检测到新推荐回复...")
|
||
initial_recommend_count = len(self.recommend_replies)
|
||
no_recommend_count = 0
|
||
# 尝试点击
|
||
if self._try_click_visible_recommend() or self._try_click_recommend_reply():
|
||
logger.info(f"✅ 第 {round_num} 轮使用推荐回复完成")
|
||
completed_rounds += 1
|
||
round_completed = True
|
||
|
||
# 检查DOM是否有新的推荐按钮
|
||
if not round_completed:
|
||
current_dom_count = self._count_dom_recommend_buttons()
|
||
if current_dom_count > initial_dom_button_count:
|
||
logger.info(f"[DOM] 检测到新推荐按钮 ({initial_dom_button_count} -> {current_dom_count})...")
|
||
initial_dom_button_count = current_dom_count
|
||
no_recommend_count = 0
|
||
if self._try_click_visible_recommend():
|
||
logger.info(f"✅ 第 {round_num} 轮通过DOM点击推荐完成")
|
||
completed_rounds += 1
|
||
round_completed = True
|
||
|
||
# 记录医生回复(用于AI生成)
|
||
if len(self.doctor_replies) > initial_doctor_count:
|
||
last_doctor_reply = self.doctor_replies[-1].get('content', '')
|
||
initial_doctor_count = len(self.doctor_replies)
|
||
|
||
if not last_doctor_reply:
|
||
dom_doctor_msg = self._get_latest_doctor_message_from_dom()
|
||
if dom_doctor_msg:
|
||
last_doctor_reply = dom_doctor_msg
|
||
|
||
# 如果收到医生回复但连续10秒找不到推荐按钮,提前使用AI
|
||
if not round_completed and last_doctor_reply and no_recommend_count >= 10:
|
||
logger.info(f"已收到医生回复但连续{no_recommend_count}秒无推荐按钮,使用AI回复...")
|
||
ai_reply = self._call_qwen_api(last_doctor_reply)
|
||
if ai_reply and self._send_message_to_chat(ai_reply):
|
||
logger.info(f"✅ 第 {round_num} 轮使用AI回复完成")
|
||
completed_rounds += 1
|
||
round_completed = True
|
||
else:
|
||
logger.warning("AI回复发送失败,继续等待推荐按钮...")
|
||
no_recommend_count = 0 # 重置计数,继续尝试
|
||
|
||
# 如果连续25秒没有任何响应(无推荐按钮也无医生回复),主动发消息
|
||
if not round_completed and not last_doctor_reply and no_recommend_count >= 25:
|
||
# 尝试获取历史医生消息
|
||
history_doctor_msg = None
|
||
if self.doctor_replies:
|
||
history_doctor_msg = self.doctor_replies[-1].get('content', '')
|
||
if not history_doctor_msg:
|
||
history_doctor_msg = self._get_latest_doctor_message_from_dom()
|
||
|
||
if history_doctor_msg:
|
||
logger.info(f"连续{no_recommend_count}秒无新响应,使用历史消息生成AI回复...")
|
||
ai_reply = self._call_qwen_api(history_doctor_msg)
|
||
if ai_reply and self._send_message_to_chat(ai_reply):
|
||
logger.info(f"✅ 第 {round_num} 轮使用AI回复历史消息完成")
|
||
completed_rounds += 1
|
||
round_completed = True
|
||
else:
|
||
no_recommend_count = 0
|
||
else:
|
||
# 完全没有历史消息,发送激活消息
|
||
logger.info(f"连续{no_recommend_count}秒无响应且无历史消息,发送激活消息...")
|
||
fallback_messages = [
|
||
"您好,请问还在吗?",
|
||
"想咨询一下具体情况",
|
||
"请问医生什么时候有空呢?",
|
||
"我想了解一下治疗方案"
|
||
]
|
||
fallback_msg = random.choice(fallback_messages)
|
||
if self._send_message_to_chat(fallback_msg):
|
||
logger.info(f"✅ 第 {round_num} 轮发送激活消息: {fallback_msg}")
|
||
completed_rounds += 1
|
||
round_completed = True
|
||
else:
|
||
no_recommend_count = 0
|
||
|
||
# 打印等待进度
|
||
elapsed = int(time.time() - start_time)
|
||
if elapsed % 15 == 0 and elapsed > 0:
|
||
logger.info(f"等待中... ({elapsed}/{timeout}秒)")
|
||
|
||
# 如果本轮没有通过推荐回复完成,尝试使用AI
|
||
if not round_completed:
|
||
if last_doctor_reply:
|
||
logger.info("无可用推荐回复,使用千问AI生成回复...")
|
||
ai_reply = self._call_qwen_api(last_doctor_reply)
|
||
if ai_reply and self._send_message_to_chat(ai_reply):
|
||
logger.info(f"✅ 第 {round_num} 轮使用AI回复完成")
|
||
completed_rounds += 1
|
||
round_completed = True
|
||
else:
|
||
logger.warning(f"第 {round_num} 轮AI回复发送失败,尝试点击推荐按钮...")
|
||
# 再次尝试点击推荐按钮
|
||
if self._try_click_visible_recommend():
|
||
logger.info(f"✅ 第 {round_num} 轮通过点击推荐按钮完成")
|
||
completed_rounds += 1
|
||
round_completed = True
|
||
else:
|
||
logger.warning(f"第 {round_num} 轮失败,继续下一轮...")
|
||
else:
|
||
# 当前轮没有收到新回复,尝试用历史消息生成回复
|
||
logger.warning(f"第 {round_num} 轮未收到新回复,尝试使用历史消息...")
|
||
|
||
# 先尝试点击推荐按钮
|
||
if self._try_click_visible_recommend():
|
||
logger.info(f"✅ 第 {round_num} 轮通过点击推荐按钮完成")
|
||
completed_rounds += 1
|
||
round_completed = True
|
||
else:
|
||
# 尝试获取历史医生消息
|
||
history_doctor_msg = None
|
||
if self.doctor_replies:
|
||
history_doctor_msg = self.doctor_replies[-1].get('content', '')
|
||
if not history_doctor_msg:
|
||
history_doctor_msg = self._get_latest_doctor_message_from_dom()
|
||
|
||
if history_doctor_msg:
|
||
logger.info("使用历史消息生成AI回复...")
|
||
ai_reply = self._call_qwen_api(history_doctor_msg)
|
||
if ai_reply and self._send_message_to_chat(ai_reply):
|
||
logger.info(f"✅ 第 {round_num} 轮使用AI回复历史消息完成")
|
||
completed_rounds += 1
|
||
round_completed = True
|
||
else:
|
||
# 完全没有历史消息,发送激活消息
|
||
fallback_messages = [
|
||
"您好,请问还在吗?",
|
||
"想咨询一下具体情况",
|
||
"请问医生什么时候有空呢?",
|
||
"我想了解一下治疗方案"
|
||
]
|
||
fallback_msg = random.choice(fallback_messages)
|
||
if self._send_message_to_chat(fallback_msg):
|
||
logger.info(f"✅ 第 {round_num} 轮发送激活消息: {fallback_msg}")
|
||
completed_rounds += 1
|
||
round_completed = True
|
||
else:
|
||
logger.warning(f"第 {round_num} 轮发送消息失败")
|
||
|
||
# 如果还有下一轮,随机等待30-90秒
|
||
if round_num < target_rounds:
|
||
wait_seconds = random.randint(30, 90)
|
||
logger.info(f"等待 {wait_seconds} 秒后进行下一轮...")
|
||
time.sleep(wait_seconds)
|
||
|
||
logger.info(f"自动聊天交互完成,共 {completed_rounds}/{target_rounds} 轮")
|
||
return completed_rounds
|
||
|
||
except Exception as e:
|
||
logger.error(f"自动聊天交互异常: {str(e)}")
|
||
return completed_rounds if 'completed_rounds' in locals() else 0
|
||
|
||
def _get_doctor_reply_content(self) -> Optional[str]:
|
||
"""获取医生回复内容(合并所有回复)"""
|
||
if not self.doctor_replies:
|
||
return None
|
||
|
||
# 合并所有回复内容
|
||
contents = [r.get('content', '') for r in self.doctor_replies if r.get('content')]
|
||
return '\n'.join(contents) if contents else None
|
||
|
||
def _get_full_chat_log(self) -> Optional[str]:
|
||
"""
|
||
获取完整聊天记录(格式化为文本)
|
||
|
||
数据来源:
|
||
1. doctor_replies - API监听/DOM采集的医生消息
|
||
2. chat_history - 与千问API的对话历史(user=医生, assistant=我方AI回复)
|
||
3. sent_messages - 所有发送的消息(包括推荐回复和AI回复)
|
||
4. sent_recommend_reply - 点击的推荐回复(兼容旧代码)
|
||
5. sent_message - 最后发送的消息(兼容旧代码)
|
||
|
||
Returns:
|
||
格式化的聊天记录文本
|
||
"""
|
||
try:
|
||
chat_lines = []
|
||
seen_contents = set() # 用于去重
|
||
|
||
# 1. 从 chat_history 中提取对话(包含医生消息和AI回复)
|
||
for i, msg in enumerate(self.chat_history):
|
||
role = msg.get('role', '')
|
||
content = msg.get('content', '')
|
||
|
||
if not content:
|
||
continue
|
||
|
||
if role == 'user':
|
||
# 医生消息,去掉前缀 "[医生/客服说]: "
|
||
if content.startswith('[医生/客服说]:'):
|
||
content = content.replace('[医生/客服说]:', '').strip()
|
||
elif content.startswith('[医生/客服说]'):
|
||
content = content.replace('[医生/客服说]', '').strip()
|
||
|
||
if content and content not in seen_contents:
|
||
chat_lines.append(f"[医生] {content}")
|
||
seen_contents.add(content)
|
||
|
||
elif role == 'assistant':
|
||
# 我方AI回复
|
||
if content and content not in seen_contents:
|
||
chat_lines.append(f"[我方(AI)] {content}")
|
||
seen_contents.add(content)
|
||
|
||
# 2. 补充 doctor_replies 中的消息(可能有些消息没进入 chat_history)
|
||
for reply in self.doctor_replies:
|
||
content = reply.get('content', '')
|
||
if content and content not in seen_contents:
|
||
chat_lines.append(f"[医生] {content}")
|
||
seen_contents.add(content)
|
||
|
||
# 3. 从 sent_messages 获取所有发送的消息
|
||
if hasattr(self, 'sent_messages') and self.sent_messages:
|
||
for msg in self.sent_messages:
|
||
content = msg.get('content', '')
|
||
role = msg.get('role', '我方')
|
||
if content and content not in seen_contents:
|
||
chat_lines.append(f"[{role}] {content}")
|
||
seen_contents.add(content)
|
||
|
||
# 4. 添加发送的推荐回复(兼容旧代码)
|
||
if hasattr(self, 'sent_recommend_reply') and self.sent_recommend_reply:
|
||
content = self.sent_recommend_reply
|
||
if content not in seen_contents:
|
||
chat_lines.append(f"[我方(推荐回复)] {content}")
|
||
seen_contents.add(content)
|
||
|
||
# 5. 添加最后发送的消息(兼容旧代码)
|
||
if hasattr(self, 'sent_message') and self.sent_message:
|
||
content = self.sent_message
|
||
if not content.startswith('[推荐回复]') and content not in seen_contents:
|
||
chat_lines.append(f"[我方] {content}")
|
||
seen_contents.add(content)
|
||
|
||
result = '\n'.join(chat_lines) if chat_lines else None
|
||
|
||
# 调试日志
|
||
logger.info(f"聊天记录汇总: chat_history={len(self.chat_history)}条, doctor_replies={len(self.doctor_replies)}条, sent_messages={len(getattr(self, 'sent_messages', []))}条, 输出={len(chat_lines)}行")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取完整聊天记录失败: {str(e)}")
|
||
return None
|
||
|
||
def _count_messages(self) -> int:
|
||
"""
|
||
统计当前页面的消息数量
|
||
|
||
Returns:
|
||
消息数量
|
||
"""
|
||
try:
|
||
# 根据实际页面结构调整选择器
|
||
# 这里是示例选择器,需要根据实际情况修改
|
||
message_selectors = [
|
||
"//div[contains(@class, 'message')]",
|
||
"//div[contains(@class, 'chat-message')]",
|
||
"//div[contains(@class, 'msg-item')]",
|
||
]
|
||
|
||
for selector in message_selectors:
|
||
try:
|
||
messages = self.page.locator(f"xpath={selector}").all()
|
||
if messages:
|
||
return len(messages)
|
||
except:
|
||
continue
|
||
|
||
return 0
|
||
|
||
except Exception as e:
|
||
logger.error(f"统计消息数量异常: {str(e)}")
|
||
return 0
|
||
|
||
def _close_current_tab(self):
|
||
"""关闭当前标签页并返回主窗口"""
|
||
try:
|
||
pages = self.page.context.pages
|
||
if len(pages) > 1:
|
||
self.page.close()
|
||
self.page = pages[0]
|
||
logger.info("已关闭广告页面")
|
||
except Exception as e:
|
||
logger.error(f"关闭标签页异常: {str(e)}")
|
||
|
||
def random_delay(self, min_seconds: int = 2, max_seconds: int = 5):
|
||
"""随机延迟,模拟人工操作"""
|
||
delay = random.uniform(min_seconds, max_seconds)
|
||
time.sleep(delay)
|