Files
ai_mip/ad_automation.py

1941 lines
85 KiB
Python
Raw Permalink Normal View History

2026-01-13 18:59:26 +08:00
import time
import random
2026-02-24 12:46:35 +08:00
import json
import re
import requests
from typing import Optional, Tuple, List, Dict
from playwright.sync_api import Page, ElementHandle, Response
2026-01-13 18:59:26 +08:00
from loguru import logger
from config import Config
2026-01-16 22:06:46 +08:00
from pathlib import Path
from datetime import datetime
2026-01-13 18:59:26 +08:00
class MIPAdAutomation:
"""MIP页面广告自动化操作"""
2026-01-16 22:06:46 +08:00
# Canned opening questions; one is chosen at random to start a consultation.
CONSULTATION_MESSAGES = [
    "我想要预约一个医生,有什么推荐吗?",
    "我现在本人不在当地,医生什么时候有空,是随时能去吗?有没有推荐的医生。",
    "咱们医院是周六日是否上班,随时去吗?",
    # Fixed typo: the last word was "区生" instead of "医生" (doctor).
    "想找医生看看,有没有推荐的医生",
    "最近很不舒服,也说不出来全部的症状,能不能直接对话医生?"
]
# Ada platform API endpoints; matched as substrings of response URLs.
ADA_HEARTBEAT_API = 'ada.baidu.com/gateway/message/heartbeat'
ADA_RECOMMEND_API = 'ada.baidu.com/imlp-extend/agent/getRecommendContent'
2026-01-16 22:06:46 +08:00
def __init__(self, page: Page, task_index: Optional[int] = None):
    """Bind the helper to a Playwright page and initialise per-task state.

    Args:
        page: Playwright page on which every browser action is performed.
        task_index: Optional task number. When given (including 0), a
            timestamped directory ``./test/task_<index>_<timestamp>`` is
            created and stored in ``self.task_folder``.
    """
    self.page = page
    self.site_id = None  # ID of the site currently being processed
    self.click_id = None  # ID of the most recently recorded click
    self.task_folder = None  # per-task log directory (set below)
    # Replies captured by the response listener
    self.doctor_replies: List[Dict] = []
    self.recommend_replies: List[Dict] = []
    self._response_listener_active = False
    # Conversation history used as context for the LLM
    self.chat_history: List[Dict] = []
    # Outgoing-message bookkeeping; initialised here so that readers do not
    # depend on another method having run first.
    self.sent_message = None
    self.sent_messages: List[Dict] = []
    # Progress overlay state
    self._overlay_injected = False
    self.task_index = task_index
    # `is not None` (rather than truthiness) so that task 0 also gets a folder.
    if task_index is not None:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.task_folder = Path("./test") / f"task_{task_index}_{timestamp}"
        self.task_folder.mkdir(parents=True, exist_ok=True)
        logger.info(f"任务日志目录: {self.task_folder}")
2026-02-24 12:46:35 +08:00
def _inject_overlay(self):
    """Inject the progress overlay into the current document if it is missing.

    The script itself checks for an existing ``#mip-progress-overlay``
    element, so calling this repeatedly is harmless. The Python-side flag is
    deliberately NOT used as an early-exit guard: a navigation replaces the
    DOM and removes the overlay, and a stale flag would then prevent it from
    ever being re-injected on the new page.
    """
    try:
        js_code = """
        (function() {
            // Skip if the overlay already exists in this document
            if (document.getElementById('mip-progress-overlay')) return;
            // Overlay container
            var overlay = document.createElement('div');
            overlay.id = 'mip-progress-overlay';
            overlay.style.cssText = `
                position: fixed;
                top: 10px;
                right: 10px;
                width: 280px;
                background: rgba(0, 0, 0, 0.85);
                color: #fff;
                padding: 12px 15px;
                border-radius: 8px;
                font-family: 'Microsoft YaHei', sans-serif;
                font-size: 13px;
                z-index: 999999;
                box-shadow: 0 4px 12px rgba(0,0,0,0.3);
                line-height: 1.6;
            `;
            // Title
            var title = document.createElement('div');
            title.style.cssText = 'font-weight: bold; font-size: 14px; margin-bottom: 8px; color: #4CAF50; border-bottom: 1px solid #444; padding-bottom: 6px;';
            title.innerHTML = '🤖 自动化进度';
            overlay.appendChild(title);
            // Status content
            var content = document.createElement('div');
            content.id = 'mip-progress-content';
            content.innerHTML = '初始化中...';
            overlay.appendChild(content);
            document.body.appendChild(overlay);
        })();
        """
        self.page.evaluate(js_code)
        self._overlay_injected = True
        logger.debug("浮窗已注入页面")
    except Exception as e:
        # The overlay is purely cosmetic; never let it break the task.
        logger.debug(f"注入浮窗失败: {str(e)}")
def _update_overlay(self, status: str, details: str = ""):
    """Refresh the text shown in the progress overlay.

    Args:
        status: Short description of the current step.
        details: Optional extra detail (URLs, error text, counters).

    ``status`` and ``details`` may contain arbitrary text (URLs, exception
    messages), so they are HTML-escaped and the markup is handed to the page
    as an argument instead of being spliced into JavaScript source, where a
    backtick, ``${`` or backslash would break or alter the script.
    """
    try:
        from html import escape  # local import; stdlib only

        self._inject_overlay()
        task_info = f"任务 #{self.task_index}" if self.task_index else "任务"
        doctor_count = len(self.doctor_replies)
        recommend_count = len(self.recommend_replies)
        markup = f"""
<div style='margin-bottom: 6px;'><b>{task_info}</b></div>
<div style='color: #4CAF50;'>📍 {escape(str(status))}</div>
<div style='color: #aaa; font-size: 12px; margin-top: 4px;'>{escape(str(details))}</div>
<div style='margin-top: 8px; padding-top: 6px; border-top: 1px solid #444; font-size: 11px; color: #888;'>
医生消息: {doctor_count} | 推荐回复: {recommend_count}
</div>
"""
        self.page.evaluate(
            """(markup) => {
                var content = document.getElementById('mip-progress-content');
                if (content) {
                    content.innerHTML = markup;
                }
            }""",
            markup,
        )
    except Exception as e:
        # The overlay is cosmetic; failures are only logged.
        logger.debug(f"更新浮窗失败: {str(e)}")
def _remove_overlay(self):
    """Remove the progress overlay from the page (best effort).

    Failures are logged at debug level instead of being silently swallowed,
    and only ``Exception`` is caught so Ctrl-C / interpreter exit still
    propagate. The injected flag is always cleared, because after a failed
    removal the overlay state is unknown.
    """
    try:
        js_code = """
        (function() {
            var overlay = document.getElementById('mip-progress-overlay');
            if (overlay) overlay.remove();
        })();
        """
        self.page.evaluate(js_code)
    except Exception as e:
        logger.debug(f"移除浮窗失败: {str(e)}")
    finally:
        self._overlay_injected = False
def _human_click(self, element, description: str = ""):
    """Click an element the way a person would: off-centre, after moving the mouse.

    Args:
        element: Element to click; must provide ``bounding_box()`` and ``click()``.
        description: Optional label written to the debug log.

    Falls back to a plain ``element.click()`` when the element has no
    bounding box or when any step of the simulated click raises.
    """
    try:
        rect = element.bounding_box()
        if not rect:
            # No geometry available -> ordinary click
            element.click()
            return
        width = rect['width']
        height = rect['height']
        # Random offset of up to 30% of each dimension, so the click is not dead centre
        jitter_x = random.uniform(-width * 0.3, width * 0.3)
        jitter_y = random.uniform(-height * 0.3, height * 0.3)
        target_x = rect['x'] + width / 2 + jitter_x
        target_y = rect['y'] + height / 2 + jitter_y
        # Move the pointer in several steps, pause briefly, then click
        self.page.mouse.move(target_x, target_y, steps=random.randint(5, 15))
        time.sleep(random.uniform(0.1, 0.3))
        self.page.mouse.click(target_x, target_y)
        if description:
            logger.debug(f"真人点击: {description} ({target_x:.0f}, {target_y:.0f})")
    except Exception as e:
        logger.debug(f"真人点击失败,使用普通点击: {str(e)}")
        element.click()
def _human_type(self, message: str):
    """Type a message character by character with human-like pauses.

    Args:
        message: Text to type into the currently focused element.

    Each character is followed by a 50-200 ms pause, with an occasional
    longer pause. If typing fails part-way, only the characters that have
    not been typed yet are sent with the plain fallback, so the already
    typed prefix is not entered twice.
    """
    typed_count = 0
    try:
        for char in message:
            delay = random.uniform(0.05, 0.2)
            self.page.keyboard.type(char, delay=0)
            typed_count += 1
            time.sleep(delay)
            # Occasionally pause a little longer, as if thinking
            if random.random() < 0.05:
                time.sleep(random.uniform(0.3, 0.8))
    except Exception as e:
        logger.debug(f"真人输入失败,使用普通输入: {str(e)}")
        remaining = message[typed_count:]
        if remaining:
            self.page.keyboard.type(remaining, delay=30)
def _clean_message_content(self, content: str) -> Optional[str]:
"""
清理消息内容过滤HTML标签和JSON命令
Args:
content: 原始消息内容
Returns:
清理后的内容如果是无效消息返回None
"""
if not content:
return None
# 过滤JSON命令消息
if content.strip().startswith('{') and '"type":"cmd"' in content:
return None
# 移除HTML标签
clean_content = re.sub(r'<[^>]+>', '', content)
# 去除多余空白
clean_content = clean_content.strip()
# 如果清理后为空返回None
if not clean_content:
return None
return clean_content
def _setup_response_listener(self):
    """Attach a response listener for the Ada heartbeat and recommend APIs.

    The listener is registered once per instance. Heartbeat responses feed
    ``self.doctor_replies`` with messages whose ``source`` or ``msgFrom`` is
    ``'service'``; recommend responses feed ``self.recommend_replies``.

    Differences from a naive implementation:
      * messages without a ``messageId`` are de-duplicated by content, so
        they are not all treated as copies of the first one;
      * a ``null`` content/data field no longer aborts the whole batch;
      * the recommend log reports exactly the newly added suggestions.
    """
    if self._response_listener_active:
        return

    def handle_response(response: Response):
        try:
            url = response.url
            # Debug aid: trace every ada.baidu.com response
            if 'ada.baidu.com' in url:
                logger.debug(f"[API请求] {url[:100]}...")
            if self.ADA_HEARTBEAT_API in url and response.status == 200:
                try:
                    data = response.json()
                    logger.debug(f"[Heartbeat] 响应: {json.dumps(data, ensure_ascii=False)[:500]}")
                    talks = (data.get('data') or {}).get('talk')
                    if data.get('status') == 0 and talks:
                        for talk in talks:
                            # Only messages from the doctor / service side
                            if talk.get('source') != 'service' and talk.get('msgFrom') != 'service':
                                continue
                            raw_content = talk.get('content') or ''
                            msg_id = talk.get('messageId', '')
                            msg_time = talk.get('messageTime', '')
                            content = self._clean_message_content(raw_content)
                            if not content:
                                logger.debug(f"[Heartbeat] 跳过无效消息: {str(raw_content)[:50]}...")
                                continue
                            # De-duplicate by id when present, otherwise by content
                            if msg_id:
                                seen = any(r.get('messageId') == msg_id for r in self.doctor_replies)
                            else:
                                seen = any(r.get('content') == content for r in self.doctor_replies)
                            if seen:
                                continue
                            self.doctor_replies.append({
                                'content': content,
                                'messageId': msg_id,
                                'messageTime': msg_time,
                                'source': 'service',
                                'received_at': datetime.now().isoformat()
                            })
                            logger.info(f"[医生回复] 收到新消息: {content[:100]}...")
                except Exception as e:
                    logger.debug(f"[Heartbeat] 解析响应失败: {str(e)}")
            elif self.ADA_RECOMMEND_API in url and response.status == 200:
                try:
                    data = response.json()
                    logger.debug(f"[Recommend] 响应: {json.dumps(data, ensure_ascii=False)[:500]}")
                    suggest_info = (data.get('data') or {}).get('suggestInfo')
                    if data.get('status') == 200 and suggest_info:
                        new_texts = []
                        for suggest in suggest_info:
                            for reply in suggest.get('suggestReply') or []:
                                reply_id = reply.get('replyId', '')
                                # Skip suggestions that were already stored
                                if any(r.get('replyId') == reply_id for r in self.recommend_replies):
                                    continue
                                entry = {
                                    'replyId': reply_id,
                                    'text': reply.get('text', ''),
                                    'value': reply.get('value', ''),
                                    'type': reply.get('type', 'text'),
                                    'received_at': datetime.now().isoformat()
                                }
                                self.recommend_replies.append(entry)
                                new_texts.append(entry['text'])
                        if new_texts:
                            logger.info(f"[推荐回复] 收到 {len(new_texts)} 条推荐: {new_texts}")
                except Exception as e:
                    logger.debug(f"[Recommend] 解析响应失败: {str(e)}")
        except Exception as e:
            logger.debug(f"处理响应异常: {str(e)}")

    self.page.on('response', handle_response)
    self._response_listener_active = True
    logger.info("已启用Ada平台API响应监听")
def _clear_reply_data(self):
"""清空回复数据"""
self.doctor_replies = []
self.recommend_replies = []
self.chat_history = []
self.sent_messages = [] # 记录所有发送的消息
def _call_qwen_api(self, doctor_message: str) -> Optional[str]:
    """Ask the Qwen chat model for the next patient-side reply.

    Args:
        doctor_message: Latest message from the doctor / customer service.

    Returns:
        The generated reply, or ``None`` when the request fails, returns a
        non-200 status, or yields empty content. A reply containing any
        contact-related keyword is replaced by a fixed safe sentence.

    Side effects:
        Appends the doctor message (role ``user``) and, on success, the
        reply (role ``assistant``) to ``self.chat_history``.
    """
    try:
        # System prompt: role-play instructions for the model
        system_prompt = """你是一个正在咨询医疗问题的普通患者。请根据对方(医生/客服)的回复,以患者身份自然地继续对话。
要求
1. 回复要简短自然像真实患者一样说话
2. 可以询问价格时间医生资质等问题
3. 表现出对治疗的关心和一些担忧
4. 不要暴露自己是AI
5. 回复长度控制在20-50字以内
6. 使用口语化的表达
7. 绝对不要提及任何联系方式包括微信电话QQ手机号邮箱等
8. 如果对方索要联系方式委婉拒绝或转移话题"""
        # From the API's point of view the doctor is the "user"
        doctor_turn = {
            "role": "user",
            "content": f"[医生/客服说]: {doctor_message}"
        }
        self.chat_history.append(doctor_turn)
        conversation = [{"role": "system", "content": system_prompt}, *self.chat_history]
        request_headers = {
            "Authorization": f"Bearer {Config.QWEN_API_KEY}",
            "Content-Type": "application/json"
        }
        request_body = {
            "model": "qwen-turbo",
            "messages": conversation,
            "temperature": 0.8,
            "max_tokens": 150
        }
        logger.info(f"调用千问API医生消息: {doctor_message[:50]}...")
        response = requests.post(
            Config.QWEN_API_URL,
            headers=request_headers,
            json=request_body,
            timeout=30
        )
        if response.status_code != 200:
            logger.error(f"千问API调用失败: {response.status_code} - {response.text}")
            return None
        result = response.json()
        first_choice = result.get('choices', [{}])[0]
        reply = first_choice.get('message', {}).get('content', '')
        if not reply:
            logger.warning("千问API返回空内容")
            return None
        # Reject replies that mention any contact channel
        sensitive_words = ['微信', 'wx', 'WeChat', '电话', '手机', 'QQ', '邮箱', '@', '加我', '联系方式']
        lowered_reply = reply.lower()
        flagged = False
        for word in sensitive_words:
            if word.lower() in lowered_reply:
                flagged = True
                break
        if flagged:
            logger.warning(f"AI回复包含敏感词已过滤: {reply}")
            # Fall back to a fixed, safe follow-up question
            reply = "好的,我了解了,还想问一下治疗大概需要多长时间呢?"
        self.chat_history.append({
            "role": "assistant",
            "content": reply
        })
        logger.info(f"千问API回复: {reply}")
        return reply
    except Exception as e:
        logger.error(f"调用千问API异常: {str(e)}")
        return None
2026-01-13 18:59:26 +08:00
2026-01-16 22:06:46 +08:00
def check_and_click_ad(self, url: str, site_id: Optional[int] = None) -> Tuple[bool, bool]:
    """Open a MIP page, click a commercial ad and run the chat interaction.

    Args:
        url: MIP page URL.
        site_id: Site ID used when writing database records.

    Returns:
        ``(clicked, got_reply)``: whether an ad was clicked successfully and
        whether at least one doctor reply was captured.

    The overlay is removed and the current tab is closed in every case
    (``finally``). Only ``Exception`` is caught in the nested handlers so
    that KeyboardInterrupt / SystemExit are not swallowed.
    """
    self.site_id = site_id
    # Start from a clean conversation state and make sure the API listener is on
    self._clear_reply_data()
    self._setup_response_listener()
    try:
        # Load the page, with one retry
        max_retries = 2
        page_loaded = False
        self._update_overlay("访问页面", url[:50] + "...")
        for attempt in range(max_retries):
            try:
                logger.info(f"访问链接: {url} (第{attempt+1}次尝试)")
                self.page.goto(url, wait_until='domcontentloaded', timeout=30000)
                page_loaded = True
                break
            except Exception as goto_err:
                if attempt < max_retries - 1:
                    logger.warning("访问超时,尝试刷新页面...")
                    try:
                        self.page.reload(wait_until='domcontentloaded', timeout=30000)
                        logger.info("✅ 页面刷新成功")
                        page_loaded = True
                        break
                    except Exception as reload_err:
                        logger.warning("刷新失败等待2秒后重试...")
                        logger.debug(f"刷新失败原因: {str(reload_err)}")
                        time.sleep(2)
                else:
                    logger.error(f"访问链接失败: {str(goto_err)}")
                    self._record_click_failure(url, f"访问超时: {str(goto_err)}")
                    return False, False
        if not page_loaded:
            self._record_click_failure(url, "页面加载失败")
            return False, False
        # Give the page time to settle
        time.sleep(3)
        # Look for commercial ads
        self._update_overlay("检测广告", "扫描页面中...")
        has_ad, ad_elements = self._detect_commercial_ad()
        if not has_ad:
            logger.info("未检测到商业广告,跳过该链接")
            self._update_overlay("未检测到广告", "跳过该链接")
            self._record_click_failure(url, "未检测到商业广告")
            return False, False
        # Try the ads one by one until a click succeeds
        self._update_overlay("点击广告", f"检测到 {len(ad_elements)} 个广告")
        logger.info(f"检测到商业广告,准备点击(共 {len(ad_elements)} 个)")
        click_success = False
        for idx, ad_element in enumerate(ad_elements, 1):
            logger.info(f"尝试点击第 {idx}/{len(ad_elements)} 个广告...")
            self._update_overlay("点击广告", f"尝试第 {idx}/{len(ad_elements)}")
            if self._click_advertisement(ad_element):
                logger.info(f"✅ 第 {idx} 个广告点击成功")
                click_success = True
                break
            logger.warning(f"❌ 第 {idx} 个广告点击失败,尝试下一个...")
            # Short pause before trying the next ad
            time.sleep(1)
        if not click_success:
            logger.warning("所有广告均点击失败")
            self._update_overlay("点击失败", "所有广告均点击失败")
            self._record_click_failure(url, f"所有广告({len(ad_elements)}个)均点击失败")
            return False, False
        # Persist the successful click
        self._record_click(url)
        # Wait for the chat page
        self._update_overlay("进入聊天", "等待页面加载...")
        logger.info("等待聊天页面加载...")
        time.sleep(3)
        # Landing pages that are not chat pages count as a failed click
        non_chat_domains = [
            'sp.vejianzhan.com',  # site-builder landing page
            # add further domains to skip here
        ]
        current_url = self.page.url.lower()
        for domain in non_chat_domains:
            if domain in current_url:
                logger.info(f"检测到非聊天页面({domain}),判定为点击失败")
                self._update_overlay("非聊天页面", "非聊天页面,跳过")
                self._record_click_failure(url, f"跳转到非聊天页面({domain})")
                return False, False
        # API responses only arrive after some interaction: prefer a visible
        # recommended-reply button, otherwise send an opening message.
        logger.info("开始首次交互...")
        if not self._try_click_visible_recommend():
            self._send_initial_message()
        # Automated multi-round chat
        self._update_overlay("自动聊天", "开始交互...")
        logger.info("开始自动聊天交互流程...")
        interaction_rounds = self._auto_chat_interaction()
        has_reply = len(self.doctor_replies) > 0
        # Persist the interaction
        self._record_interaction(has_reply)
        self._update_overlay("完成", f"交互 {interaction_rounds} 轮,回复: {has_reply}")
        logger.info(f"交互结束,完成 {interaction_rounds} 轮,收到回复: {has_reply}")
        return True, has_reply
    except Exception as e:
        logger.error(f"处理链接异常: {str(e)}")
        self._update_overlay("异常", str(e)[:30])
        try:
            self._record_click_failure(url, f"异常: {str(e)}")
        except Exception as record_err:
            logger.debug(f"记录异常失败: {str(record_err)}")
        return False, False
    finally:
        # Leave the final overlay state visible briefly, then clean up
        time.sleep(2)
        self._remove_overlay()
        # Close the current tab and return to the main window
        self._close_current_tab()
2026-01-19 09:28:03 +08:00
def _detect_commercial_ad(self) -> Tuple[bool, List[ElementHandle]]:
    """Look for visible commercial-ad elements on the current page.

    Returns:
        ``(found, elements)``. ``elements`` holds the objects returned by
        ``page.locator(...).all()`` whose visible text contains "广告" and
        does not contain "ai健康". An element whose bounding box equals that
        of one already collected is skipped, because the selectors overlap
        and clicking it would hit the same spot again.
    """
    try:
        # Give the comment section time to load
        time.sleep(2)
        ad_selectors = [
            "//div[contains(@class, 'ad') or contains(@class, 'advertisement')]",
            "//div[contains(text(), '广告')]",
            "//*[contains(text(), '广告')]//ancestor::div[contains(@class, 'card')]",
            "//a[contains(@class, 'ad-link')]",
        ]
        ad_elements = []
        seen_boxes = set()
        for selector in ad_selectors:
            try:
                for elem in self.page.locator(f"xpath={selector}").all():
                    if not elem.is_visible():
                        continue
                    try:
                        elem_text = elem.inner_text().lower()
                        # Commercial ad, not the "AI health assistant" widget
                        if '广告' not in elem_text or 'ai健康' in elem_text:
                            continue
                        box = elem.bounding_box()
                    except Exception:
                        continue
                    if box:
                        box_key = (
                            round(box['x']), round(box['y']),
                            round(box['width']), round(box['height']),
                        )
                        if box_key in seen_boxes:
                            continue
                        seen_boxes.add(box_key)
                    ad_elements.append(elem)
            except Exception:
                # A failing selector must not stop the remaining ones
                continue
        if ad_elements:
            logger.info(f"检测到 {len(ad_elements)} 个商业广告")
            return True, ad_elements
        logger.info("未检测到商业广告")
        return False, []
    except Exception as e:
        logger.error(f"检测广告异常: {str(e)}")
        return False, []
2026-01-13 18:59:26 +08:00
2026-02-24 12:46:35 +08:00
def _get_ad_info(self, ad_element) -> str:
    """Build a short, loggable description of an ad element.

    Args:
        ad_element: The ad element.

    Returns:
        Text, link and title joined by `` | ``; "无详细信息" when nothing
        could be read; or an error string if building the summary fails.

    Every lookup is best effort. The inner ``<a>`` is only queried when the
    element itself has no ``href`` and a matching descendant actually exists;
    asking a non-existent locator for an attribute would otherwise block
    until Playwright's timeout expires.
    """
    def read_attribute(name: str):
        """Return the attribute value, or None if it cannot be read."""
        try:
            return ad_element.get_attribute(name)
        except Exception:
            return None

    try:
        info_parts = []
        # Visible text, flattened and capped at 100 characters
        try:
            text = ad_element.inner_text()
        except Exception:
            text = None
        if text:
            text = text.strip().replace('\n', ' ')[:100]
            info_parts.append(f"文本: {text}")
        # Own link
        href = read_attribute('href')
        if href:
            info_parts.append(f"链接: {href[:80]}")
        # Title
        title = read_attribute('title')
        if title:
            info_parts.append(f"标题: {title}")
        # Fall back to the first inner link when the element has none itself
        if not href:
            try:
                link = ad_element.locator('a').first
                if link.count() > 0:
                    inner_href = link.get_attribute('href')
                    if inner_href:
                        info_parts.append(f"内链: {inner_href[:80]}")
            except Exception:
                pass  # best effort: the inner link is optional information
        return ' | '.join(info_parts) if info_parts else "无详细信息"
    except Exception as e:
        return f"获取信息失败: {str(e)}"
2026-01-13 18:59:26 +08:00
def _click_advertisement(self, ad_element: ElementHandle) -> bool:
    """Click an ad and wait for the current page to navigate away.

    Args:
        ad_element: The ad element to click.

    Returns:
        ``True`` when the page URL changed within 10 seconds of the click,
        ``False`` when it did not or when any step raised.
    """
    try:
        start_url = self.page.url
        # Log what is about to be clicked
        ad_info = self._get_ad_info(ad_element)
        logger.info(f"广告信息: {ad_info}")
        # Bring the ad into view
        ad_element.scroll_into_view_if_needed()
        time.sleep(1)
        logger.info("点击广告...")
        ad_element.click()
        logger.info("已点击广告")
        # Poll the URL once per second, for at most max_wait seconds
        logger.info("等待页面跳转...")
        max_wait = 10
        check_interval = 1
        waited = 0
        navigated = False
        while waited < max_wait and not navigated:
            time.sleep(check_interval)
            waited += 1
            navigated = self.page.url != start_url
        if not navigated:
            logger.error(f"❌ 页面URL未变化等待{max_wait}秒后),广告点击失败: {self.page.url}")
            return False
        logger.info(f"✅ 页面已导航(耗时{waited}秒): {start_url} -> {self.page.url}")
        # Waiting for the load state is optional: a timeout here is not a
        # failure, since the URL has already changed.
        try:
            logger.info("等待页面加载...")
            self.page.wait_for_load_state('domcontentloaded', timeout=10000)
            logger.info("✅ 页面加载完成")
        except Exception:
            logger.warning(f"⚠️ 页面加载超时但URL已跳转继续执行...")
        # Let the chat page settle
        time.sleep(2)
        return True
    except Exception as e:
        logger.error(f"点击广告异常: {str(e)}")
        return False
2026-01-16 22:06:46 +08:00
def _send_consultation_message(self) -> bool:
    """Send one random canned consultation message on the chat page.

    Returns:
        ``True`` when the message was typed and submitted, otherwise ``False``.

    Flow: find a visible input via a prioritised selector list; if none is
    found, take a screenshot and fall back to clicking any plausible input
    area and typing there. Submission uses Enter first and a "send" button
    as fallback. On success the text is stored in ``self.sent_message``.
    """
    def save_debug_screenshot(tag: str) -> None:
        """Best-effort screenshot into the task folder, or ./logs without one."""
        try:
            if self.task_folder:
                screenshot_path = self.task_folder / f"{tag}.png"
            else:
                screenshot_path = Path(f"./logs/{tag}_{int(time.time())}.png")
            self.page.screenshot(path=str(screenshot_path))
            logger.info(f"已保存调试截图: {screenshot_path}")
        except Exception as shot_err:
            logger.warning(f"截图失败: {str(shot_err)}")

    try:
        logger.info("准备发送咨询消息...")
        message = random.choice(self.CONSULTATION_MESSAGES)
        logger.info(f"选择的消息: {message}")
        # Let the page finish loading
        time.sleep(2)
        logger.info(f"当前页面: {self.page.url}")
        # Input selectors, most specific first
        input_selectors = [
            # by placeholder
            "textarea[placeholder*='消息']",
            "textarea[placeholder*='问题']",
            "input[type='text'][placeholder*='消息']",
            "input[type='text'][placeholder*='问题']",
            "textarea[placeholder*='输入']",
            "textarea[placeholder*='发送']",
            "input[type='text'][placeholder*='输入']",
            "input[type='text'][placeholder*='发送']",
            # by class
            "textarea[class*='input']",
            # generic fallbacks
            "div[contenteditable='true']",
            "textarea",
            "input[type='text']"
        ]
        input_element = None
        logger.info("开始查找输入框...")
        for selector in input_selectors:
            try:
                elements = self.page.locator(selector).all()
                logger.debug(f"选择器 {selector} 找到 {len(elements)} 个元素")
                for elem in elements:
                    if elem.is_visible():
                        input_element = elem
                        logger.info(f"✅ 找到可见输入框: {selector}")
                        break
                if input_element:
                    break
            except Exception as e:
                logger.debug(f"选择器 {selector} 失败: {str(e)}")
                continue

        if not input_element:
            logger.warning("❌ 未找到输入框")
            save_debug_screenshot("debug_no_input")
            # Fallback: click any plausible input area and type into it
            logger.warning("尝试兜底方案:查找所有可能的输入区域...")
            try:
                # Scroll to the bottom of the page first
                self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                time.sleep(1)
                fallback_selectors = [
                    "textarea",
                    "input[type='text']",
                    "div[contenteditable='true']",
                    "div[class*='input']",
                    "div[class*='textarea']",
                    "div[class*='message']",
                    "div[class*='chat']",
                    "div[id*='input']",
                    "div[id*='message']"
                ]
                clicked = False
                for selector in fallback_selectors:
                    try:
                        elements = self.page.locator(selector).all()
                        logger.debug(f"兜底选择器 {selector} 找到 {len(elements)} 个元素")
                        for elem in elements:
                            if elem.is_visible():
                                elem.scroll_into_view_if_needed()
                                time.sleep(0.5)
                                elem.click()
                                time.sleep(1)
                                logger.info(f"已点击元素: {selector}")
                                clicked = True
                                break
                        if clicked:
                            break
                    except Exception as e:
                        logger.debug(f"兜底选择器 {selector} 失败: {str(e)}")
                        continue
                if not clicked:
                    logger.error("❌ 兜底方案未找到任何可点击的输入区域")
                    return False
                # Type like a human, then submit with Enter
                self._human_type(message)
                logger.info("✅ 已输入消息(兜底)")
                self.page.keyboard.press('Enter')
                logger.info("✅ 已按回车键发送(兜底)")
                self.sent_message = message
                time.sleep(2)
                return True
            except Exception as fallback_err:
                logger.error(f"兜底方案失败: {str(fallback_err)}")
                return False

        # Normal flow: focus the input and type like a human
        input_element.click()
        time.sleep(0.5)
        self._human_type(message)
        logger.info("✅ 已输入消息")
        time.sleep(1)
        sent = False
        # Method 1: Enter key
        try:
            logger.info("尝试按回车键发送...")
            input_element.press('Enter')
            logger.info("✅ 已按回车键发送")
            sent = True
            time.sleep(1)
        except Exception as e:
            logger.warning(f"❌ 按回车键失败: {str(e)}")
        # Method 2 (fallback): click a send button
        if not sent:
            send_button_selectors = [
                "button:has-text('发送')",
                "button[class*='send']",
                "button[type='submit']",
                "div[class*='send']",
                "span:has-text('发送')"
            ]
            logger.info("开始查找发送按钮...")
            for selector in send_button_selectors:
                try:
                    buttons = self.page.locator(selector).all()
                    logger.debug(f"选择器 {selector} 找到 {len(buttons)} 个按钮")
                    for btn in buttons:
                        if btn.is_visible() and btn.is_enabled():
                            btn.click()
                            logger.info(f"✅ 已点击发送按钮: {selector}")
                            sent = True
                            break
                    if sent:
                        break
                except Exception as e:
                    logger.debug(f"选择器 {selector} 失败: {str(e)}")
                    continue
        if not sent:
            logger.warning("❌ 未能发送消息")
            save_debug_screenshot("debug_send_failed")
            return False
        logger.info("✅ 消息发送成功")
        self.sent_message = message
        time.sleep(2)  # allow the message to go out
        return True
    except Exception as e:
        # logger.exception keeps the traceback in the log instead of stderr
        logger.exception(f"发送消息异常: {str(e)}")
        return False
def _record_click(self, site_url: str):
    """Record a successful ad click in the database.

    Args:
        site_url: URL of the page on which the ad was clicked.

    ``self.click_id`` is cleared first, so a skipped or failed insert cannot
    leave the id of a previous visit behind and attach a later interaction
    record to the wrong click.
    """
    self.click_id = None
    try:
        if not self.site_id:
            logger.warning("未设置 site_id跳过点击记录")
            return
        from db_manager import ClickManager
        click_mgr = ClickManager()
        self.click_id = click_mgr.record_click(
            site_id=self.site_id,
            site_url=site_url,
            user_ip=None,  # proxy IP could be supplied later
            device_type='pc'
        )
        logger.info(f"已记录点击: click_id={self.click_id}")
    except Exception as e:
        logger.error(f"记录点击失败: {str(e)}")
def _record_click_failure(self, site_url: str, error_message: str):
    """Record a failed attempt: one click row plus one unsuccessful interaction row.

    Args:
        site_url: Site URL.
        error_message: Reason for the failure.

    Nothing is written when ``self.site_id`` is not set. Database errors
    are logged and not raised.
    """
    try:
        if not self.site_id:
            logger.warning("未设置 site_id跳过失败记录")
            return
        # The click is counted even though the attempt failed
        from db_manager import ClickManager
        click_fields = {
            'site_id': self.site_id,
            'site_url': site_url,
            'user_ip': None,
            'device_type': 'pc',
        }
        self.click_id = ClickManager().record_click(**click_fields)
        # Matching interaction row, flagged as unsuccessful
        from db_manager import InteractionManager
        failure_fields = {
            'site_id': self.site_id,
            'click_id': self.click_id,
            'interaction_type': 'reply',
            'reply_content': None,
            'is_successful': False,
            'response_received': False,
            'error_message': error_message,
        }
        InteractionManager().record_interaction(**failure_fields)
        logger.info(f"已记录失败: {error_message}")
    except Exception as e:
        logger.error(f"记录失败异常: {str(e)}")
def _record_interaction(self, response_received: bool):
    """Record the chat interaction, including the full chat log, in the database.

    Args:
        response_received: Whether any doctor reply was captured.

    Nothing is written when ``self.site_id`` is not set. Database errors
    are logged and not raised.
    """
    try:
        if not self.site_id:
            logger.warning("未设置 site_id跳过互动记录")
            return
        from db_manager import InteractionManager
        manager = InteractionManager()
        # Full chat transcript
        chat_log = self._get_full_chat_log()
        if chat_log:
            logger.info(f"完整聊天记录 ({len(self.doctor_replies)}条医生消息):")
            logger.debug(chat_log[:500])
        last_sent = getattr(self, 'sent_message', None)
        interaction_id = manager.record_interaction(
            site_id=self.site_id,
            click_id=self.click_id,
            # must be one of the DB enum values:
            # reply/comment/message/form_submit/follow/like/share
            interaction_type='message',
            reply_content=last_sent,
            is_successful=True,
            response_received=response_received,
            response_content=chat_log  # full chat transcript
        )
        logger.info(f"已记录互动: interaction_id={interaction_id}, response={response_received}")
        # Log every doctor reply in detail
        replies = self.doctor_replies
        if replies:
            logger.info(f"本次共收到 {len(replies)} 条医生回复:")
            position = 0
            for reply in replies:
                position += 1
                logger.info(f" [{position}] {reply.get('content', '')[:100]}")
    except Exception as e:
        logger.error(f"记录互动失败: {str(e)}")
2026-01-13 18:59:26 +08:00
def _wait_for_reply(self) -> bool:
    """Wait until the response listener has captured a new doctor reply.

    Returns:
        ``True`` as soon as ``self.doctor_replies`` grows (a recommended
        reply is then attempted as a follow-up), ``False`` on timeout
        (``Config.REPLY_WAIT_TIMEOUT`` seconds) or on error.

    The wait uses ``page.wait_for_timeout`` rather than ``time.sleep``: with
    Playwright's sync API, event handlers such as the response listener only
    run while Playwright is in control, so a loop that merely sleeps would
    never see ``doctor_replies`` change.
    """
    try:
        timeout = Config.REPLY_WAIT_TIMEOUT
        logger.info(f"等待广告主回复(最多{timeout}秒)")
        # Number of replies already present when the wait starts
        initial_reply_count = len(self.doctor_replies)
        start_time = time.time()
        check_interval = 2  # seconds between checks
        while time.time() - start_time < timeout:
            self.page.wait_for_timeout(check_interval * 1000)
            if len(self.doctor_replies) > initial_reply_count:
                new_replies = self.doctor_replies[initial_reply_count:]
                logger.info(f"收到 {len(new_replies)} 条医生回复")
                for reply in new_replies:
                    logger.info(f" - {reply.get('content', '')[:100]}")
                # Follow up with a recommended reply if one is available
                self._try_click_recommend_reply()
                return True
            # Progress log roughly every 10 seconds
            elapsed = int(time.time() - start_time)
            if elapsed % 10 == 0 and elapsed > 0:
                logger.info(f"等待中... ({elapsed}/{timeout}秒)")
        logger.info("未收到广告主回复(超时)")
        return False
    except Exception as e:
        logger.error(f"等待回复异常: {str(e)}")
        return False
2026-02-24 12:46:35 +08:00
def _try_click_recommend_reply(self) -> bool:
    """Click a recommended-reply button matching one captured from the API.

    Returns:
        ``True`` if a button was clicked, otherwise ``False``.

    Recommendations containing phone / WeChat / rating keywords are ignored.
    Buttons are first searched via known CSS selectors, then via an XPath
    text search on the first 10 characters of each recommendation.

    Fixes over the straightforward version: elements with empty text are
    skipped (an empty string is a substring of every recommendation), and
    the XPath text is quoted safely so a quote character cannot break it.
    """
    def xpath_literal(value: str) -> str:
        """Quote a string for use inside an XPath expression."""
        if "'" not in value:
            return f"'{value}'"
        if '"' not in value:
            return f'"{value}"'
        pieces = value.split("'")
        return "concat(" + ", \"'\", ".join(f"'{piece}'" for piece in pieces) + ")"

    def remember_sent(text: str) -> None:
        """Store the clicked recommendation as the latest outgoing message."""
        self.sent_recommend_reply = text
        self.sent_message = f"[推荐回复] {text}"
        if not hasattr(self, 'sent_messages'):
            self.sent_messages = []
        self.sent_messages.append({'role': '我方(推荐回复)', 'content': text})

    try:
        if not self.recommend_replies:
            logger.info("暂无推荐回复")
            return False
        # Keywords that disqualify a recommendation (phone, WeChat, ratings)
        filter_keywords = ['电话', '拨打', '致电', '来电', '通话', '微信', '加微', 'wx', 'WeChat', '满意度', '评价', '好评', '差评']
        recommend_texts = []
        for recommend in self.recommend_replies:
            text = recommend.get('text', '') or recommend.get('value', '')
            if not text:
                continue
            if any(kw in text for kw in filter_keywords):
                logger.debug(f"跳过电话相关推荐: {text}")
                continue
            recommend_texts.append(text)
        if not recommend_texts:
            logger.warning("推荐回复内容为空(或全是电话相关)")
            return False
        logger.info(f"查找推荐回复按钮: {recommend_texts}")
        # Known selectors for recommended-reply buttons
        button_selectors = [
            "div[class*='suggest'] button",
            "div[class*='suggest'] div[class*='item']",
            "div[class*='recommend'] button",
            "div[class*='recommend'] div[class*='item']",
            "div[class*='quick'] button",
            "div[class*='quick'] div[class*='reply']",
            "button[class*='suggest']",
            "button[class*='recommend']",
            "div[class*='bubble'] span",
            "div[class*='reply-item']",
            "span[class*='suggest']",
        ]
        for selector in button_selectors:
            try:
                for elem in self.page.locator(selector).all():
                    if not elem.is_visible():
                        continue
                    try:
                        elem_text = elem.inner_text().strip()
                        if not elem_text:
                            # '' is contained in every string; never match on it
                            continue
                        matches = any(
                            recommend_text in elem_text or elem_text in recommend_text
                            for recommend_text in recommend_texts
                        )
                        if matches:
                            logger.info(f"找到推荐回复按钮: {elem_text}")
                            elem.click()
                            logger.info(f"✅ 已点击推荐回复: {elem_text}")
                            remember_sent(elem_text)
                            time.sleep(1)
                            return True
                    except Exception:
                        continue
            except Exception as e:
                logger.debug(f"选择器 {selector} 失败: {str(e)}")
                continue
        # No match via selectors: search by text content
        logger.info("尝试通过文本内容查找推荐回复按钮...")
        for recommend_text in recommend_texts:
            try:
                needle = xpath_literal(recommend_text[:10])
                xpath_selectors = [
                    f"//*[contains(text(), {needle})]",
                    f"//button[contains(text(), {needle})]",
                    f"//span[contains(text(), {needle})]",
                    f"//div[contains(text(), {needle})]",
                ]
                for xpath in xpath_selectors:
                    try:
                        for elem in self.page.locator(f"xpath={xpath}").all():
                            if not elem.is_visible():
                                continue
                            # Only button-sized elements, not a whole container
                            box = elem.bounding_box()
                            if box and box['width'] < 500 and box['height'] < 100:
                                logger.info(f"找到推荐回复元素: {recommend_text[:20]}")
                                elem.click()
                                logger.info(f"✅ 已点击推荐回复: {recommend_text}")
                                remember_sent(recommend_text)
                                time.sleep(1)
                                return True
                    except Exception:
                        continue
            except Exception as e:
                logger.debug(f"文本查找失败: {str(e)}")
                continue
        logger.warning("未找到可点击的推荐回复按钮")
        return False
    except Exception as e:
        logger.error(f"点击推荐回复异常: {str(e)}")
        return False
def _try_click_visible_recommend(self) -> bool:
    """Click one random recommended option per visible recommendation group.

    Returns:
        ``True`` if at least one option/button was clicked, otherwise ``False``.

    Group selectors are tried in order. They overlap (for example
    ``div.gt-jmy-h5-c-msg-tag`` also matches ``div[class*='msg-tag']``), so
    once one selector has produced clicks the remaining ones are skipped;
    otherwise the same group would be clicked again. When no group is found
    a generic button search is used, which now also records
    ``self.sent_message`` like the group path does.
    """
    def remember_sent(text: str) -> None:
        """Store the clicked recommendation as the latest outgoing message."""
        self.sent_recommend_reply = text
        self.sent_message = f"[推荐回复] {text}"
        if not hasattr(self, 'sent_messages'):
            self.sent_messages = []
        self.sent_messages.append({'role': '我方(推荐回复)', 'content': text})

    try:
        # Keywords that disqualify an option
        filter_keywords = ['电话', '拨打', '致电', '来电', '通话', '微信', '加微', 'wx', 'WeChat', '满意度', '评价', '好评', '差评']
        # Each group holds the recommendations of one message
        recommend_group_selectors = [
            "div.gt-jmy-h5-c-msg-tag",
            "div[class*='msg-tag']",
            "div[class*='suggest-reply']",
            "div[class*='quick-reply']",
            "div[class*='recommend-reply']",
        ]
        option_selectors = ("span.content-text", "span", "button", "div[class*='item']")
        clicked_count = 0
        for group_selector in recommend_group_selectors:
            try:
                for group in self.page.locator(group_selector).all():
                    if not group.is_visible():
                        continue
                    # First option selector that yields anything wins
                    options = []
                    for option_selector in option_selectors:
                        options = group.locator(option_selector).all()
                        if options:
                            break
                    # Usable options in this group
                    available_options = []
                    for opt in options:
                        try:
                            if not opt.is_visible():
                                continue
                            text = opt.inner_text().strip()
                            if any(kw in text for kw in filter_keywords):
                                continue
                            if text and len(text) < 30:
                                available_options.append({'elem': opt, 'text': text})
                        except Exception:
                            continue
                    if not available_options:
                        continue
                    selected = random.choice(available_options)
                    logger.info(f"推荐选项: {[o['text'] for o in available_options]},选择: {selected['text']}")
                    self._human_click(selected['elem'], f"推荐回复: {selected['text']}")
                    logger.info(f"✅ 已点击推荐: {selected['text']}")
                    remember_sent(selected['text'])
                    clicked_count += 1
                    # Human-like pause of 2-5 seconds
                    time.sleep(random.uniform(2, 5))
            except Exception:
                continue
            if clicked_count > 0:
                # Later selectors would match the same groups again
                break
        if clicked_count > 0:
            logger.info(f"共点击 {clicked_count} 个推荐回复")
            return True
        # Fallback: generic selectors
        button_selectors = [
            "div[class*='suggest'] span",
            "div[class*='recommend'] span",
            "div[class*='quick-reply'] span",
            "button[class*='suggest']",
            "div[class*='reply-item']",
            "span[class*='reply']",
        ]
        available_buttons = []
        for selector in button_selectors:
            try:
                for elem in self.page.locator(selector).all():
                    if not elem.is_visible():
                        continue
                    try:
                        text = elem.inner_text().strip()
                        if any(kw in text for kw in filter_keywords):
                            continue
                        if not text or len(text) >= 30:
                            continue
                        # Button-sized elements only; one entry per distinct text
                        box = elem.bounding_box()
                        if box and box['width'] < 200 and box['height'] < 60:
                            if not any(b['text'] == text for b in available_buttons):
                                available_buttons.append({'elem': elem, 'text': text})
                    except Exception:
                        continue
            except Exception:
                continue
        if available_buttons:
            selected = random.choice(available_buttons)
            logger.info(f"找到 {len(available_buttons)} 个推荐按钮,选择: {selected['text']}")
            # Pause 1-3 seconds before clicking
            time.sleep(random.uniform(1, 3))
            self._human_click(selected['elem'], f"推荐按钮: {selected['text']}")
            logger.info(f"✅ 已点击推荐按钮: {selected['text']}")
            remember_sent(selected['text'])
            # Pause 2-4 seconds after clicking
            time.sleep(random.uniform(2, 4))
            return True
        logger.debug("未找到可见的推荐按钮")
        return False
    except Exception as e:
        logger.error(f"查找推荐按钮异常: {str(e)}")
        return False
def _count_dom_recommend_buttons(self) -> int:
"""
统计页面DOM中的推荐按钮数量
Returns:
推荐按钮数量
"""
try:
count = 0
# 基于实际页面结构的选择器
selectors = [
"div.gt-jmy-h5-c-msg-tag span.content-text",
"div[class*='msg-tag'] span.content-text",
]
for selector in selectors:
try:
elements = self.page.locator(selector).all()
for elem in elements:
if elem.is_visible():
count += 1
except:
continue
return count
except:
return 0
def _get_latest_doctor_message_from_dom(self) -> Optional[str]:
"""
从DOM获取最新的医生消息并添加到doctor_replies
Returns:
最新医生消息内容
"""
try:
# 基于实际页面结构的选择器
selectors = [
"div.msg-container-normal div.mip-sjh-text",
"div[class*='msg-container'] div.mip-sjh-text",
"div[class*='bot-msg'] div.mip-sjh-text",
]
for selector in selectors:
try:
elements = self.page.locator(selector).all()
if elements:
# 获取最后一个(最新的)消息
last_elem = elements[-1]
if last_elem.is_visible():
text = last_elem.inner_text().strip()
if text:
# 检查是否已存在(避免重复)
if not any(r.get('content') == text for r in self.doctor_replies):
self.doctor_replies.append({
'content': text,
'messageId': f'dom_{len(self.doctor_replies)}',
'source': 'dom',
'received_at': datetime.now().isoformat()
})
logger.debug(f"[DOM] 添加医生消息: {text[:50]}...")
return text
except:
continue
return None
except:
return None
def _send_initial_message(self):
    """Pick one of the preset consultation openers at random and send it to the chat."""
    opener = random.choice(self.CONSULTATION_MESSAGES)
    if not self._send_message_to_chat(opener):
        logger.warning("初始消息发送失败")
        return
    logger.info(f"✅ 已发送初始消息: {opener}")
    # Remember the opener as the most recently sent message.
    self.sent_message = opener
def _send_message_to_chat(self, message: str) -> bool:
    """
    Type ``message`` into the chat page and submit it.

    Steps:
      1. Look for a visible input box (bounding box taller than 20px) using
         a prioritised selector list. Up to 3 attempts are made, with a 2 s
         pause and a scroll to the bottom of the page between attempts.
      2. If no input is found, fall back to clicking near the bottom centre
         of the viewport, typing the message and pressing Enter.
      3. Otherwise click the input, type via ``_human_type``, then submit by
         clicking the first visible send button or, failing that, by
         pressing Enter.

    On success the message is stored in ``self.sent_message`` and appended to
    ``self.sent_messages`` (created on first use).

    Args:
        message: The text to send.

    Returns:
        True when a submit action was performed, False otherwise. The page is
        not inspected afterwards to confirm the message was accepted.
    """
    def remember_sent() -> None:
        # Shared bookkeeping for both the normal and the fallback send path.
        self.sent_message = message
        if not hasattr(self, 'sent_messages'):
            self.sent_messages = []
        self.sent_messages.append({'role': '我方', 'content': message})

    try:
        # Input-box selectors, in priority order.
        input_selectors = [
            # Based on the actual page structure (custom input component).
            "div.gt-jmy-h5-bot-text-input",
            "div.text-input",
            "div.input-area",
            "div.fake-input",
            # Based on class names.
            "textarea.chat-input",
            "textarea[class*='input']",
            "textarea[class*='textarea']",
            "div[class*='input'] textarea",
            "div[class*='chat'] textarea",
            # Based on placeholder text.
            "textarea[placeholder*='消息']",
            "textarea[placeholder*='问题']",
            "textarea[placeholder*='输入']",
            "textarea[placeholder*='说点']",
            "textarea[placeholder*='描述']",
            "input[type='text'][placeholder*='消息']",
            "input[type='text'][placeholder*='输入']",
            # Based on contenteditable.
            "div[contenteditable='true']",
            # Generic last resort.
            "textarea",
            "input[type='text']"
        ]

        input_element = None
        # Retry up to 3 times.
        for retry in range(3):
            input_element = None
            is_custom_input = False  # whether the match is the custom input component
            for selector in input_selectors:
                try:
                    for elem in self.page.locator(selector).all():
                        if not elem.is_visible():
                            continue
                        box = elem.bounding_box()
                        if box and box['height'] > 20:
                            input_element = elem
                            if 'gt-jmy' in selector or 'fake-input' in selector or 'text-input' in selector:
                                is_custom_input = True
                            logger.debug(f"找到输入框: {selector}, 自定义: {is_custom_input}")
                            break
                    if input_element:
                        break
                except Exception:
                    # A failing selector is skipped; ``Exception`` (not a bare
                    # ``except``) lets KeyboardInterrupt / SystemExit through.
                    continue
            if input_element:
                break
            # Not found: wait, scroll to the bottom, and try again.
            if retry < 2:
                logger.info(f"未找到输入框等待2秒后重试... ({retry+1}/3)")
                time.sleep(2)
                try:
                    self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                except Exception as e:
                    logger.debug(f"滚动到页面底部失败: {str(e)}")

        # Fallback: click near the bottom centre of the page to focus the input.
        if not input_element:
            logger.info("尝试兜底方案:点击页面底部中心位置...")
            try:
                viewport = self.page.viewport_size
                if viewport:
                    # Horizontally centred, roughly 100px above the bottom edge.
                    click_x = viewport['width'] // 2
                    click_y = viewport['height'] - 100
                    self.page.mouse.click(click_x, click_y)
                    logger.info(f"点击位置: ({click_x}, {click_y})")
                    time.sleep(0.5)
                    # Human-like typing.
                    self._human_type(message)
                    time.sleep(0.3)
                    self.page.keyboard.press('Enter')
                    remember_sent()
                    logger.info(f"✅ 已发送消息(兜底方案): {message[:50]}...")
                    time.sleep(1)
                    return True
            except Exception as e:
                logger.warning(f"兜底方案失败: {str(e)}")
            logger.warning("所有方案均失败")
            return False

        # Click the input box to focus it.
        input_element.click()
        time.sleep(0.5)
        # Human-like typing.
        logger.debug("使用真人模拟输入...")
        self._human_type(message)
        time.sleep(0.5)

        # Submit: try a send button first, then fall back to Enter.
        sent = False
        send_btn_selectors = [
            "div.send-btn",
            "div.icon.send-btn",
            "button.send-btn",
            "span.send-btn",
            "div[class*='send']",
            "button[class*='send']",
        ]
        for btn_selector in send_btn_selectors:
            try:
                btn = self.page.locator(btn_selector).first
                if btn and btn.is_visible():
                    btn.click()
                    logger.debug(f"点击发送按钮: {btn_selector}")
                    sent = True
                    break
            except Exception:
                continue
        if not sent:
            try:
                self.page.keyboard.press('Enter')
                sent = True
            except Exception as e:
                logger.debug(f"按回车键发送失败: {str(e)}")

        if sent:
            remember_sent()
            logger.info(f"✅ 已发送消息: {message[:50]}...")
            time.sleep(1)
            return True
        logger.warning("发送消息失败")
        return False
    except Exception as e:
        logger.error(f"发送消息异常: {str(e)}")
        return False
def _wait_for_new_doctor_reply(self, timeout: int = 60) -> Optional[str]:
    """
    Wait until a new doctor reply shows up in ``self.doctor_replies``.

    The list is polled roughly every 2 seconds. Polling pauses with
    ``page.wait_for_timeout`` rather than ``time.sleep``: with Playwright's
    sync API, ``time.sleep`` blocks event dispatch, so entries added by a
    page event listener (presumably how ``doctor_replies`` is fed — verify
    against the listener setup) would never become visible inside this loop.

    Args:
        timeout: Maximum time to wait, in seconds.

    Returns:
        Content of the most recent non-empty reply received since the call
        started, or None on timeout or error.
    """
    try:
        initial_count = len(self.doctor_replies)
        start_time = time.time()
        next_progress_at = 15  # seconds elapsed at which the next progress line is due
        while time.time() - start_time < timeout:
            self.page.wait_for_timeout(2000)
            # Scan every reply added since the call started, newest first, so
            # an empty trailing entry does not hide an earlier real reply.
            for reply in reversed(self.doctor_replies[initial_count:]):
                content = reply.get('content', '')
                if content:
                    logger.info(f"收到新医生回复: {content[:50]}...")
                    return content
            # Progress line about every 15 s; a threshold is used because the
            # elapsed time rarely lands exactly on a multiple of 15.
            elapsed = int(time.time() - start_time)
            if elapsed >= next_progress_at:
                logger.info(f"等待医生回复... ({elapsed}/{timeout}秒)")
                next_progress_at = (elapsed // 15 + 1) * 15
        logger.info("等待医生回复超时")
        return None
    except Exception as e:
        logger.error(f"等待医生回复异常: {str(e)}")
        return None
def _auto_chat_interaction(self) -> int:
"""
Automated chat interaction (3-5 rounds).
Flow:
1. Keep monitoring API responses and click a recommended reply as soon as one is available.
2. If there is no recommended reply but the doctor has replied, generate a reply with the AI.
3. Repeat for 3-5 rounds, waiting 30-90 seconds between rounds.
Returns:
Number of interaction rounds actually completed.
"""
try:
# Randomly pick the number of interaction rounds (3-5).
target_rounds = random.randint(3, 5)
completed_rounds = 0
logger.info(f"开始自动聊天交互,目标轮数: {target_rounds}")
for round_num in range(1, target_rounds + 1):
logger.info(f"=== 第 {round_num}/{target_rounds} 轮交互 ===")
self._update_overlay("聊天交互", f"{round_num}/{target_rounds}")
# Snapshot the state at the start of this round so later checks can detect new items.
initial_recommend_count = len(self.recommend_replies)
initial_doctor_count = len(self.doctor_replies)
initial_dom_button_count = self._count_dom_recommend_buttons()
round_completed = False
last_doctor_reply = None
no_recommend_count = 0 # consecutive loop iterations in which no recommend button was clicked
# Monitor continuously, for at most 60 seconds.
start_time = time.time()
timeout = 60
while time.time() - start_time < timeout and not round_completed:
time.sleep(1) # check about once per second
# On every iteration, try to click a visible recommend button (across all messages).
clicked = self._try_click_visible_recommend()
if clicked:
logger.info(f"✅ 第 {round_num} 轮点击了推荐回复")
completed_rounds += 1
round_completed = True
no_recommend_count = 0
# Refresh the baselines.
initial_recommend_count = len(self.recommend_replies)
initial_dom_button_count = self._count_dom_recommend_buttons()
continue
else:
# NOTE(review): this counts iterations, not seconds; each iteration lasts
# at least 1 s plus the time of the checks below, so the 10 / 25
# thresholds further down are lower bounds on elapsed seconds.
no_recommend_count += 1
# Check whether the API delivered new recommended replies.
if len(self.recommend_replies) > initial_recommend_count:
logger.info("[API] 检测到新推荐回复...")
initial_recommend_count = len(self.recommend_replies)
no_recommend_count = 0
# Try to click one.
if self._try_click_visible_recommend() or self._try_click_recommend_reply():
logger.info(f"✅ 第 {round_num} 轮使用推荐回复完成")
completed_rounds += 1
round_completed = True
# Check whether new recommend buttons appeared in the DOM.
if not round_completed:
current_dom_count = self._count_dom_recommend_buttons()
if current_dom_count > initial_dom_button_count:
logger.info(f"[DOM] 检测到新推荐按钮 ({initial_dom_button_count} -> {current_dom_count})...")
initial_dom_button_count = current_dom_count
no_recommend_count = 0
if self._try_click_visible_recommend():
logger.info(f"✅ 第 {round_num} 轮通过DOM点击推荐完成")
completed_rounds += 1
round_completed = True
# Remember the doctor's reply as input for AI generation.
if len(self.doctor_replies) > initial_doctor_count:
last_doctor_reply = self.doctor_replies[-1].get('content', '')
initial_doctor_count = len(self.doctor_replies)
if not last_doctor_reply:
dom_doctor_msg = self._get_latest_doctor_message_from_dom()
if dom_doctor_msg:
last_doctor_reply = dom_doctor_msg
# Doctor replied but no recommend button for 10 consecutive checks: use the AI early.
if not round_completed and last_doctor_reply and no_recommend_count >= 10:
logger.info(f"已收到医生回复但连续{no_recommend_count}秒无推荐按钮使用AI回复...")
ai_reply = self._call_qwen_api(last_doctor_reply)
if ai_reply and self._send_message_to_chat(ai_reply):
logger.info(f"✅ 第 {round_num} 轮使用AI回复完成")
completed_rounds += 1
round_completed = True
else:
logger.warning("AI回复发送失败继续等待推荐按钮...")
no_recommend_count = 0 # reset the counter and keep trying
# Nothing at all for 25 consecutive checks (no recommend button, no doctor reply): send a message proactively.
if not round_completed and not last_doctor_reply and no_recommend_count >= 25:
# Try to obtain an earlier doctor message.
history_doctor_msg = None
if self.doctor_replies:
history_doctor_msg = self.doctor_replies[-1].get('content', '')
if not history_doctor_msg:
history_doctor_msg = self._get_latest_doctor_message_from_dom()
if history_doctor_msg:
logger.info(f"连续{no_recommend_count}秒无新响应使用历史消息生成AI回复...")
ai_reply = self._call_qwen_api(history_doctor_msg)
if ai_reply and self._send_message_to_chat(ai_reply):
logger.info(f"✅ 第 {round_num} 轮使用AI回复历史消息完成")
completed_rounds += 1
round_completed = True
else:
no_recommend_count = 0
else:
# No earlier message at all: send a canned message to wake the chat up.
logger.info(f"连续{no_recommend_count}秒无响应且无历史消息,发送激活消息...")
fallback_messages = [
"您好,请问还在吗?",
"想咨询一下具体情况",
"请问医生什么时候有空呢?",
"我想了解一下治疗方案"
]
fallback_msg = random.choice(fallback_messages)
if self._send_message_to_chat(fallback_msg):
logger.info(f"✅ 第 {round_num} 轮发送激活消息: {fallback_msg}")
completed_rounds += 1
round_completed = True
else:
no_recommend_count = 0
# Log waiting progress.
# NOTE(review): fires only when an iteration happens to end on an exact
# multiple of 15 whole seconds, so some progress lines may be skipped.
elapsed = int(time.time() - start_time)
if elapsed % 15 == 0 and elapsed > 0:
logger.info(f"等待中... ({elapsed}/{timeout}秒)")
# The monitoring loop ended without completing the round: try the AI reply path.
if not round_completed:
if last_doctor_reply:
logger.info("无可用推荐回复使用千问AI生成回复...")
ai_reply = self._call_qwen_api(last_doctor_reply)
if ai_reply and self._send_message_to_chat(ai_reply):
logger.info(f"✅ 第 {round_num} 轮使用AI回复完成")
completed_rounds += 1
round_completed = True
else:
logger.warning(f"{round_num} 轮AI回复发送失败尝试点击推荐按钮...")
# Try the recommend buttons once more.
if self._try_click_visible_recommend():
logger.info(f"✅ 第 {round_num} 轮通过点击推荐按钮完成")
completed_rounds += 1
round_completed = True
else:
logger.warning(f"{round_num} 轮失败,继续下一轮...")
else:
# No new reply arrived in this round: fall back to earlier messages.
logger.warning(f"{round_num} 轮未收到新回复,尝试使用历史消息...")
# First try the recommend buttons.
if self._try_click_visible_recommend():
logger.info(f"✅ 第 {round_num} 轮通过点击推荐按钮完成")
completed_rounds += 1
round_completed = True
else:
# Try to obtain an earlier doctor message.
history_doctor_msg = None
if self.doctor_replies:
history_doctor_msg = self.doctor_replies[-1].get('content', '')
if not history_doctor_msg:
history_doctor_msg = self._get_latest_doctor_message_from_dom()
if history_doctor_msg:
logger.info("使用历史消息生成AI回复...")
ai_reply = self._call_qwen_api(history_doctor_msg)
if ai_reply and self._send_message_to_chat(ai_reply):
logger.info(f"✅ 第 {round_num} 轮使用AI回复历史消息完成")
completed_rounds += 1
round_completed = True
else:
# No earlier message at all: send a canned message to wake the chat up.
fallback_messages = [
"您好,请问还在吗?",
"想咨询一下具体情况",
"请问医生什么时候有空呢?",
"我想了解一下治疗方案"
]
fallback_msg = random.choice(fallback_messages)
if self._send_message_to_chat(fallback_msg):
logger.info(f"✅ 第 {round_num} 轮发送激活消息: {fallback_msg}")
completed_rounds += 1
round_completed = True
else:
logger.warning(f"{round_num} 轮发送消息失败")
# If another round follows, wait a random 30-90 seconds first.
if round_num < target_rounds:
wait_seconds = random.randint(30, 90)
logger.info(f"等待 {wait_seconds} 秒后进行下一轮...")
time.sleep(wait_seconds)
logger.info(f"自动聊天交互完成,共 {completed_rounds}/{target_rounds}")
return completed_rounds
except Exception as e:
logger.error(f"自动聊天交互异常: {str(e)}")
# Report the rounds finished before the failure; 0 if the counter was never initialised.
return completed_rounds if 'completed_rounds' in locals() else 0
def _get_doctor_reply_content(self) -> Optional[str]:
"""获取医生回复内容(合并所有回复)"""
if not self.doctor_replies:
return None
# 合并所有回复内容
contents = [r.get('content', '') for r in self.doctor_replies if r.get('content')]
return '\n'.join(contents) if contents else None
def _get_full_chat_log(self) -> Optional[str]:
    """
    Build the full chat transcript as formatted text.

    Lines are gathered from these sources, in this order, and a message whose
    content was already emitted is skipped:
      1. ``chat_history`` - the conversation kept for the Qwen API
         (``user`` entries are doctor messages, ``assistant`` entries are our
         AI replies).
      2. ``doctor_replies`` - doctor messages captured from the API listener
         or the DOM that did not make it into ``chat_history``.
      3. ``sent_messages`` - every message we sent (recommended replies and
         typed messages).
      4. ``sent_recommend_reply`` - the clicked recommended reply (kept for
         older code).
      5. ``sent_message`` - the last message sent (kept for older code).

    The doctor marker ``[医生/客服说]`` (with or without a trailing colon) is
    removed only where it appears as a leading prefix; occurrences inside the
    message body are left intact.

    Returns:
        The transcript, one ``[role] content`` line per message, or None when
        there is nothing to report or an error occurs.
    """
    try:
        chat_lines = []
        seen_contents = set()  # contents already emitted, for de-duplication

        def add_line(role, content):
            # Emit a line once per distinct, non-empty content.
            if content and content not in seen_contents:
                chat_lines.append(f"[{role}] {content}")
                seen_contents.add(content)

        # Longer prefix first so the colon variant is stripped completely.
        doctor_prefixes = ('[医生/客服说]:', '[医生/客服说]')

        # 1. Conversation kept in chat_history (doctor messages and AI replies).
        for msg in self.chat_history:
            role = msg.get('role', '')
            content = msg.get('content', '')
            if not content:
                continue
            if role == 'user':
                for prefix in doctor_prefixes:
                    if content.startswith(prefix):
                        content = content[len(prefix):].strip()
                        break
                add_line('医生', content)
            elif role == 'assistant':
                add_line('我方(AI)', content)

        # 2. Doctor replies that never entered chat_history.
        for reply in self.doctor_replies:
            add_line('医生', reply.get('content', ''))

        # 3. Everything recorded in sent_messages.
        for msg in getattr(self, 'sent_messages', None) or []:
            add_line(msg.get('role', '我方'), msg.get('content', ''))

        # 4. The clicked recommended reply (compatibility with older code).
        add_line('我方(推荐回复)', getattr(self, 'sent_recommend_reply', None))

        # 5. The last sent message (compatibility with older code); entries
        #    tagged as recommended replies are already covered by step 4.
        last_sent = getattr(self, 'sent_message', None)
        if last_sent and not last_sent.startswith('[推荐回复]'):
            add_line('我方', last_sent)

        result = '\n'.join(chat_lines) if chat_lines else None
        # Diagnostic summary of how many entries each source contributed.
        logger.info(f"聊天记录汇总: chat_history={len(self.chat_history)}条, doctor_replies={len(self.doctor_replies)}条, sent_messages={len(getattr(self, 'sent_messages', []))}条, 输出={len(chat_lines)}")
        return result
    except Exception as e:
        logger.error(f"获取完整聊天记录失败: {str(e)}")
        return None
2026-01-13 18:59:26 +08:00
def _count_messages(self) -> int:
"""
统计当前页面的消息数量
Returns:
消息数量
"""
try:
# 根据实际页面结构调整选择器
# 这里是示例选择器,需要根据实际情况修改
message_selectors = [
"//div[contains(@class, 'message')]",
"//div[contains(@class, 'chat-message')]",
"//div[contains(@class, 'msg-item')]",
]
for selector in message_selectors:
try:
messages = self.page.locator(f"xpath={selector}").all()
if messages:
return len(messages)
except:
continue
return 0
except Exception as e:
logger.error(f"统计消息数量异常: {str(e)}")
return 0
def _close_current_tab(self):
"""关闭当前标签页并返回主窗口"""
try:
pages = self.page.context.pages
if len(pages) > 1:
self.page.close()
self.page = pages[0]
logger.info("已关闭广告页面")
except Exception as e:
logger.error(f"关闭标签页异常: {str(e)}")
def random_delay(self, min_seconds: int = 2, max_seconds: int = 5):
    """Sleep for a random duration between the two bounds to mimic a human pause."""
    time.sleep(random.uniform(min_seconds, max_seconds))