This commit is contained in:
sjk
2026-01-07 22:55:12 +08:00
parent cb267e8d5e
commit 4720ab2a15
76 changed files with 3110 additions and 7168 deletions

View File

@@ -17,6 +17,16 @@ from datetime import datetime
from pathlib import Path
from browser_pool import get_browser_pool
from error_screenshot import save_error_screenshot, save_screenshot_with_html
from loguru import logger
from damai_proxy_config import get_random_proxy, format_proxy_for_playwright
# Configure the loguru output: drop the default sink and install a single
# stderr sink at INFO level with a colored "time | level | message" layout.
logger.remove() # remove the default handler
logger.add(
    sys.stderr,
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>",
    level="INFO"
)
async def download_image(url: str) -> str:
@@ -65,18 +75,20 @@ async def download_image(url: str) -> str:
class XHSLoginService:
"""小红书登录服务"""
def __init__(self, use_pool: bool = True, headless: bool = True, session_id: Optional[str] = None):
def __init__(self, use_pool: bool = True, headless: bool = True, session_id: Optional[str] = None, use_page_isolation: bool = False):
"""
初始化登录服务
Args:
use_pool: 是否使用浏览器池默认True提升性能
headless: 是否使用无头模式False为有头模式方便调试
session_id: 会话ID用于并发隔离不同的session_id会创建独立的浏览器实例
session_id: 会话 ID用于并发隔离不同的session_id会创建独立的浏览器实例
use_page_isolation: 是否使用页面隔离模式(扫码登录专用,减少浏览器实例数)
"""
self.use_pool = use_pool
self.headless = headless
self.session_id = session_id # 保存session_id用于并发隔离
self.use_page_isolation = use_page_isolation # 页面隔离模式
self.browser_pool = get_browser_pool(headless=headless) if use_pool else None
self.playwright = None
self.browser: Optional[Browser] = None
@@ -84,17 +96,26 @@ class XHSLoginService:
self.page: Optional[Page] = None
self.current_phone = None
async def init_browser(self, cookies: Optional[list] = None, proxy: Optional[str] = None, user_agent: Optional[str] = None, restore_state: bool = False):
async def init_browser(self, cookies: Optional[list] = None, proxy: Optional[dict] = None, user_agent: Optional[str] = None, restore_state: bool = False, use_random_proxy: bool = True):
"""
初始化浏览器
Args:
cookies: 可选的Cookie列表用于恢复登录状态
proxy: 可选的代理地址,例如 http://user:pass@ip:port
proxy: 可选的代理配置,例如 {"server": "http://ip:port", "username": "...", "password": "..."}
user_agent: 可选的自定义User-Agent
restore_state: 是否从log_state.json文件恢复完整登录状态
use_random_proxy: 是否自动使用随机代理默认True
"""
try:
# 如果没有指定代理且启用自动代理,则使用随机代理
if not proxy and use_random_proxy:
try:
proxy_config = get_random_proxy()
proxy = format_proxy_for_playwright(proxy_config)
logger.info(f"[代理] 自动选择代理: {proxy_config['name']} ({proxy_config['server']})")
except Exception as e:
logger.info(f"[代理] 无可用代理,使用直连访问")
# 如果要求恢复状态,先加载 login_state.json
login_state = None
if restore_state and os.path.exists('login_state.json'):
@@ -112,12 +133,54 @@ class XHSLoginService:
# 使用浏览器池
if self.use_pool and self.browser_pool:
# 扫码登录使用页面隔离模式
if self.use_page_isolation and self.session_id:
print(f"[页面隔离模式] 获取扫码登录页面 (session_id={self.session_id})", file=sys.stderr)
# 获取或创建页面
self.page = await self.browser_pool.get_qrcode_page(self.session_id)
# 使用浏览器池的主浏览器和context
self.browser = self.browser_pool.browser
self.context = self.browser_pool.context
print("浏览器初始化成功(页面隔离模式)", file=sys.stderr)
return
# 普通浏览器池模式
print(f"[浏览器池模式] 从浏览器池获取实例 (session_id={self.session_id}, headless={self.headless})", file=sys.stderr)
self.browser, self.context, self.page = await self.browser_pool.get_browser(
cookies=cookies, proxy=proxy, user_agent=user_agent, session_id=self.session_id,
headless=self.headless # 传递headless参数
)
# 保存proxy配置
if proxy:
self.proxy = proxy
# 检查page状态如果是空白页或已关闭重新创建page
try:
current_url = self.page.url
print(f"当前URL: {current_url}", file=sys.stderr)
if current_url == 'about:blank' or current_url == '':
print("[浏览器池] 检测到空白页面重新创建page", file=sys.stderr)
try:
# 关闭旧page
await self.page.close()
except Exception as e:
print(f"[浏览器池] 关闭旧page失败: {str(e)}", file=sys.stderr)
# 创建新page
self.page = await self.context.new_page()
print(f"[浏览器池] 已创建新page, 新URL: {self.page.url}", file=sys.stderr)
# 更新浏览器池中保存的page引用
if self.session_id and self.session_id in self.browser_pool.temp_browsers:
self.browser_pool.temp_browsers[self.session_id]["page"] = self.page
print("[浏览器池] 已更新浏览器池中的page引用", file=sys.stderr)
except Exception as e:
print(f"[浏览器池] 检查page状态异常: {str(e)}", file=sys.stderr)
# 如果有localStorage/sessionStorage恢复它们
if login_state:
await self._restore_storage(login_state)
@@ -149,7 +212,8 @@ class XHSLoginService:
],
}
if proxy:
launch_kwargs["proxy"] = {"server": proxy}
launch_kwargs["proxy"] = proxy # 直接使用proxy字典
self.proxy = proxy # 保存proxy配置供后续使用
self.browser = await self.playwright.chromium.launch(**launch_kwargs)
@@ -390,13 +454,13 @@ class XHSLoginService:
except Exception as e:
print(f"⚠️ 恢夏storage失败: {str(e)}", file=sys.stderr)
async def init_browser_with_storage_state(self, storage_state_path: str, proxy: Optional[str] = None):
async def init_browser_with_storage_state(self, storage_state_path: str, proxy: Optional[dict] = None):
"""
使用Playwright原生storage_state初始化浏览器最优方案
Args:
storage_state_path: storage_state文件路径
proxy: 可选的代理地址
proxy: 可选的代理配置
"""
try:
if not os.path.exists(storage_state_path):
@@ -424,7 +488,7 @@ class XHSLoginService:
],
}
if proxy:
launch_kwargs["proxy"] = {"server": proxy}
launch_kwargs["proxy"] = proxy # 直接使用proxy字典
self.browser = await self.playwright.chromium.launch(**launch_kwargs)
@@ -574,6 +638,71 @@ class XHSLoginService:
print(f"⚠️ 提取二维码失败: {str(e)}", file=sys.stderr)
return None
async def _navigate_with_qrcode_listener(self, url: str, timeout: int = 120):
    """
    Navigate to ``url`` and wait until the page requests the QR-code API.

    Rather than sleeping for a fixed time, a route handler is installed for
    ``**/api/sns/web/v1/login/qrcode/create``; the first matching request is
    taken as the signal that the login dialog has finished loading.

    Args:
        url: Target URL.
        timeout: Maximum wait in seconds (default 120). It bounds the
            navigation itself and, separately, the wait for the QR-code
            request that follows.
    """
    qrcode_route = '**/api/sns/web/v1/login/qrcode/create'
    qrcode_requested = asyncio.Event()

    async def handle_qrcode_create(route):
        # Release the waiter first so a logging/continue failure cannot stall it.
        qrcode_requested.set()
        try:
            logger.info(f"[页面导航] 监听到二维码API请求: {route.request.url}")
            await route.continue_()
        except Exception as e:
            logger.error(f"[页面导航] 处理二维码API请求失败: {str(e)}")
            # Best effort: let the request through. If the route was already
            # handled (or the page is gone) a second continue_() raises, and
            # that must not escape the handler.
            try:
                await route.continue_()
            except Exception as continue_error:
                logger.debug(f"[页面导航] 放行二维码API请求失败: {continue_error}")

    try:
        await self.page.route(qrcode_route, handle_qrcode_create)
        logger.info("[页面导航] 已注册二维码API监听")

        # Start navigating without waiting for the full page load.
        try:
            await self.page.goto(url, wait_until='commit', timeout=timeout * 1000)
            logger.info(f"[页面导航] 已开始导航到 {url}")
        except Exception as e:
            # A navigation timeout is tolerated; the API wait below decides.
            current_url = self.page.url
            logger.warning(f"[页面导航] 导航超时,但尝试继续: {str(e)}")
            logger.info(f"[页面导航] 当前URL: {current_url}")

        # Wait (at most ``timeout`` seconds) for the QR-code API request.
        loop = asyncio.get_running_loop()
        wait_started = loop.time()
        try:
            await asyncio.wait_for(qrcode_requested.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            logger.warning(f"[页面导航] {timeout}秒内未监听到二维码API请求尝试继续")
        else:
            elapsed = loop.time() - wait_started
            logger.success(f"[页面导航] 监听到二维码API请求登录框已加载完成耗时{elapsed:.1f}秒)")

        # Extra 500 ms so the dialog elements can finish rendering.
        await asyncio.sleep(0.5)
    finally:
        # Remove only this handler; other listeners on the same URL pattern
        # must stay registered.
        try:
            await self.page.unroute(qrcode_route, handle_qrcode_create)
            logger.info("[页面导航] 已移除二维码API监听")
        except Exception as e:
            logger.debug(f"[页面导航] 移除二维码API监听失败: {e}")
async def send_verification_code(self, phone: str, country_code: str = "+86", login_page: str = "creator") -> Dict[str, Any]:
"""
发送验证码
@@ -587,7 +716,10 @@ class XHSLoginService:
Dict containing success status and error message if any
"""
try:
logger.info(f"[发送验证码] 开始 - 手机号: {phone}, 登录页面: {login_page}")
if not self.page:
logger.info(f"[发送验证码] 浏览器未初始化,开始初始化...")
await self.init_browser()
self.current_phone = phone
@@ -608,19 +740,39 @@ class XHSLoginService:
else:
# 页面变了,重新访问登录页
print(f"[预热] 页面已变更 ({current_url}),重新访问{page_name}登录页...", file=sys.stderr)
await self.page.goto(login_url, wait_until='networkidle', timeout=30000)
await asyncio.sleep(0.5)
await self._navigate_with_qrcode_listener(login_url)
else:
# 未预热或不是池模式,正常访问页面
# 未预热或不是池模式,使用监听机制访问页面
print(f"正在访问{page_name}登录页...", file=sys.stderr)
# 优化超时时间缩短到30秒使用networkidle提升加载速度
try:
await self.page.goto(login_url, wait_until='networkidle', timeout=30000)
print("✅ 页面加载完成", file=sys.stderr)
except Exception as e:
print(f"页面加载超时,尝试继续: {str(e)}", file=sys.stderr)
# 超时后等待500ms让关键元素加载
await asyncio.sleep(0.5)
# 先验证代理IP如果配置了代理
if hasattr(self, 'proxy') and self.proxy:
try:
print(f"[代理验证] 配置的代理: {self.proxy.get('server', '未知')}", file=sys.stderr)
print(f"[代理验证] 正在访问 IP 查询网站...", file=sys.stderr)
await self.page.goto('https://httpbin.org/ip', timeout=15000)
ip_info = await self.page.locator('body').inner_text()
print(f"[代理验证] 当前 IP 信息:\n{ip_info}", file=sys.stderr)
# 简单解析IP地址
import json
try:
ip_data = json.loads(ip_info)
current_ip = ip_data.get('origin', '未知')
proxy_host = self.proxy.get('server', '').split('://')[-1].split(':')[0]
if proxy_host in current_ip or current_ip in self.proxy.get('server', ''):
print(f"[代理验证] ✅ 代理生效当前IP: {current_ip}", file=sys.stderr)
else:
print(f"[代理验证] ⚠️ 当前IP ({current_ip}) 与代理IP ({proxy_host}) 不匹配", file=sys.stderr)
except:
print(f"[代理验证] IP信息: {ip_info}", file=sys.stderr)
except Exception as e:
print(f"[代理验证] 验证失败: {str(e)}", file=sys.stderr)
else:
print(f"[代理验证] 未配置代理使用本机IP", file=sys.stderr)
await self._navigate_with_qrcode_listener(login_url)
print(f"✅ 已进入{page_name}登录页面", file=sys.stderr)
@@ -850,14 +1002,23 @@ class XHSLoginService:
]
# 直接查找,不重试
send_code_btn = None
send_code_selector = None
for selector in selectors:
send_code_btn = await self.page.query_selector(selector)
if send_code_btn:
print(f"✅ 找到发送验证码按钮: {selector}", file=sys.stderr)
send_code_selector = selector
break
if send_code_btn:
if send_code_selector:
# 重新获取元素句柄以确保其有效性
send_code_btn = await self.page.query_selector(send_code_selector)
if not send_code_btn:
return {
"success": False,
"error": "按钮元素已失效,请重试"
}
# 获取按钮文本内容
btn_text = await send_code_btn.inner_text()
btn_text = btn_text.strip() if btn_text else ""
@@ -892,9 +1053,20 @@ class XHSLoginService:
}
print(f"✅ 按钮已激活: class={class_name}", file=sys.stderr)
# 点击按钮
await send_code_btn.click()
print("✅ 已点击发送验证码", file=sys.stderr)
# 点击前再次确保元素有效页面DOM可能在检查过程中更新
try:
# 使用 page.click 直接通过选择器点击,避免元素句柄失效问题
await self.page.click(send_code_selector, timeout=5000)
print("✅ 已点击发送验证码", file=sys.stderr)
except Exception as click_error:
# 如果直接点击失败,尝试重新获取元素点击
print(f"⚠️ 直接点击失败: {str(click_error)}, 尝试重新获取元素", file=sys.stderr)
send_code_btn = await self.page.query_selector(send_code_selector)
if send_code_btn:
await send_code_btn.click()
print("✅ 重新获取元素后点击成功", file=sys.stderr)
else:
raise Exception("按钮元素已失效,无法点击")
# 等待页面响应,检测是否出现验证二维码
await asyncio.sleep(1.5)
@@ -924,6 +1096,7 @@ class XHSLoginService:
}
# 直接返回成功,不再检测滑块
logger.info(f"[发送验证码] 成功 - 手机号: {phone}")
print("\n✅ 验证码发送流程完成,请查看手机短信", file=sys.stderr)
print("请在小程序中输入收到的验证码并点击登录\n", file=sys.stderr)
print("[响应即将返回] success=True, message=验证码发送成功", file=sys.stderr)
@@ -951,6 +1124,7 @@ class XHSLoginService:
except Exception as e:
error_msg = str(e)
logger.error(f"[发送验证码] 异常 - 手机号: {phone}, 错误: {error_msg}")
print(f"\n❌ 发送验证码异常: {error_msg}", file=sys.stderr)
print(f"当前页面URL: {self.page.url if self.page else 'N/A'}", file=sys.stderr)
@@ -2519,3 +2693,649 @@ class XHSLoginService:
"success": False,
"error": str(e)
}
async def start_qrcode_login(self, login_page: str = "home") -> Dict[str, Any]:
    """
    Start the QR-code login flow on the Xiaohongshu home page.

    Opens the home page, listens for the page's own call to the
    ``login/qrcode/create`` API for up to 30 seconds, then extracts the
    QR-code image and status from the page. If the API response was
    captured, its ``qr_id``/``code``/``url``/``multi_flag`` values are added
    to the result.

    Args:
        login_page: Login page type, default "home". Not read by this
            method; the home page is always opened.

    Returns:
        The dict produced by ``extract_qrcode_with_status`` (optionally with
        ``qr_id``, ``qr_code``, ``qr_url`` and ``multi_flag`` added), or
        ``{"success": False, "error": ...}`` on failure.
    """
    qrcode_route = '**/api/sns/web/v1/login/qrcode/create'
    api_wait_seconds = 30

    def on_captcha_page(page_url: str) -> bool:
        return '/website-login/captcha' in page_url or 'verifyUuid=' in page_url

    try:
        if not self.page:
            await self.init_browser()

        login_url = 'https://www.xiaohongshu.com'
        logger.info("[扫码登录] 正在访问小红书首页...")

        # Always (re)load the home page, whatever page is currently open.
        try:
            # domcontentloaded instead of networkidle: do not wait for every resource.
            await self.page.goto(login_url, wait_until='domcontentloaded', timeout=10000)
            current_url = self.page.url
            logger.success(f"[扫码登录] 页面加载完成, 当前URL: {current_url}")

            # Redirected to the risk-control captcha page?
            if on_captcha_page(current_url):
                logger.warning("[扫码登录] 检测到风控验证页面,尝试等待或跳过...")
                # Give the page 30 seconds to leave the captcha by itself.
                await asyncio.sleep(30)
                current_url = self.page.url
                logger.info(f"[扫码登录] 等待30秒后当前URL: {current_url}")
                if on_captcha_page(current_url):
                    return {
                        "success": False,
                        "error": "当前IP被风控需要验证。请稍后再试或启用代理。"
                    }
        except Exception as e:
            # A timeout is tolerated as long as a xiaohongshu page was reached.
            current_url = self.page.url
            if 'xiaohongshu.com' in current_url:
                logger.warning(f"[扫码登录] 页面加载超时但已到达小红书页面: {current_url}")
            else:
                logger.error(f"[扫码登录] 页面加载失败: {str(e)}, 当前URL: {current_url}")
                raise

        qrcode_create_data = None
        qrcode_captured = asyncio.Event()

        async def handle_qrcode_create(route):
            nonlocal qrcode_create_data
            try:
                request = route.request
                logger.info(f"[扫码登录] API请求: {request.method} {request.url}")
                response = await route.fetch()
                body = await response.body()
                try:
                    data = json.loads(body.decode('utf-8'))
                    logger.info(f"[扫码登录] API响应: {json.dumps(data, ensure_ascii=False)}")
                    if data.get('code') == 0 and data.get('success') and data.get('data'):
                        qrcode_create_data = data.get('data')
                        logger.success(f"[扫码登录] 获取到二维码 qr_id={qrcode_create_data.get('qr_id')}")
                        qrcode_captured.set()
                except Exception as e:
                    logger.error(f"[扫码登录] 解析响应失败: {str(e)}")
                await route.fulfill(response=response)
            except Exception as e:
                logger.error(f"[扫码登录] 处理API请求失败: {str(e)}")
                # Fall back to letting the request through. If the route was
                # already handled this raises, and must not escape the handler.
                try:
                    await route.continue_()
                except Exception as continue_error:
                    logger.debug(f"[扫码登录] 放行API请求失败: {continue_error}")

        # NOTE(review): the handler is installed after navigation, so a
        # qrcode/create request fired during page load is not captured here —
        # confirm this ordering is intentional.
        await self.page.route(qrcode_route, handle_qrcode_create)
        try:
            logger.info("[扫码登录] 已注册 API路由监听等待页面自动触发二维码创建...")
            # Passively wait for the page to create the QR code.
            try:
                await asyncio.wait_for(qrcode_captured.wait(), timeout=api_wait_seconds)
            except asyncio.TimeoutError:
                logger.warning("[扫码登录] 30秒内未捕获到二维码创建 API请求尝试从页面提取二维码")
        finally:
            # The captured data is local to this call, so the handler is of
            # no use afterwards; remove it so handlers do not pile up.
            try:
                await self.page.unroute(qrcode_route, handle_qrcode_create)
            except Exception as e:
                logger.debug(f"[扫码登录] 移除API路由监听失败: {e}")

        # Extract image and status only; login detection is skipped because
        # this is the initial fetch.
        qrcode_result = await self.extract_qrcode_with_status(check_login_success=False)

        if qrcode_create_data:
            qrcode_result["qr_id"] = qrcode_create_data.get('qr_id')
            qrcode_result["qr_code"] = qrcode_create_data.get('code')
            qrcode_result["qr_url"] = qrcode_create_data.get('url')
            qrcode_result["multi_flag"] = qrcode_create_data.get('multi_flag')
        return qrcode_result
    except Exception as e:
        print(f"启动扫码登录失败: {str(e)}", file=sys.stderr)
        return {
            "success": False,
            "error": str(e)
        }
async def extract_qrcode_with_status(self, check_login_success: bool = True) -> Dict[str, Any]:
    """
    Extract the QR-code image and its status, optionally detecting login.

    When ``check_login_success`` is True the method first asks the
    ``/api/sns/web/v2/user/me`` endpoint from inside the page; a non-guest
    user means the scan succeeded. If that yields nothing it falls back to
    DOM heuristics (QR code gone and login dialog closed or user element
    visible). On a detected login the cookies, user info and a full login
    state snapshot are returned instead of a QR code.

    Args:
        check_login_success: Whether to run login detection (default True).
            ``start_qrcode_login`` and ``refresh_qrcode`` pass False.

    Returns:
        Dict with ``success``, ``qrcode_image``, ``status_text``,
        ``status_desc``, ``is_expired``, ``login_success``, ``user_info``,
        ``cookies``, ``cookies_full`` and ``login_state``; or
        ``{"success": False, "error": ...}`` on failure.
    """
    try:
        if not self.page:
            return {
                "success": False,
                "error": "浏览器未初始化"
            }

        result = {
            "success": True,
            "qrcode_image": "",
            "status_text": "",
            "status_desc": "",
            "is_expired": False,
            "login_success": False,  # whether the QR scan login succeeded
            "user_info": None,
            "cookies": None,
            "cookies_full": None,
            "login_state": None
        }

        # Login detection only runs for polling calls.
        if check_login_success:
            # Method 1: ask the user-info API (most reliable).
            user_me_data = await self._qr_fetch_user_me()
            if user_me_data:
                await self._qr_fill_login_from_api(result, user_me_data)
                return result

            # API gave nothing: fall back to page-element detection.
            print("\u26a0\ufe0f API\u68c0\u6d4b\u5931\u8d25,\u4f7f\u7528\u9875\u9762\u5143\u7d20\u68c0\u6d4b", file=sys.stderr)
            current_url = self.page.url
            print(f"\u5f53\u524dURL: {current_url}", file=sys.stderr)
            if await self._qr_dom_login_detected():
                await self._qr_fill_login_from_dom(result, current_url)
                return result

        # Still on the login dialog (or detection disabled): read the QR code.
        result["qrcode_image"] = await self._qr_grab_image()
        if not result["qrcode_image"]:
            return {
                "success": False,
                "error": "未找到二维码图片"
            }

        print("正在提取二维码状态...", file=sys.stderr)
        await self._qr_read_status(result)

        print(f"✅ 二维码提取完成: 状态={result['status_text']}, 过期={result['is_expired']}, 登录成功={result['login_success']}", file=sys.stderr)
        return result
    except Exception as e:
        print(f"提取二维码状态失败: {str(e)}", file=sys.stderr)
        return {
            "success": False,
            "error": str(e)
        }

async def _qr_fetch_user_me(self) -> Optional[Dict[str, Any]]:
    """Return the ``user/me`` payload for a logged-in (non-guest) user, else None."""
    try:
        response = await self.page.evaluate('''
            async () => {
                try {
                    const response = await fetch('https://edith.xiaohongshu.com/api/sns/web/v2/user/me', {
                        method: 'GET',
                        credentials: 'include'
                    });
                    const data = await response.json();
                    return data;
                } catch (error) {
                    return { error: error.message };
                }
            }
        ''')
        if not response or response.get('error'):
            return None
        if not (response.get('code') == 0 and response.get('success') and response.get('data')):
            return None
        data = response.get('data')
        # A guest session also answers successfully; only a real user counts.
        if data.get('guest', False) or not (data.get('user_id') and data.get('nickname')):
            return None
        logger.success(f"[扫码登录] 登录成功! user_id={data.get('user_id')}, nickname={data.get('nickname')}")
        return data
    except Exception as e:
        logger.error(f"[扫码登录] 请求用户信息 API异常: {str(e)}")
        return None

async def _qr_fill_login_from_api(self, result: Dict[str, Any], user_me_data: Dict[str, Any]) -> None:
    """Fill ``result`` with cookies, user info and login state after an API-confirmed login."""
    result["login_success"] = True
    # Let the page settle before harvesting state.
    await asyncio.sleep(1)

    try:
        cookies = await self.context.cookies()
        result["cookies"] = {cookie['name']: cookie['value'] for cookie in cookies}
        result["cookies_full"] = cookies
    except Exception as e:
        logger.error(f"[扫码登录] 获取Cookie失败: {str(e)}")

    # API fields are authoritative; localStorage only supplements them.
    user_info = {
        'user_id': user_me_data.get('user_id'),
        'red_id': user_me_data.get('red_id'),
        'nickname': user_me_data.get('nickname'),
        'desc': user_me_data.get('desc'),
        'gender': user_me_data.get('gender'),
        'avatar_small': user_me_data.get('images'),  # small avatar
        'avatar_large': user_me_data.get('imageb'),  # large avatar
        'is_guest': user_me_data.get('guest', False)
    }
    try:
        user_info.update(await self._qr_local_storage_extras())
    except Exception as e:
        # Keep the API-only info when localStorage cannot be read.
        logger.error(f"[扫码登录] 构建用户信息失败: {str(e)}")
    result["user_info"] = user_info

    result["login_state"] = await self._qr_snapshot_login_state(result["cookies_full"])

async def _qr_fill_login_from_dom(self, result: Dict[str, Any], current_url: str) -> None:
    """Fill ``result`` with cookies, user info and login state after a DOM-detected login."""
    print("\u2705 \u68c0\u6d4b\u5230\u626b\u7801\u767b\u5f55\u6210\u529f!(\u4e8c\u7ef4\u7801\u5df2\u6d88\u5931)", file=sys.stderr)
    result["login_success"] = True
    # Let the page settle before harvesting state.
    await asyncio.sleep(1)

    try:
        cookies = await self.context.cookies()
        result["cookies"] = {cookie['name']: cookie['value'] for cookie in cookies}
        result["cookies_full"] = cookies
        print(f"\u2705 \u5df2\u83b7\u53d6 {len(cookies)} \u4e2aCookie", file=sys.stderr)
    except Exception as e:
        print(f"\u26a0\ufe0f \u83b7\u53d6Cookie\u5931\u8d25: {str(e)}", file=sys.stderr)

    try:
        user_info = await self._qr_local_storage_extras()
        result["user_info"] = user_info
        print(f"\u2705 \u5df2\u83b7\u53d6\u7528\u6237\u4fe1\u606f: {list(user_info.keys())}", file=sys.stderr)
    except Exception as e:
        print(f"\u26a0\ufe0f \u83b7\u53d6\u7528\u6237\u4fe1\u606f\u5931\u8d25: {str(e)}", file=sys.stderr)

    result["login_state"] = await self._qr_snapshot_login_state(result["cookies_full"], current_url)

async def _qr_local_storage_extras(self) -> Dict[str, Any]:
    """Read the ``b1``/``b1b1``/``p1`` localStorage entries, JSON-decoding object/array values."""
    storage = await self.page.evaluate('() => JSON.stringify(localStorage)')
    storage_dict = json.loads(storage)
    extras: Dict[str, Any] = {}
    for key in ('b1', 'b1b1', 'p1'):
        if key not in storage_dict:
            continue
        value = storage_dict[key]
        try:
            if value and value.strip():
                extras[key] = json.loads(value) if value.startswith(('{', '[')) else value
        except Exception:
            # Not valid JSON after all: keep the raw stored value.
            extras[key] = value
    return extras

async def _qr_snapshot_login_state(self, cookies_full, current_url: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Build the login-state dict (cookies, both storages, URL, timestamp); None on failure."""
    try:
        if current_url is None:
            current_url = self.page.url
        localStorage_data = {}
        sessionStorage_data = {}
        try:
            storage = await self.page.evaluate('() => JSON.stringify(localStorage)')
            localStorage_data = json.loads(storage)
        except Exception as e:
            print(f"\u26a0\ufe0f \u83b7\u53d6localStorage\u5931\u8d25: {str(e)}", file=sys.stderr)
        try:
            session_storage = await self.page.evaluate('() => JSON.stringify(sessionStorage)')
            sessionStorage_data = json.loads(session_storage)
        except Exception as e:
            print(f"\u26a0\ufe0f \u83b7\u53d6sessionStorage\u5931\u8d25: {str(e)}", file=sys.stderr)
        login_state = {
            "cookies": cookies_full,
            "localStorage": localStorage_data,
            "sessionStorage": sessionStorage_data,
            "url": current_url,
            "timestamp": time.time()
        }
        print("\u2705 \u5df2\u6784\u5efa\u5b8c\u6574\u767b\u5f55\u72b6\u6001", file=sys.stderr)
        return login_state
    except Exception as e:
        print(f"\u26a0\ufe0f \u6784\u5efa\u767b\u5f55\u72b6\u6001\u5931\u8d25: {str(e)}", file=sys.stderr)
        return None

async def _qr_any_visible(self, selectors) -> bool:
    """Return True if any selector matches a visible element; errors count as "not visible"."""
    try:
        for selector in selectors:
            element = await self.page.query_selector(selector)
            if element and await element.is_visible():
                return True
    except Exception as e:
        # Best-effort probe: a failing query must not abort login detection.
        logger.debug(f"[扫码登录] 元素可见性检测异常: {e}")
    return False

async def _qr_dom_login_detected(self) -> bool:
    """Heuristic login check: QR code gone and (login dialog closed or user element visible)."""
    # Method 2: is the QR code still shown?
    qrcode_exists = await self._qr_any_visible(['.qrcode-img'])
    # Method 3: has the login dialog been closed?
    login_modal_closed = not await self._qr_any_visible([
        '.login-container',
        '.reds-modal',
        '[class*="login-modal"]',
        '[class*="LoginModal"]',
    ])
    # Method 4: is a logged-in user element visible?
    # NOTE(review): '[class*="user"]' is very broad and may match before login — confirm.
    has_user_info = await self._qr_any_visible([
        '.user-info',
        '.avatar',
        '[class*="user"]',
    ])
    print(f"\u767b\u5f55\u72b6\u6001\u68c0\u6d4b: \u4e8c\u7ef4\u7801\u5b58\u5728={qrcode_exists}, \u767b\u5f55\u6846\u5173\u95ed={login_modal_closed}, \u6709\u7528\u6237\u4fe1\u606f={has_user_info}", file=sys.stderr)
    return not qrcode_exists and (login_modal_closed or has_user_info)

async def _qr_grab_image(self) -> str:
    """Return the QR code as a data URI from the first matching element, or "" if none is found."""
    import base64

    qrcode_selectors = [
        '.qrcode-img',
        'img.qrcode-img',
        '.qrcode img',
        'img[src*="data:image"]',
        'img[alt*="二维码"]',
    ]
    for selector in qrcode_selectors:
        try:
            qrcode_img = await self.page.wait_for_selector(selector, timeout=3000)
            if not qrcode_img:
                continue
            image = ""
            src = await qrcode_img.get_attribute('src')
            if src:
                if src.startswith('data:image'):
                    image = src
                else:
                    # Remote URL: download it and convert to a data URI.
                    try:
                        async with aiohttp.ClientSession() as session:
                            async with session.get(src, timeout=aiohttp.ClientTimeout(total=10)) as response:
                                if response.status == 200:
                                    img_data = await response.read()
                                    img_base64 = base64.b64encode(img_data).decode('utf-8')
                                    content_type = response.headers.get('Content-Type', 'image/png')
                                    image = f"data:{content_type};base64,{img_base64}"
                                    print("✅ 成功下载并转换二维码", file=sys.stderr)
                    except Exception as e:
                        print(f"⚠️ 下载二维码失败: {str(e)}", file=sys.stderr)
            # Still nothing: screenshot the element instead.
            if not image:
                try:
                    screenshot_bytes = await qrcode_img.screenshot()
                    if screenshot_bytes:
                        img_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
                        image = f"data:image/png;base64,{img_base64}"
                        print("✅ 成功截取二维码", file=sys.stderr)
                except Exception as e:
                    print(f"⚠️ 截取二维码失败: {str(e)}", file=sys.stderr)
            # The first element found decides the outcome.
            return image
        except Exception:
            # Selector timed out or the element vanished: try the next one.
            continue
    return ""

async def _qr_read_status(self, result: Dict[str, Any]) -> None:
    """Copy the QR status text/description into ``result`` and flag expiry.

    ``result`` keeps its defaults (empty text, not expired) when no status
    element is found or the element is hidden, which means the code is valid.
    """
    status_selectors = [
        '.status',
        '.qrcode-status',
        '[class*="status"]',
    ]
    for selector in status_selectors:
        try:
            status_el = await self.page.query_selector(selector)
            if not status_el:
                continue
            if not await status_el.is_visible():
                print("二维码状态元素不可见,说明二维码有效", file=sys.stderr)
                result["status_text"] = ""  # empty string means normal state
                result["is_expired"] = False
                break
            print(f"✅ 找到状态元素: {selector}", file=sys.stderr)
            status_text_el = await status_el.query_selector('.status-text')
            if status_text_el:
                status_text = await status_text_el.inner_text()
                result["status_text"] = status_text.strip()
                print(f"状态文本: {result['status_text']}", file=sys.stderr)
            status_desc_el = await status_el.query_selector('.status-desc')
            if status_desc_el:
                status_desc = await status_desc_el.inner_text()
                result["status_desc"] = status_desc.strip()
                print(f"状态描述: {result['status_desc']}", file=sys.stderr)
            # Expiry may be announced in either field; it is kept even when
            # the status text itself is empty.
            if "过期" in result["status_text"] or "过期" in result["status_desc"]:
                result["is_expired"] = True
                print("⚠️ 二维码已过期", file=sys.stderr)
            break
        except Exception:
            continue
async def refresh_qrcode(self) -> Dict[str, Any]:
    """
    Refresh the QR code by clicking the page's refresh control.

    Listens for the ``login/qrcode/create`` API while opening the login
    dialog if needed, switching to the QR-code tab and clicking refresh,
    then re-extracts the QR code from the page.

    Returns:
        The dict produced by ``extract_qrcode_with_status`` (with ``qr_id``,
        ``qr_code``, ``qr_url`` and ``multi_flag`` added when the API
        response was captured), or ``{"success": False, "error": ...}``.
    """
    qrcode_route = '**/api/sns/web/v1/login/qrcode/create'
    api_wait_seconds = 3

    try:
        if not self.page:
            return {
                "success": False,
                "error": "浏览器未初始化"
            }

        # A blank page has to be navigated to the explore page first.
        try:
            current_url = self.page.url
            logger.info(f"[刷新二维码] 当前URL: {current_url}")
            if current_url == 'about:blank' or current_url == '':
                logger.warning("[刷新二维码] 检测到空白页重新导航到explore页面")
                await self.page.goto('https://www.xiaohongshu.com/explore', wait_until='networkidle')
                await asyncio.sleep(1)
        except Exception as e:
            logger.error(f"[刷新二维码] 检查page状态异常: {str(e)}")

        logger.info("[刷新二维码] 正在刷新...")

        qrcode_create_data = None
        qrcode_captured = asyncio.Event()

        async def handle_qrcode_create(route):
            nonlocal qrcode_create_data
            try:
                request = route.request
                logger.info(f"[刷新二维码] API请求: {request.method} {request.url}")
                response = await route.fetch()
                body = await response.body()
                try:
                    data = json.loads(body.decode('utf-8'))
                    logger.info(f"[刷新二维码] API响应: {json.dumps(data, ensure_ascii=False)}")
                    if data.get('code') == 0 and data.get('success') and data.get('data'):
                        qrcode_create_data = data.get('data')
                        logger.success(f"[刷新二维码] 获取到新二维码 qr_id={qrcode_create_data.get('qr_id')}")
                        qrcode_captured.set()
                except Exception as e:
                    logger.error(f"[刷新二维码] 解析响应失败: {str(e)}")
                await route.fulfill(response=response)
            except Exception as e:
                logger.error(f"[刷新二维码] 处理API请求失败: {str(e)}")
                # Fall back to letting the request through. If the route was
                # already handled this raises, and must not escape the handler.
                try:
                    await route.continue_()
                except Exception as continue_error:
                    logger.debug(f"[刷新二维码] 放行API请求失败: {continue_error}")

        # Register the listener before the login dialog is opened.
        await self.page.route(qrcode_route, handle_qrcode_create)
        try:
            logger.info("[刷新二维码] 已注册 API路由监听")

            # Make sure the login dialog is open.
            current_url = self.page.url
            if 'login' not in current_url.lower():
                logger.info("[刷新二维码] 不在登录页,先打开登录页")
                try:
                    login_btn = await self.page.wait_for_selector('text="登录"', timeout=3000)
                    if login_btn:
                        await login_btn.click()
                        await asyncio.sleep(1)
                except Exception as e:
                    logger.warning(f"[刷新二维码] 打开登录页失败: {str(e)}")

            # Make sure the QR-code tab is selected.
            qrcode_tab_selectors = [
                'text="扫码登录"',
                'div:has-text("扫码登录")',
                'text="二维码登录"',
                'div:has-text("二维码登录")',
                '.qrcode-tab',
                '[data-type="qrcode"]',
            ]
            for selector in qrcode_tab_selectors:
                try:
                    qrcode_tab = await self.page.query_selector(selector)
                    if qrcode_tab:
                        logger.info("[刷新二维码] 切换到扫码登录模式")
                        await qrcode_tab.click()
                        await asyncio.sleep(0.5)
                        break
                except Exception:
                    continue

            # Find and click the refresh button / refresh text.
            refresh_selectors = [
                '.status-desc.refresh',
                'text="点击刷新"',
                '.refresh',
                '[class*="refresh"]',
            ]
            refresh_clicked = False
            for selector in refresh_selectors:
                try:
                    refresh_el = await self.page.query_selector(selector)
                    if refresh_el:
                        logger.info(f"[刷新二维码] 找到刷新按钮: {selector}")
                        await refresh_el.click()
                        logger.success("[刷新二维码] 已点击刷新")
                        await asyncio.sleep(1)
                        refresh_clicked = True
                        break
                except Exception:
                    continue

            if not refresh_clicked:
                return {
                    "success": False,
                    "error": "未找到刷新按钮"
                }

            # Wait up to 3 seconds for the QR-code creation API response.
            try:
                await asyncio.wait_for(qrcode_captured.wait(), timeout=api_wait_seconds)
            except asyncio.TimeoutError:
                logger.warning("[刷新二维码] 未捕获到二维码创建 API请求")
        finally:
            # Always drop this call's handler (also on the early return above)
            # so handlers do not pile up on the page.
            try:
                await self.page.unroute(qrcode_route, handle_qrcode_create)
            except Exception as e:
                logger.debug(f"[刷新二维码] 移除API路由监听失败: {e}")

        # Re-extract the QR code from the page.
        qrcode_result = await self.extract_qrcode_with_status(check_login_success=False)

        if qrcode_create_data:
            qrcode_result["qr_id"] = qrcode_create_data.get('qr_id')
            qrcode_result["qr_code"] = qrcode_create_data.get('code')
            qrcode_result["qr_url"] = qrcode_create_data.get('url')
            qrcode_result["multi_flag"] = qrcode_create_data.get('multi_flag')
            logger.success("[刷新二维码] 已将二维码创建信息添加到返回结果")
        return qrcode_result
    except Exception as e:
        logger.error(f"[刷新二维码] 失败: {str(e)}")
        return {
            "success": False,
            "error": str(e)
        }