This commit is contained in:
sjk
2026-01-10 21:46:50 +08:00
parent 3b66018271
commit 213229953b
14 changed files with 1499 additions and 282 deletions

View File

@@ -576,13 +576,12 @@ class XHSLoginService:
# 尝试查找二维码图片元素
qrcode_selectors = [
'.qrcode-img', # 根据您提供的HTML
'.qrcode-img', # 小红书风控二维码的特定class
'img.qrcode-img',
'.qrcode-container img',
'img[src*="data:image"]', # base64图片
'img[src*="qrcode"]',
'img[alt*="二维码"]',
'img[alt*="qrcode"]',
'.qrcode-container img', # 二维码容器内的图片
'.verify-captcha img', # 验证弹窗内的图片
'img[alt*="二维码"]', # alt属性包含"二维码"
'img[alt*="qrcode"]', # alt属性包含"qrcode"
]
for selector in qrcode_selectors:
@@ -643,27 +642,37 @@ class XHSLoginService:
后台监听扫码后的页面跳转和二维码失效
通过监听小红书API https://edith.xiaohongshu.com/api/redcaptcha/v2/qr/status/query
来精准判断二维码状态:
- status=1: 未过期,等待扫码
- status=5: 扫码,等待确认
- 其他: 失效或已完成
- status=1: 正常,等待扫码
- status=2: 扫码完成,待APP确认
- status=5: 二维码已过期/失效
Args:
session_id: 会话 ID
"""
try:
logger.info(f"[WebSocket] 开始监听扫码状态: {session_id}")
# 等待1秒确保WebSocket连接完全建立
logger.info(f"[WebSocket] 等待WebSocket连接建立...")
await asyncio.sleep(1.0)
logger.info(f"[WebSocket] 等待完成,开始监听")
if not self.page:
logger.error(f"[WebSocket] 页面对象不存在: {session_id}")
return
# 用于存储最新的二维码状态
latest_qr_status = {"status": 1, "scanned": False}
latest_qr_status = {"status": 1}
# 标记是否已推送失效消息
expired_notified = False
# 标记是否已推送扫码成功消息
scan_success_notified = False
# 记录上次推送的状态,避免重复推送
last_notified_status = None
# 设置响应监听拦截二维码状态查询API
async def handle_qr_status_response(response):
nonlocal last_notified_status
try:
if '/api/redcaptcha/v2/qr/status/query' in response.url:
json_data = await response.json()
@@ -671,11 +680,56 @@ class XHSLoginService:
status = json_data['data'].get('status')
latest_qr_status['status'] = status
if status == 5:
latest_qr_status['scanned'] = True
logger.info(f"[WebSocket] 检测到二维码已扫描,等待确认: status={status}")
elif status == 1:
logger.debug(f"[WebSocket] 二维码未过期,等待扫码: status={status}")
# 推送状态变化给前端
if status != last_notified_status:
status_message = {
1: "等待扫码",
2: "扫码完成请在APP中确认",
5: "二维码已过期"
}.get(status, f"二维码状态: {status}")
try:
# 使用Redis发布消息避免事件循环隔离问题
import redis
import json as json_lib
from config import get_config
config = get_config()
redis_host = config.get_str('redis.host', 'localhost')
redis_port = config.get_int('redis.port', 6379)
redis_password = config.get_str('redis.password', '')
redis_client = redis.Redis(
host=redis_host,
port=redis_port,
password=redis_password if redis_password else None,
decode_responses=True
)
message = {
"type": "qrcode_status",
"status": status,
"message": status_message
}
# 发布到Redis频道
channel = f"ws_message:{session_id}"
redis_client.publish(channel, json_lib.dumps(message))
logger.info(f"[WebSocket] 已通过Redis推送二维码状态: status={status}, channel={channel}")
last_notified_status = status
redis_client.close()
except Exception as ws_error:
logger.error(f"[WebSocket] 推送状态失败: {str(ws_error)}")
import traceback
traceback.print_exc()
if status == 1:
logger.debug(f"[WebSocket] 二维码正常,等待扫码: status={status}")
elif status == 2:
logger.info(f"[WebSocket] 检测到扫码完成,等待APP确认: status={status}")
elif status == 5:
logger.warning(f"[WebSocket] 检测到二维码已过期: status={status}")
else:
logger.info(f"[WebSocket] 二维码状态: status={status}")
except Exception as e:
@@ -694,88 +748,104 @@ class XHSLoginService:
# 1. 检测是否跳转回首页不再是captcha/verify页
if 'captcha' not in current_url.lower() and 'verify' not in current_url.lower():
# 如果已经推送过扫码成功消息,跳过
if scan_success_notified:
continue
# 检查是否跳转到小红书首页
if 'xiaohongshu.com' in current_url:
logger.success(f"[WebSocket] 检测到扫码完成,页面跳转回: {current_url}")
# 等待500ms确保WebSocket连接完全建立
await asyncio.sleep(0.5)
# 通过WebSocket推送扫码成功消息
try:
from main import ws_manager
await ws_manager.send_message(session_id, {
# 使用Redis发布消息
import redis
import json as json_lib
from config import get_config
config = get_config()
redis_host = config.get_str('redis.host', 'localhost')
redis_port = config.get_int('redis.port', 6379)
redis_password = config.get_str('redis.password', '')
redis_client = redis.Redis(
host=redis_host,
port=redis_port,
password=redis_password if redis_password else None,
decode_responses=True
)
message = {
"type": "qrcode_scan_success",
"message": "扫码验证完成,请重新发送验证码"
})
logger.success(f"[WebSocket] 已推送扫码成功消息: {session_id}")
}
channel = f"ws_message:{session_id}"
redis_client.publish(channel, json_lib.dumps(message))
logger.success(f"[WebSocket] 已通过Redis推送扫码成功消息: channel={channel}")
scan_success_notified = True
redis_client.close()
except Exception as ws_error:
logger.error(f"[WebSocket] 推送消息失败: {str(ws_error)}")
break
import traceback
traceback.print_exc()
# 不退出监听,继续等待用户后续操作
logger.info(f"[WebSocket] 扫码成功,保持监听状态")
# 2. 检测二维码是否失效通过API状态判断
if 'captcha' in current_url.lower() or 'verify' in current_url.lower():
# 如果已经推送过失效消息跳过后续检测
# 如果已经推送过失效消息,跳过后续检测
if expired_notified:
continue
# 如果状态不是1和5说明二维码可能已失效
if latest_qr_status['status'] not in [1, 5]:
logger.warning(f"[WebSocket] 检测到二维码失效: status={latest_qr_status['status']}")
# 如果状态是5,说明二维码已过期
if latest_qr_status['status'] == 5:
logger.warning(f"[WebSocket] API检测到二维码过期: status=5")
# 等待500ms确保WebSocket连接完全建立
await asyncio.sleep(0.5)
# 通过WebSocket推送失效消息
try:
from main import ws_manager
await ws_manager.send_message(session_id, {
# 使用Redis发布消息
import redis
import json as json_lib
from config import get_config
config = get_config()
redis_host = config.get_str('redis.host', 'localhost')
redis_port = config.get_int('redis.port', 6379)
redis_password = config.get_str('redis.password', '')
redis_client = redis.Redis(
host=redis_host,
port=redis_port,
password=redis_password if redis_password else None,
decode_responses=True
)
message = {
"type": "qrcode_expired",
"message": "二维码已失效,请重新发送验证码"
})
logger.success(f"[WebSocket] 已推送二维码失效消息: {session_id}")
expired_notified = True # 标记已推送
}
channel = f"ws_message:{session_id}"
redis_client.publish(channel, json_lib.dumps(message))
logger.success(f"[WebSocket] 已通过Redis推送二维码失效消息: channel={channel}")
expired_notified = True
redis_client.close()
except Exception as ws_error:
logger.error(f"[WebSocket] 推送消息失败: {str(ws_error)}")
break # 退出监听循环
import traceback
traceback.print_exc()
# 备用方案检查页面文本以防API未返回
try:
expired_selectors = [
'text="已过期"',
'text="二维码已失效"',
'text="二维码过期"',
]
for selector in expired_selectors:
expired_elem = await self.page.query_selector(selector)
if expired_elem:
is_visible = await expired_elem.is_visible()
if is_visible:
# 进一步检查元素文本内容
text_content = await expired_elem.text_content()
# 只在明确显示"已过期"或"已失效"时才认为失效,忽略"二维码X分钟失效"这种提示
if text_content and ('已过期' in text_content or '已失效' in text_content):
logger.warning(f"[WebSocket] DOM检测到二维码失效: {selector}, 文本: {text_content}")
# 通过WebSocket推送失效消息
try:
from main import ws_manager
await ws_manager.send_message(session_id, {
"type": "qrcode_expired",
"message": "二维码已失效,请重新发送验证码"
})
logger.success(f"[WebSocket] 已推送二维码失效消息: {session_id}")
expired_notified = True # 标记已推送
except Exception as ws_error:
logger.error(f"[WebSocket] 推送消息失败: {str(ws_error)}")
# 退出所有循环
break
# 如果检测到失效,退出外层循环
if expired_notified:
break
except Exception as e:
# 页面可能已关闭,忽略错误
pass
# 不退出监听,继续等待用户重新操作
# 每30秒打印一次状态
if i > 0 and i % 60 == 0:
@@ -783,21 +853,11 @@ class XHSLoginService:
except Exception as e:
logger.error(f"[WebSocket] 监听异常: {str(e)}")
break
# 不退出,继续监听
# 超时5分钟未扫码,通知前端关闭弹窗
logger.warning(f"[WebSocket] 扫码监听超时(5分钟): {session_id}")
try:
from main import ws_manager
await ws_manager.send_message(session_id, {
"type": "qrcode_expired",
"message": "二维码已超时,请重新发送验证码"
})
logger.success(f"[WebSocket] 已推送超时消息: {session_id}")
except Exception as ws_error:
logger.error(f"[WebSocket] 推送消息失败: {str(ws_error)}")
logger.info(f"[WebSocket] 监听任务结束: {session_id}")
# 超时5分钟,通知前端(但不退出监听)
logger.warning(f"[WebSocket] 监听已运行5分钟: {session_id}")
logger.info(f"[WebSocket] 监听仍将继续,直到用户关闭页面")
except Exception as e:
logger.error(f"[WebSocket] 监听任务异常: {str(e)}")
@@ -850,6 +910,26 @@ class XHSLoginService:
logger.warning(f"[页面导航] 导航超时,但尝试继续: {str(e)}")
logger.info(f"[页面导航] 当前URL: {current_url}")
# 检测小红书反爬JSON页面
await asyncio.sleep(0.5) # 等待页面内容加载
try:
page_content = await self.page.content()
# 检查页面是否只返回JSON小红书的检测机制
if page_content and len(page_content) < 500: # JSON页面通常很短
# 尝试解析JSON
if '{"code"' in page_content and '"success":true' in page_content:
logger.warning("="*50)
logger.warning("⚠️ 检测到小红书反爬JSON页面")
logger.warning(f"页面内容: {page_content[:200]}")
logger.warning("="*50)
# 抛出异常,让外层处理
raise Exception("ANTI_CRAWL_JSON")
except Exception as e:
if "ANTI_CRAWL_JSON" in str(e):
raise # 重新抛出,让外层捕获
# 其他异常忽略,继续执行
pass
# 等待二维码API请求最多等待timeout秒
wait_count = 0
max_wait = timeout * 10 # 每次等待0.1秒
@@ -919,12 +999,30 @@ class XHSLoginService:
else:
# 页面变了,重新访问登录页
logger.success(f"[预热] 页面已变更 ({current_url}),重新访问{page_name}登录页...")
await self._navigate_with_qrcode_listener(login_url)
try:
await self._navigate_with_qrcode_listener(login_url)
except Exception as e:
if "ANTI_CRAWL_JSON" in str(e):
logger.error("⚠️ 检测到小红书反爬检测,请稍后再试")
return {
"success": False,
"error": "当前IP被小红书检测请等待5分钟后再试"
}
raise
else:
# 未预热或不是池模式,使用监听机制访问页面
logger.debug(f"正在访问{page_name}登录页...")
await self._navigate_with_qrcode_listener(login_url)
try:
await self._navigate_with_qrcode_listener(login_url)
except Exception as e:
if "ANTI_CRAWL_JSON" in str(e):
logger.error("⚠️ 检测到小红书反爬检测,请稍后再试")
return {
"success": False,
"error": "当前IP被小红书检测请等待5分钟后再试"
}
raise
logger.success(f"✅ 已进入{page_name}登录页面")
@@ -951,9 +1049,8 @@ class XHSLoginService:
logger.info(f"二维码数据长度: {len(qrcode_data)} 字符")
logger.info("返回二维码给前端,等待用户扫码后重新调用接口")
# 启动后台任务监听页面跳转,扫码完成后通知前端
asyncio.create_task(self._monitor_qrcode_scan(session_id))
logger.info(f"[WebSocket] 已启动扫码监听任务: {session_id}")
# 不再在这里启动监听任务由main.py中的WebSocket端点启动
# asyncio.create_task(self._monitor_qrcode_scan(session_id))
return {
"success": False,
@@ -1517,14 +1614,14 @@ class XHSLoginService:
logger.success(f"✅ 检测到登录成功,用户: {user_me_data.get('nickname')}")
# 通过WebSocket推送登录成功消息
if session_id:
if self.session_id:
try:
from main import ws_manager
await ws_manager.send_message(session_id, {
await ws_manager.send_message(self.session_id, {
"type": "login_success",
"user_info": user_me_data
})
logger.info(f"[WebSocket] 已推送登录成功消息: {session_id}")
logger.info(f"[WebSocket] 已推送登录成功消息: {self.session_id}")
except Exception as ws_error:
logger.error(f"[WebSocket] 推送消息失败: {str(ws_error)}")
except Exception as e:
@@ -1599,7 +1696,7 @@ class XHSLoginService:
"need_captcha": True,
"captcha_type": "qrcode",
"qrcode_image": qrcode_data,
"session_id": session_id, # 返回session_id供后续轮询使用
"session_id": self.session_id, # 返回session_id供后续轮询使用
"message": "需要扫码验证请使用小红书APP扫描二维码"
}
else:
@@ -1626,40 +1723,114 @@ class XHSLoginService:
logger.info(f"最终URL: {self.page.url}")
logger.info("="*50)
# 2. 即使URL没变也要检测页面上是否出现二维码弹窗
logger.info("检测页面上是否出现扫码验证...")
qrcode_selectors = [
'.qrcode-img',
'img.qrcode-img',
'.qrcode-container img',
'img[src*="data:image"]',
'img[src*="qrcode"]',
'img[alt*="二维码"]',
'img[alt*="qrcode"]',
]
for selector in qrcode_selectors:
# 2. 只有在未检测到风控且未登录成功时,才检测页面上是否出现二维码弹窗
current_url = self.page.url
# 如果已经跳转到成功页面,不再检测二维码
if 'explore' in current_url or 'creator' in current_url or 'xiaohongshu.com' in current_url:
if 'login' not in current_url:
logger.info("已跳转到登录成功页面,跳过二维码检测")
else:
logger.info("仍在登录页,检测页面上是否出现扫码验证...")
# 先检测提示文本
try:
tip_elem = await self.page.query_selector('.tip')
if tip_elem:
tip_text = await tip_elem.inner_text()
logger.info(f"检测到提示文本: {tip_text}")
if '扫码' in tip_text or '二维码' in tip_text:
logger.warning("⚠️ 确认检测到扫码验证提示")
except Exception as e:
logger.debug(f"检测提示文本失败: {str(e)}")
qrcode_selectors = [
'.qrcode-img', # 小红书风控二维码的特定class
'img.qrcode-img',
'.qrcode-container img', # 二维码容器内的图片
'.qrcode .qrcode-img', # 二维码容器下的二维码图片
'.verify-captcha img', # 验证弹窗内的图片
'.login-container .qrcode-img', # 登录容器内的二维码
'img[alt*="二维码"]', # alt属性包含"二维码"
'img[alt*="qrcode"]', # alt属性包含"qrcode"
]
for selector in qrcode_selectors:
try:
qrcode_elem = await self.page.query_selector(selector)
if qrcode_elem:
logger.info(f"检测到符合选择器的元素: {selector},尝试提取二维码...")
qrcode_data = await self.extract_verification_qrcode()
if qrcode_data:
logger.warning(f"⚠️ 确认检测到风控二维码: {selector}")
logger.success("✅ 成功提取扫码验证二维码,返回给前端")
# 注意不移除API监听保持session_id对应的浏览器继续运行
return {
"success": False,
"need_captcha": True,
"captcha_type": "qrcode",
"qrcode_image": qrcode_data,
"session_id": self.session_id, # 返回session_id供后续轮询使用
"message": "需要扫码验证请使用小红书APP扫描二维码"
}
else:
logger.debug(f"选择器 {selector} 匹配到元素但无法提取二维码,可能不是风控二维码")
break
except Exception as e:
logger.debug(f"选择器 {selector} 检测失败: {str(e)}")
continue
logger.info("未检测到扫码验证")
else:
logger.info("仍在登录页,检测页面上是否出现扫码验证...")
# 先检测提示文本
try:
qrcode_elem = await self.page.query_selector(selector)
if qrcode_elem:
logger.warning(f"⚠️ 检测到页面上出现二维码: {selector}")
qrcode_data = await self.extract_verification_qrcode()
if qrcode_data:
logger.success("✅ 成功提取扫码验证二维码,返回给前端")
# 注意不移除API监听保持session_id对应的浏览器继续运行
return {
"success": False,
"need_captcha": True,
"captcha_type": "qrcode",
"qrcode_image": qrcode_data,
"session_id": session_id, # 返回session_id供后续轮询使用
"message": "需要扫码验证请使用小红书APP扫描二维码"
}
break
except Exception:
continue
tip_elem = await self.page.query_selector('.tip')
if tip_elem:
tip_text = await tip_elem.inner_text()
logger.info(f"检测到提示文本: {tip_text}")
if '扫码' in tip_text or '二维码' in tip_text:
logger.warning("⚠️ 确认检测到扫码验证提示")
except Exception as e:
logger.debug(f"检测提示文本失败: {str(e)}")
qrcode_selectors = [
'.qrcode-img', # 小红书风控二维码的特定class
'img.qrcode-img',
'.qrcode-container img', # 二维码容器内的图片
'.qrcode .qrcode-img', # 二维码容器下的二维码图片
'.verify-captcha img', # 验证弹窗内的图片
'.login-container .qrcode-img', # 登录容器内的二维码
'img[alt*="二维码"]', # alt属性包含"二维码"
'img[alt*="qrcode"]', # alt属性包含"qrcode"
]
for selector in qrcode_selectors:
try:
qrcode_elem = await self.page.query_selector(selector)
if qrcode_elem:
logger.info(f"检测到符合选择器的元素: {selector},尝试提取二维码...")
qrcode_data = await self.extract_verification_qrcode()
if qrcode_data:
logger.warning(f"⚠️ 确认检测到风控二维码: {selector}")
logger.success("✅ 成功提取扫码验证二维码,返回给前端")
# 注意不移除API监听保持session_id对应的浏览器继续运行
return {
"success": False,
"need_captcha": True,
"captcha_type": "qrcode",
"qrcode_image": qrcode_data,
"session_id": self.session_id, # 返回session_id供后续轮询使用
"message": "需要扫码验证请使用小红书APP扫描二维码"
}
else:
logger.debug(f"选择器 {selector} 匹配到元素但无法提取二维码,可能不是风控二维码")
break
except Exception as e:
logger.debug(f"选择器 {selector} 检测失败: {str(e)}")
continue
logger.info("未检测到扫码验证")
logger.info("未检测到扫码验证,继续等待登录...")
logger.info("继续等待登录...")
# 等待URL跳转或API响应最多30秒
logger.info("[登录检测] 等待扫码完成或登录跳转...")
@@ -1908,6 +2079,7 @@ class XHSLoginService:
"localStorage": localStorage_data, # API 返回localStorage数据
"sessionStorage": sessionStorage_data, # API 返回sessionStorage数据
"url": current_url,
"storage_state": storage_state_data, # 新增Playwright storage_state对象
"storage_state_path": storage_state_path # 新增storage_state文件路径
}
@@ -3162,6 +3334,26 @@ class XHSLoginService:
current_url = self.page.url
logger.success(f"[扫码登录] 页面加载完成, 当前URL: {current_url}")
# 检测小红书反爬JSON页面
await asyncio.sleep(0.5) # 等待页面内容加载
try:
page_content = await self.page.content()
# 检查页面是否只返回JSON小红书的检测机制
if page_content and len(page_content) < 500: # JSON页面通常很短
# 尝试解析JSON
if '{"code"' in page_content and '"success":true' in page_content:
logger.warning("="*50)
logger.warning("⚠️ 检测到小红书反爬JSON页面")
logger.warning(f"页面内容: {page_content[:200]}")
logger.warning("="*50)
return {
"success": False,
"error": "当前IP被小红书检测请等待5分钟后再试"
}
except Exception as e:
# 其他异常忽略,继续执行
pass
# 检查是否跳转到验证码页面
if '/website-login/captcha' in current_url or 'verifyUuid=' in current_url:
logger.warning(f"[扫码登录] 检测到风控验证页面,尝试等待或跳过...")