"""
小红书登录服务
使用 Playwright 模拟浏览器登录小红书
"""
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
from typing import Dict, Any, Optional
import asyncio
import json
import random
import unicodedata
import sys
import os
import tempfile
import aiohttp
import time
from datetime import datetime
from pathlib import Path
from browser_pool import get_browser_pool
from error_screenshot import save_error_screenshot, save_screenshot_with_html
from loguru import logger
from damai_proxy_config import get_random_proxy, format_proxy_for_playwright
# Configure loguru: drop its default handler, then log to stderr at INFO level
# using a compact "timestamp | level | message" layout.
logger.remove()
logger.add(
    sys.stderr,
    level="INFO",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
)
async def download_image(url: str) -> str:
    """
    Download a remote image into a uniquely named temporary file.

    The file is created under ``<system temp dir>/xhs_images`` and is not
    removed by this function; the caller owns it.

    Args:
        url: Image URL.

    Returns:
        Path of the downloaded local file, as a string.

    Raises:
        Exception: On a non-200 HTTP status or any network / filesystem
            error. The error is logged and then re-raised.
    """
    try:
        logger.info(f"下载网络图片: {url}")
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                if response.status != 200:
                    raise Exception(f"HTTP {response.status}")

                # Derive the extension from Content-Type; fall back to .jpg.
                content_type = response.headers.get('Content-Type', '').lower()
                if 'png' in content_type:
                    ext = '.png'
                elif 'jpeg' in content_type or 'jpg' in content_type:
                    ext = '.jpg'
                elif 'webp' in content_type:
                    ext = '.webp'
                else:
                    ext = '.jpg'

                payload = await response.read()

        temp_dir = Path(tempfile.gettempdir()) / 'xhs_images'
        temp_dir.mkdir(parents=True, exist_ok=True)

        # mkstemp guarantees a name nobody else is using; a random 5-digit
        # suffix could collide and overwrite another download in flight.
        fd, temp_file = tempfile.mkstemp(prefix="img_", suffix=ext, dir=str(temp_dir))
        try:
            with os.fdopen(fd, 'wb') as f:
                f.write(payload)
        except Exception:
            # Do not leave a truncated image behind.
            try:
                os.unlink(temp_file)
            except OSError:
                pass
            raise

        logger.success(f"✅ 图片下载成功: {temp_file}")
        return str(temp_file)
    except Exception as e:
        logger.error(f"⚠️ 下载图片失败: {str(e)}")
        raise
class XHSLoginService:
"""小红书登录服务"""
def __init__(self, use_pool: bool = True, headless: bool = True, session_id: Optional[str] = None, use_page_isolation: bool = False):
"""
初始化登录服务
Args:
use_pool: 是否使用浏览器池(默认True,提升性能)
headless: 是否使用无头模式,False为有头模式(方便调试)
session_id: 会话 ID,用于并发隔离(不同的session_id会创建独立的浏览器实例)
use_page_isolation: 是否使用页面隔离模式(扫码登录专用,减少浏览器实例数)
"""
self.use_pool = use_pool
self.headless = headless
self.session_id = session_id # 保存session_id用于并发隔离
self.use_page_isolation = use_page_isolation # 页面隔离模式
self.browser_pool = get_browser_pool(headless=headless) if use_pool else None
self.playwright = None
self.browser: Optional[Browser] = None
self.context: Optional[BrowserContext] = None
self.page: Optional[Page] = None
self.current_phone = None
async def init_browser(self, cookies: Optional[list] = None, proxy: Optional[dict] = None, user_agent: Optional[str] = None, restore_state: bool = False, use_random_proxy: bool = True):
    """
    Initialize ``self.browser`` / ``self.context`` / ``self.page``.

    One of three paths is taken:
      * pool + page isolation (``use_page_isolation`` and ``session_id`` set):
        a page is obtained via ``browser_pool.get_qrcode_page`` and the
        pool's own browser/context are reused;
      * pool: browser, context and page come from ``browser_pool.get_browser``;
      * traditional: a new Chromium instance is launched, an anti-detection
        init script is injected and detection requests are blocked.

    Args:
        cookies: Optional cookie list used to restore a logged-in session.
        proxy: Optional proxy config, e.g.
            {"server": "http://ip:port", "username": "...", "password": "..."}.
        user_agent: Optional custom User-Agent.
        restore_state: Restore cookies / user agent / web storage from
            ``login_state.json`` in the working directory, if it exists.
        use_random_proxy: Pick a random proxy when ``proxy`` is not given
            (default True); falls back to a direct connection on failure.

    Raises:
        Exception: Any initialization failure is logged and re-raised.
    """
    try:
        # No explicit proxy: try to pick a random one, otherwise connect directly.
        if not proxy and use_random_proxy:
            try:
                proxy_config = get_random_proxy()
                proxy = format_proxy_for_playwright(proxy_config)
                logger.info(f"[代理] 自动选择代理: {proxy_config['name']} ({proxy_config['server']})")
            except Exception as e:
                logger.info(f"[代理] 无可用代理,使用直连访问")
                # Keep the reason available for troubleshooting instead of dropping it.
                logger.debug(f"[代理] 获取随机代理失败: {str(e)}")

        # Optionally load the saved login state first.
        login_state = None
        if restore_state and os.path.exists('login_state.json'):
            try:
                with open('login_state.json', 'r', encoding='utf-8') as f:
                    login_state = json.load(f)
                logger.success("✅ 加载到保存的登录状态")
                # Saved values take over: cookies always, user agent only if not given.
                cookies = login_state.get('cookies', cookies)
                if not user_agent and login_state.get('user_agent'):
                    user_agent = login_state['user_agent']
            except Exception as e:
                logger.error(f"⚠️ 加载登录状态失败: {str(e)}")

        # ---- Browser-pool paths ----
        if self.use_pool and self.browser_pool:
            # QR-code login: page-isolation mode.
            if self.use_page_isolation and self.session_id:
                logger.info(f"[页面隔离模式] 获取扫码登录页面 (session_id={self.session_id})")
                self.page = await self.browser_pool.get_qrcode_page(self.session_id)
                # Reuse the pool's main browser and context.
                self.browser = self.browser_pool.browser
                self.context = self.browser_pool.context
                logger.success("浏览器初始化成功(页面隔离模式)")
                return

            # Regular pool mode.
            logger.info(f"[浏览器池模式] 从浏览器池获取实例 (session_id={self.session_id}, headless={self.headless})")
            self.browser, self.context, self.page = await self.browser_pool.get_browser(
                cookies=cookies, proxy=proxy, user_agent=user_agent, session_id=self.session_id,
                headless=self.headless
            )
            # Remember the proxy configuration.
            if proxy:
                self.proxy = proxy

            # If the pooled page is blank, replace it with a fresh page.
            try:
                current_url = self.page.url
                logger.info(f"当前URL: {current_url}")
                if current_url == 'about:blank' or current_url == '':
                    logger.warning("[浏览器池] 检测到空白页面,重新创建page")
                    try:
                        await self.page.close()
                    except Exception as e:
                        logger.error(f"[浏览器池] 关闭旧page失败: {str(e)}")
                    self.page = await self.context.new_page()
                    logger.success(f"[浏览器池] 已创建新page, 新URL: {self.page.url}")
                    # Keep the pool's bookkeeping pointing at the new page.
                    if self.session_id and self.session_id in self.browser_pool.temp_browsers:
                        self.browser_pool.temp_browsers[self.session_id]["page"] = self.page
                        logger.success("[浏览器池] 已更新浏览器池中的page引用")
            except Exception as e:
                logger.error(f"[浏览器池] 检查page状态异常: {str(e)}")

            # Restore localStorage / sessionStorage if a state was loaded.
            if login_state:
                await self._restore_storage(login_state)
            logger.success("浏览器初始化成功(池模式)")
            return

        # ---- Traditional mode: launch a new browser every time ----
        logger.info("[传统模式] 创建新浏览器实例")
        # On Windows, select the Proactor event loop policy.
        if sys.platform == 'win32':
            try:
                asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
            except Exception as e:
                logger.error(f"警告: 设置事件循环策略失败: {str(e)}")

        self.playwright = await async_playwright().start()

        # Launch Chromium with the configured headless flag.
        launch_kwargs = {
            "headless": self.headless,
            "args": [
                '--disable-blink-features=AutomationControlled',
                '--disable-infobars',
                '--no-first-run',
                '--no-default-browser-check',
            ],
        }
        if proxy:
            launch_kwargs["proxy"] = proxy
            self.proxy = proxy  # kept for later use
        self.browser = await self.playwright.chromium.launch(**launch_kwargs)

        # Browser context mimicking a regular desktop user.
        context_kwargs = {
            "viewport": login_state.get('viewport') if login_state else {'width': 1280, 'height': 720},
            "user_agent": user_agent or 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        }
        self.context = await self.browser.new_context(**context_kwargs)

        # Init script hiding automation fingerprints. The only change from the
        # previous script is that the saved permissions.query is now invoked
        # with its original receiver; called unbound it fails with
        # "Illegal invocation" for every non-notification query.
        await self.context.add_init_script("""
// 移除webdriver标记
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// 阻止检测自动化调试端口
window.chrome = {
runtime: {}
};
// 阻止检测Chrome DevTools Protocol
const originalFetch = window.fetch;
window.fetch = function(...args) {
const url = args[0];
// 阻止小红书检测本地调试端口
if (typeof url === 'string' && (
url.includes('127.0.0.1:9222') ||
url.includes('127.0.0.1:54345') ||
url.includes('localhost:9222') ||
url.includes('chrome-extension://invalid')
)) {
return Promise.reject(new Error('blocked'));
}
return originalFetch.apply(this, args);
};
// 阻止XMLHttpRequest检测
const originalXHROpen = XMLHttpRequest.prototype.open;
XMLHttpRequest.prototype.open = function(...args) {
const url = args[1];
if (typeof url === 'string' && (
url.includes('127.0.0.1:9222') ||
url.includes('127.0.0.1:54345') ||
url.includes('localhost:9222') ||
url.includes('chrome-extension://invalid')
)) {
throw new Error('blocked');
}
return originalXHROpen.apply(this, args);
};
// 添加chrome.app
Object.defineProperty(window, 'chrome', {
get: () => ({
app: {
isInstalled: false,
},
webstore: {
onInstallStageChanged: {},
onDownloadProgress: {},
},
runtime: {
PlatformOs: {
MAC: 'mac',
WIN: 'win',
ANDROID: 'android',
CROS: 'cros',
LINUX: 'linux',
OPENBSD: 'openbsd',
},
PlatformArch: {
ARM: 'arm',
X86_32: 'x86-32',
X86_64: 'x86-64',
},
PlatformNaclArch: {
ARM: 'arm',
X86_32: 'x86-32',
X86_64: 'x86-64',
},
RequestUpdateCheckStatus: {
THROTTLED: 'throttled',
NO_UPDATE: 'no_update',
UPDATE_AVAILABLE: 'update_available',
},
OnInstalledReason: {
INSTALL: 'install',
UPDATE: 'update',
CHROME_UPDATE: 'chrome_update',
SHARED_MODULE_UPDATE: 'shared_module_update',
},
OnRestartRequiredReason: {
APP_UPDATE: 'app_update',
OS_UPDATE: 'os_update',
PERIODIC: 'periodic',
},
},
}),
configurable: true,
});
// 模拟permissions
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery.call(window.navigator.permissions, parameters)
);
// 添加plugins
Object.defineProperty(navigator, 'plugins', {
get: () => [
{
0: {type: "application/x-google-chrome-pdf", suffixes: "pdf", description: "Portable Document Format"},
description: "Portable Document Format",
filename: "internal-pdf-viewer",
length: 1,
name: "Chrome PDF Plugin"
},
{
0: {type: "application/pdf", suffixes: "pdf", description: ""},
description: "",
filename: "mhjfbmdgcfjbbpaeojofohoefgiehjai",
length: 1,
name: "Chrome PDF Viewer"
},
{
0: {type: "application/x-nacl", suffixes: "", description: "Native Client Executable"},
1: {type: "application/x-pnacl", suffixes: "", description: "Portable Native Client Executable"},
description: "",
filename: "internal-nacl-plugin",
length: 2,
name: "Native Client"
}
],
});
""")
        logger.success("✅ 已注入反检测脚本")

        # Inject cookies into the context when provided.
        if cookies:
            await self.context.add_cookies(cookies)
            logger.success(f"已注入 {len(cookies)} 个Cookie")

        self.page = await self.context.new_page()

        # Abort requests that probe for local debugging ports / fake extensions.
        async def block_detection_requests(route, request):
            url = request.url
            if any([
                '127.0.0.1:9222' in url,
                '127.0.0.1:54345' in url,
                'localhost:9222' in url,
                'chrome-extension://invalid' in url,
                'chrome-extension://bla' in url,
            ]):
                await route.abort()
            else:
                await route.continue_()

        # The route matches every request.
        await self.page.route('**/*', block_detection_requests)
        logger.success("✅ 已启用请求拦截,阻止检测自动化")

        # Redirect monitoring: count redirects that arrive less than 1s apart.
        self.redirect_count = 0
        self.last_redirect_time = 0

        async def on_response(response):
            """Watch responses and warn about a likely redirect loop."""
            if response.status in [301, 302, 303, 307, 308]:
                current_time = time.time()
                if current_time - self.last_redirect_time < 1:
                    self.redirect_count += 1
                    if self.redirect_count > 5:
                        logger.warning(f"⚠️ 检测到频繁重定向 ({self.redirect_count}次),可能是无限跳转")
                else:
                    self.redirect_count = 0
                self.last_redirect_time = current_time

        self.page.on('response', on_response)

        # Restore localStorage / sessionStorage if a state was loaded.
        if login_state:
            await self._restore_storage(login_state)
        logger.success("浏览器初始化成功(传统模式)")
    except Exception as e:
        logger.error(f"浏览器初始化失败: {str(e)}")
        raise
async def _restore_storage(self, login_state: dict):
    """
    Restore localStorage and sessionStorage from a saved login state.

    The page first navigates to ``login_state['url']`` (default
    ``https://www.xiaohongshu.com``) so the storage belongs to that origin.
    Restoration is skipped if navigation fails or the page ends up on a URL
    containing "login". Failures are logged, never raised.

    Args:
        login_state: Saved state; the optional ``localStorage`` and
            ``sessionStorage`` entries are mappings of key -> value.
    """
    try:
        # A page of the target site must be loaded before its storage can be written.
        target_url = login_state.get('url', 'https://www.xiaohongshu.com')
        logger.debug(f"正在访问 {target_url} 以注入storage...")
        # Short timeout to avoid a long wait.
        try:
            await self.page.goto(target_url, wait_until='domcontentloaded', timeout=15000)
            await asyncio.sleep(1)
            # Redirected to the login page: skip restoration.
            current_url = self.page.url
            if 'login' in current_url.lower():
                logger.warning("⚠️ 检测到被重定向到登录页,跳过storage恢复")
                return
        except Exception as e:
            logger.error(f"⚠️ 访问页面失败: {str(e)},跳过storage恢复")
            return

        for storage_name in ('localStorage', 'sessionStorage'):
            entries = login_state.get(storage_name)
            if not entries:
                continue
            for key, value in entries.items():
                try:
                    # Pass key/value as arguments instead of splicing them into
                    # the script text: a key containing a quote or backslash
                    # would otherwise break (or inject into) the JavaScript.
                    await self.page.evaluate(
                        '([name, key, value]) => window[name].setItem(key, value)',
                        [storage_name, key, value],
                    )
                except Exception as e:
                    logger.error(f"⚠️ 设置{storage_name} {key} 失败: {str(e)}")
            logger.success(f"✅ 已恢复 {len(entries)} 个{storage_name}项")
    except Exception as e:
        logger.error(f"⚠️ 恢夏storage失败: {str(e)}")
async def init_browser_with_storage_state(self, storage_state_path: str, proxy: Optional[dict] = None):
    """
    Launch a new Chromium instance whose context is created from a
    Playwright ``storage_state`` file.

    Args:
        storage_state_path: Path of the storage_state file.
        proxy: Optional proxy configuration passed to the browser launch.

    Raises:
        Exception: If the file does not exist or any step fails; the error
            is logged and re-raised.
    """
    # URL fragments of requests that must never leave the page.
    blocked_markers = (
        '127.0.0.1:9222',
        '127.0.0.1:54345',
        'localhost:9222',
        'chrome-extension://invalid',
    )

    async def block_detection_requests(route, request):
        requested = request.url
        if any(marker in requested for marker in blocked_markers):
            await route.abort()
            return
        await route.continue_()

    try:
        state_file_present = os.path.exists(storage_state_path)
        if not state_file_present:
            raise Exception(f"storage_state文件不存在: {storage_state_path}")
        logger.success(f"✅ 使用 storage_state 初始化浏览器: {storage_state_path}")

        # On Windows, select the Proactor event loop policy.
        if sys.platform == 'win32':
            try:
                policy = asyncio.WindowsProactorEventLoopPolicy()
                asyncio.set_event_loop_policy(policy)
            except Exception as e:
                logger.error(f"警告: 设置事件循环策略失败: {str(e)}")

        self.playwright = await async_playwright().start()

        # Launch options; the proxy entry is only present when one was given.
        chromium_flags = [
            '--disable-blink-features=AutomationControlled',
            '--disable-infobars',
            '--no-first-run',
            '--no-default-browser-check',
        ]
        launch_options = {"headless": self.headless, "args": chromium_flags}
        if proxy:
            launch_options.update(proxy=proxy)
        self.browser = await self.playwright.chromium.launch(**launch_options)

        # Context built from the storage_state file (native Playwright API).
        self.context = await self.browser.new_context(storage_state=storage_state_path)
        logger.success("✅ 已使用 storage_state 创建浏览器上下文")

        # Anti-detection init script.
        stealth_script = """
// 移除webdriver标记
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// 阻止检测自动化调试端口
window.chrome = {
runtime: {}
};
"""
        await self.context.add_init_script(stealth_script)
        logger.success("✅ 已注入反检测脚本")

        # New page with request interception on every URL.
        self.page = await self.context.new_page()
        await self.page.route('**/*', block_detection_requests)
        logger.success("✅ 已启用请求拦截,阻止检测自动化")
        logger.success("✅ 浏览器初始化成功(storage_state模式)")
    except Exception as e:
        logger.error(f"浏览器初始化失败: {str(e)}")
        raise
async def close_browser(self):
    """
    Release the browser held by this service.

    A browser borrowed from the pool is left running and only the local
    references are dropped. A browser this service launched itself
    (``self.playwright`` is set) is fully closed: page, context, browser and
    the Playwright driver, each attempted independently.

    Errors are logged, never raised.
    """
    try:
        # The pool paths of init_browser never set self.playwright, while
        # every path that launches its own Chromium does. Checking it stops
        # a self-launched browser from leaking when use_pool is True.
        borrowed_from_pool = self.use_pool and self.browser_pool and self.playwright is None
        if borrowed_from_pool:
            logger.info("[浏览器池模式] 保留浏览器实例供下次复用")
            # Only drop this service's references; the pool keeps running.
            self.browser = None
            self.context = None
            self.page = None
            return

        logger.info("[传统模式] 完全关闭浏览器")
        shutdown_steps = (
            (self.page, "close"),
            (self.context, "close"),
            (self.browser, "close"),
            (self.playwright, "stop"),
        )
        for resource, method_name in shutdown_steps:
            if not resource:
                continue
            # One failing close must not prevent the remaining ones.
            try:
                await getattr(resource, method_name)()
            except Exception as e:
                logger.error(f"关闭浏览器异常: {str(e)}")

        # Never keep handles to closed objects around.
        self.page = None
        self.context = None
        self.browser = None
        self.playwright = None
        logger.success("浏览器已关闭")
    except Exception as e:
        logger.error(f"关闭浏览器异常: {str(e)}")
async def extract_verification_qrcode(self) -> Optional[str]:
    """
    Extract the QR-code image shown on the verification page.

    Candidate selectors are tried in order (3s wait each). For the first
    element found: a ``data:image`` src is returned as-is, a URL src is
    downloaded and base64-encoded, and otherwise the element is screenshotted.

    Returns:
        A ``data:...;base64,...`` string, or None when there is no page or
        no QR code could be extracted.
    """
    import base64

    try:
        if not self.page:
            return None
        logger.debug("正在提取验证二维码...")

        candidate_selectors = (
            '.qrcode-img',
            'img.qrcode-img',
            '.qrcode-container img',
            'img[src*="data:image"]',  # inline base64 image
            'img[src*="qrcode"]',
            'img[alt*="二维码"]',
            'img[alt*="qrcode"]',
        )
        for selector in candidate_selectors:
            try:
                element = await self.page.wait_for_selector(selector, timeout=3000)
                if not element:
                    continue
                logger.success(f"✅ 找到二维码图片: {selector}")

                image_src = await element.get_attribute('src')
                if image_src:
                    # Already a data URI: nothing to convert.
                    if image_src.startswith('data:image'):
                        logger.success("✅ 二维码已是base64格式,直接返回")
                        return image_src
                    # Plain URL: download and convert.
                    logger.info(f"二维码是URL格式: {image_src[:100]}...")
                    try:
                        async with aiohttp.ClientSession() as http:
                            request_timeout = aiohttp.ClientTimeout(total=10)
                            async with http.get(image_src, timeout=request_timeout) as reply:
                                if reply.status == 200:
                                    raw = await reply.read()
                                    encoded = base64.b64encode(raw).decode('utf-8')
                                    # The media type comes from the response headers.
                                    mime = reply.headers.get('Content-Type', 'image/png')
                                    data_uri = f"data:{mime};base64,{encoded}"
                                    logger.success("✅ 成功下载并转换为base64")
                                    return data_uri
                    except Exception as e:
                        logger.error(f"⚠️ 下载二维码图片失败: {str(e)}")

                # The src route did not produce a result: screenshot the element.
                logger.info("尝试截取二维码区域...")
                shot = await element.screenshot()
                if shot:
                    encoded = base64.b64encode(shot).decode('utf-8')
                    data_uri = f"data:image/png;base64,{encoded}"
                    logger.success("✅ 成功截取二维码并转换为base64")
                    return data_uri
                # Element found but nothing usable: stop trying other selectors.
                break
            except Exception as e:
                logger.error(f"尝试选择器 {selector} 失败: {str(e)}")
                continue

        logger.warning("⚠️ 未找到二维码图片")
        return None
    except Exception as e:
        logger.error(f"⚠️ 提取二维码失败: {str(e)}")
        return None
async def _monitor_qrcode_scan(self, session_id: str):
    """
    Background watcher for the QR-code verification page.

    Polls every 0.5s for at most 5 minutes and pushes exactly one outcome
    through ``main.ws_manager``:
      * ``qrcode_scan_success`` once the page URL is a xiaohongshu.com URL
        that no longer contains "captcha" / "verify";
      * ``qrcode_expired`` when the status reported by
        ``/api/redcaptcha/v2/qr/status/query`` is neither 1 (waiting for
        scan) nor 5 (scanned, waiting for confirmation), or when the page
        itself shows an "expired" text;
      * ``qrcode_expired`` (timeout wording) when the loop ends without
        either outcome, including when polling is aborted by an error.

    Args:
        session_id: Session ID the WebSocket messages are addressed to.
    """
    handle_qr_status_response = None
    try:
        logger.info(f"[WebSocket] 开始监听扫码状态: {session_id}")
        if not self.page:
            logger.error(f"[WebSocket] 页面对象不存在: {session_id}")
            return

        # Latest status seen on the QR status API.
        latest_qr_status = {"status": 1, "scanned": False}
        # Set once a success / expired outcome has been handled, so that the
        # timeout notification below is not sent on top of it.
        outcome_handled = False

        async def push_message(payload: dict, delivered_log: str) -> bool:
            """Send one WebSocket message; return True when it was delivered."""
            try:
                # Imported lazily, as in the rest of this module.
                from main import ws_manager
                await ws_manager.send_message(session_id, payload)
                logger.success(delivered_log)
                return True
            except Exception as ws_error:
                logger.error(f"[WebSocket] 推送消息失败: {str(ws_error)}")
                return False

        async def handle_qr_status_response(response):
            # Record the status field of every QR status API response.
            try:
                if '/api/redcaptcha/v2/qr/status/query' in response.url:
                    json_data = await response.json()
                    if json_data.get('success') and 'data' in json_data:
                        status = json_data['data'].get('status')
                        latest_qr_status['status'] = status
                        if status == 5:
                            latest_qr_status['scanned'] = True
                            logger.info(f"[WebSocket] 检测到二维码已扫描,等待确认: status={status}")
                        elif status == 1:
                            logger.debug(f"[WebSocket] 二维码未过期,等待扫码: status={status}")
                        else:
                            logger.info(f"[WebSocket] 二维码状态: status={status}")
            except Exception as e:
                logger.error(f"[WebSocket] 解析二维码状态响应失败: {str(e)}")

        self.page.on('response', handle_qr_status_response)
        logger.info(f"[WebSocket] 已注册二维码状态API监听")

        expired_payload = {
            "type": "qrcode_expired",
            "message": "二维码已失效,请重新发送验证码"
        }

        # 600 * 0.5s = 300s = 5 minutes.
        for i in range(600):
            await asyncio.sleep(0.5)
            try:
                current_url = self.page.url
                on_verify_page = 'captcha' in current_url.lower() or 'verify' in current_url.lower()

                # 1. Left the captcha/verify page and back on the site: scan completed.
                if not on_verify_page and 'xiaohongshu.com' in current_url:
                    logger.success(f"[WebSocket] 检测到扫码完成,页面跳转回: {current_url}")
                    await push_message(
                        {
                            "type": "qrcode_scan_success",
                            "message": "扫码验证完成,请重新发送验证码"
                        },
                        f"[WebSocket] 已推送扫码成功消息: {session_id}",
                    )
                    outcome_handled = True
                    break

                # 2. Still on the verification page: check whether the code expired.
                if on_verify_page:
                    # Any status other than 1 / 5 is treated as expired.
                    if latest_qr_status['status'] not in [1, 5]:
                        logger.warning(f"[WebSocket] 检测到二维码失效: status={latest_qr_status['status']}")
                        await push_message(
                            expired_payload,
                            f"[WebSocket] 已推送二维码失效消息: {session_id}",
                        )
                        outcome_handled = True
                        break

                    # Fallback in case the API never answered: look at the page text.
                    expired_notified = False
                    try:
                        expired_selectors = [
                            'text="已过期"',
                            'text="二维码已失效"',
                            'text="二维码过期"',
                        ]
                        for selector in expired_selectors:
                            expired_elem = await self.page.query_selector(selector)
                            if not expired_elem:
                                continue
                            if not await expired_elem.is_visible():
                                continue
                            text_content = await expired_elem.text_content()
                            # Only an explicit "expired" wording counts; hints such as
                            # "expires in X minutes" must be ignored.
                            if text_content and ('已过期' in text_content or '已失效' in text_content):
                                logger.warning(f"[WebSocket] DOM检测到二维码失效: {selector}, 文本: {text_content}")
                                expired_notified = await push_message(
                                    expired_payload,
                                    f"[WebSocket] 已推送二维码失效消息: {session_id}",
                                )
                                break
                        # Stop monitoring only when the message was delivered;
                        # otherwise the next poll retries.
                        if expired_notified:
                            outcome_handled = True
                            break
                    except Exception:
                        # The page may already be closed; ignore and keep polling.
                        pass

                # Progress line every 30 seconds.
                if i > 0 and i % 60 == 0:
                    logger.info(f"[WebSocket] 扫码监听中... ({i // 2}秒) URL: {current_url}, QR_Status: {latest_qr_status['status']}")
            except Exception as e:
                logger.error(f"[WebSocket] 监听异常: {str(e)}")
                break

        # Previously this ran after every exit from the loop, so a successful
        # scan was followed by a bogus "expired" message. It is now limited to
        # exits without a handled outcome.
        if not outcome_handled:
            logger.warning(f"[WebSocket] 扫码监听超时(5分钟): {session_id}")
            await push_message(
                {
                    "type": "qrcode_expired",
                    "message": "二维码已超时,请重新发送验证码"
                },
                f"[WebSocket] 已推送超时消息: {session_id}",
            )
        logger.info(f"[WebSocket] 监听任务结束: {session_id}")
    except Exception as e:
        logger.error(f"[WebSocket] 监听任务异常: {str(e)}")
    finally:
        # Detach the response listener, if it was registered.
        try:
            if self.page and handle_qr_status_response is not None:
                self.page.remove_listener('response', handle_qr_status_response)
                logger.info(f"[WebSocket] 已移除API监听器")
        except Exception as e:
            logger.error(f"[WebSocket] 移除监听器失败: {str(e)}")
async def _navigate_with_qrcode_listener(self, url: str, timeout: int = 120):
    """
    Navigate to ``url`` and wait until the login box has requested a QR code.

    Rather than sleeping for a fixed time, a route on
    ``**/api/sns/web/v1/login/qrcode/create`` records the first matching
    request. Waiting stops early when the page URL contains "captcha" or
    "verify". The route is removed again in all cases.

    Args:
        url: Target URL.
        timeout: Maximum wait in seconds (default 120); used for the
            navigation itself and again for the wait on the API request.
    """
    route_pattern = '**/api/sns/web/v1/login/qrcode/create'
    seen = {"qrcode_api": False}

    async def handle_qrcode_create(route):
        # Note the request, then let it through untouched.
        try:
            intercepted = route.request
            logger.info(f"[页面导航] 监听到二维码API请求: {intercepted.url}")
            seen["qrcode_api"] = True
            await route.continue_()
        except Exception as e:
            logger.error(f"[页面导航] 处理二维码API请求失败: {str(e)}")
            await route.continue_()

    try:
        await self.page.route(route_pattern, handle_qrcode_create)
        logger.info("[页面导航] 已注册二维码API监听")

        # Start navigating; 'commit' returns without waiting for the full load.
        try:
            await self.page.goto(url, wait_until='commit', timeout=timeout * 1000)
            logger.info(f"[页面导航] 已开始导航到 {url}")
        except Exception as e:
            # Carry on even after a navigation timeout.
            landed_url = self.page.url
            logger.warning(f"[页面导航] 导航超时,但尝试继续: {str(e)}")
            logger.info(f"[页面导航] 当前URL: {landed_url}")

        # Poll in 0.1s steps until the API request shows up or the budget is spent.
        polls_done = 0
        poll_budget = timeout * 10
        while True:
            if seen["qrcode_api"] or polls_done >= poll_budget:
                break
            # A risk-control redirect ends the wait immediately.
            landed_url = self.page.url
            lowered_url = landed_url.lower()
            if 'captcha' in lowered_url or 'verify' in lowered_url:
                logger.warning(f"[页面导航] 检测到风控页面跳转: {landed_url}")
                logger.warning("[页面导航] 立即停止等待二维码API")
                break
            await asyncio.sleep(0.1)
            polls_done += 1

        if not seen["qrcode_api"]:
            logger.warning(f"[页面导航] {timeout}秒内未监听到二维码API请求,尝试继续")
        else:
            logger.success(f"[页面导航] 监听到二维码API请求,登录框已加载完成(耗时{polls_done * 0.1:.1f}秒)")

        # Extra 500ms so the elements finish rendering.
        await asyncio.sleep(0.5)
    finally:
        # Always detach the route again.
        try:
            await self.page.unroute(route_pattern)
            logger.info("[页面导航] 已移除二维码API监听")
        except Exception:
            pass
async def send_verification_code(self, phone: str, country_code: str = "+86", login_page: str = "creator", session_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Open the login page, fill in the phone number and click "send code".

    If the page is redirected to a captcha/verify URL, the verification QR
    code is extracted and returned instead. When that happens before the
    phone number is entered, a background task is also started to watch the
    scan and notify the client.

    Args:
        phone: Phone number.
        country_code: Country calling code (not used by this method).
        login_page: "creator" (creator center, default) or "home"
            (xiaohongshu.com home page).
        session_id: Session ID passed to the scan monitor for WebSocket
            notifications.

    Returns:
        A dict with ``success``, plus ``message`` or ``error``; when
        verification is required it also carries ``need_captcha``,
        ``captcha_type`` and, if extracted, ``qrcode_image``.
    """
    try:
        logger.info(f"[发送验证码] 开始 - 手机号: {phone}, 登录页面: {login_page}")
        if not self.page:
            logger.info(f"[发送验证码] 浏览器未初始化,开始初始化...")
            await self.init_browser()
        self.current_phone = phone

        # Pick the login URL for the requested page type.
        if login_page == "home":
            login_url = 'https://www.xiaohongshu.com'
            page_name = "小红书首页"
        else:
            login_url = 'https://creator.xiaohongshu.com/login'
            page_name = "创作者中心"

        # A preheated pool browser already on the login page is used directly.
        current_url = self.page.url if self.page else ""
        if self.use_pool and self.browser_pool and self.browser_pool.is_preheated:
            if login_url in current_url:
                logger.success(f"✅ 浏览器已预热在{page_name}登录页,直接使用!")
            else:
                # The page moved elsewhere: open the login page again.
                logger.success(f"[预热] 页面已变更 ({current_url}),重新访问{page_name}登录页...")
                await self._navigate_with_qrcode_listener(login_url)
        else:
            # Not preheated / not pool mode: navigate with the QR API listener.
            logger.debug(f"正在访问{page_name}登录页...")
            await self._navigate_with_qrcode_listener(login_url)
        logger.success(f"✅ 已进入{page_name}登录页面")

        # Check right away whether risk control redirected to a verification page.
        current_url = self.page.url
        logger.info(f"[风控检测] 当前URL: {current_url}")
        if 'captcha' in current_url.lower() or 'verify' in current_url.lower():
            logger.warning("="*50)
            logger.warning(f"⚠️ 发送验证码阶段检测到风控页面!")
            logger.warning(f"完整URL: {current_url}")
            logger.warning("="*50)
            # Give the verification page a moment to load.
            logger.info("等待验证页面加载完成...")
            await asyncio.sleep(1)
            logger.info("开始提取二维码...")
            qrcode_data = await self.extract_verification_qrcode()
            if qrcode_data:
                logger.success("✅ 成功提取验证二维码")
                logger.info(f"二维码数据长度: {len(qrcode_data)} 字符")
                logger.info("返回二维码给前端,等待用户扫码后重新调用接口")
                # Watch the scan in the background. The event loop only keeps a
                # weak reference to tasks, so hold a strong one until it is
                # done; otherwise the monitor may be garbage-collected mid-run.
                monitor_task = asyncio.create_task(self._monitor_qrcode_scan(session_id))
                background_tasks = getattr(self, "_background_tasks", None)
                if background_tasks is None:
                    background_tasks = set()
                    self._background_tasks = background_tasks
                background_tasks.add(monitor_task)
                monitor_task.add_done_callback(background_tasks.discard)
                logger.info(f"[WebSocket] 已启动扫码监听任务: {session_id}")
                return {
                    "success": False,
                    "error": "需要验证",
                    "need_captcha": True,
                    "captcha_type": "qrcode",
                    "qrcode_image": qrcode_data,
                    "message": "发送验证码时触发风控,需要扫码验证。扫码后页面会自动跳转回首页,请重新点击发送验证码"
                }
            else:
                logger.error("⚠️ 检测到验证页面但未提取到二维码")
                logger.info("尝试保存截图...")
                try:
                    await self.page.screenshot(path='logs/captcha_page_sendcode.png')
                    logger.success("截图已保存到 logs/captcha_page_sendcode.png")
                except Exception as e:
                    logger.error(f"保存截图失败: {str(e)}")
                return {
                    "success": False,
                    "error": "验证页面异常",
                    "need_captcha": True,
                    "captcha_type": "unknown",
                    "message": "检测到验证页面但无法提取二维码"
                }

        # Already on the home page (e.g. redirected back after a scan).
        if login_page == "home" and login_url in current_url:
            logger.success("✅ 已在首页,风控已解除,继续正常登录流程")

        if login_page == "home":
            # The home page needs the login dialog to be opened explicitly.
            logger.info("处理小红书首页登录流程...")
            try:
                # Click the first visible login trigger.
                logger.debug("查找并点击登录按钮以弹出登录框...")
                login_trigger_selectors = [
                    '.login',
                    'text="登录"',
                    'button:has-text("登录")',
                    'a:has-text("登录")',
                    '.header-login',
                    '[class*="login"]',
                ]
                login_triggered = False
                for selector in login_trigger_selectors:
                    try:
                        login_btn = await self.page.query_selector(selector)
                        if login_btn:
                            is_visible = await login_btn.is_visible()
                            if is_visible:
                                logger.success(f"✅ 找到登录触发按钮: {selector}")
                                await login_btn.click()
                                logger.success("✅ 已点击登录按钮,等待登录框弹出...")
                                await asyncio.sleep(0.5)
                                login_triggered = True
                                break
                    except Exception as e:
                        logger.error(f"尝试选择器 {selector} 失败: {str(e)}")
                        continue
                if not login_triggered:
                    logger.warning("⚠️ 未找到登录触发按钮,假设登录框已存在")

                # The phone input appearing means the dialog is open.
                logger.info("等待登录弹窗中的元素加载...")
                try:
                    await self.page.wait_for_selector('input[placeholder="输入手机号"]', timeout=3000)
                    logger.success("✅ 登录弹窗已弹出,手机号输入框就绪")
                except Exception:
                    logger.warning("⚠️ 等待登录弹窗超时,尝试继续...")

                # Switch to the "phone login" tab when several login methods are offered.
                phone_login_tab_selectors = [
                    'text="手机号登录"',
                    'div:has-text("手机号登录")',
                    '.title:has-text("手机号登录")',
                ]
                phone_login_tab = None
                for selector in phone_login_tab_selectors:
                    try:
                        phone_login_tab = await self.page.query_selector(selector)
                        if phone_login_tab:
                            # Only click when the tab is not already active.
                            is_active = await phone_login_tab.evaluate('el => el.classList.contains("active") || el.parentElement.classList.contains("active")')
                            if not is_active:
                                logger.success(f"✅ 找到手机号登录选项卡: {selector}")
                                await phone_login_tab.click()
                                logger.success("✅ 已点击手机号登录选项卡")
                                await asyncio.sleep(0.3)
                            else:
                                logger.success("✅ 手机号登录选项卡已选中")
                            break
                    except Exception:
                        continue
                if not phone_login_tab:
                    logger.warning("✅ 未找到手机号登录选项卡,可能已经是手机号登录界面")

                # Tick the agreement checkbox (home page only).
                agreement_selectors = [
                    '.agree-icon',
                    '.agreements .icon-wrapper',
                    'span.agree-icon',
                    '.icon-wrapper',
                ]
                agreement_checkbox = None
                for selector in agreement_selectors:
                    agreement_checkbox = await self.page.query_selector(selector)
                    if agreement_checkbox:
                        is_checked = await agreement_checkbox.evaluate('el => el.classList.contains("checked") || el.querySelector(".checked") !== null')
                        if not is_checked:
                            logger.success(f"✅ 找到协议复选框: {selector}")
                            await agreement_checkbox.click()
                            logger.success("✅ 已勾选协议")
                            await asyncio.sleep(0.2)
                        else:
                            logger.success("✅ 协议已勾选")
                        break
                if not agreement_checkbox:
                    logger.warning("⚠️ 未找到协议复选框,尝试继续...")
            except Exception as e:
                logger.error(f"处理首页登录流程失败: {str(e)}")
        else:
            # Creator center: just dismiss the agreement dialog if one is shown.
            try:
                agreement_btn = await self.page.query_selector('text="同意并继续"')
                if agreement_btn:
                    await agreement_btn.click()
                    logger.success(f"✅ 已点击协议按钮")
                    await asyncio.sleep(0.3)
            except Exception:
                pass  # no agreement dialog (the normal case)

        # ---- Enter the phone number ----
        try:
            logger.debug("查找手机号输入框...")
            if login_page == "home":
                phone_input_selectors = [
                    'input[placeholder="输入手机号"]',
                    'label.phone input',
                    'input[name="blur"]',
                    'input[type="text"]',
                ]
            else:
                phone_input_selectors = [
                    'input[placeholder="手机号"]',
                    'input.css-nt440g',
                    'input[placeholder*="手机号"]',
                    'input[type="tel"]',
                    'input[type="text"]',
                ]
            # Single pass, no retry: readiness was already awaited above.
            phone_input = None
            for selector in phone_input_selectors:
                phone_input = await self.page.query_selector(selector)
                if phone_input:
                    logger.success(f"✅ 找到手机号输入框: {selector}")
                    # Set the value with in-page JS. The selector and the
                    # phone number are passed as arguments, so the phone text
                    # can never break or inject into the script.
                    await self.page.evaluate(
                        '''
                        ([selector, phone]) => {
                            const input = document.querySelector(selector);
                            if (input) {
                                input.value = '';
                                input.focus();
                                input.value = phone;
                                input.dispatchEvent(new Event('input', { bubbles: true }));
                                input.dispatchEvent(new Event('change', { bubbles: true }));
                            }
                        }
                        ''',
                        [selector, phone],
                    )
                    logger.success(f"✅ 已输入手机号: {phone}")
                    await asyncio.sleep(0.3)
                    break
            if not phone_input:
                # Dump the first inputs on the page to help debugging.
                logger.warning("⚠️ 未找到手机号输入框,打印页面信息...")
                logger.info(f"页面URL: {self.page.url}")
                inputs = await self.page.query_selector_all('input')
                logger.info(f"页面上找到 {len(inputs)} 个input元素")
                for i, inp in enumerate(inputs[:5]):
                    try:
                        placeholder = await inp.get_attribute('placeholder')
                        input_type = await inp.get_attribute('type')
                        name = await inp.get_attribute('name')
                        class_name = await inp.get_attribute('class')
                        logger.info(f"Input {i+1}: type={input_type}, placeholder={placeholder}, name={name}, class={class_name}")
                    except Exception:
                        pass
                return {
                    "success": False,
                    "error": "未找到手机号输入框,请检查页面是否正确加载"
                }
        except Exception as e:
            await save_error_screenshot(
                self.page,
                "send_code_input_phone_failed",
                f"输入手机号失败: {str(e)}"
            )
            return {
                "success": False,
                "error": f"输入手机号失败: {str(e)}"
            }

        # ---- Click the "send code" button ----
        try:
            logger.debug("查找发送验证码按钮...")
            # Let the page settle; entering the phone may trigger re-rendering.
            await asyncio.sleep(0.3)
            if login_page == "home":
                selectors = [
                    'span.code-button',
                    '.code-button',
                    'text="获取验证码"',
                    'span:has-text("获取验证码")',
                ]
            else:
                selectors = [
                    'div.css-uyobdj',
                    'text="发送验证码"',
                    'div:has-text("发送验证码")',
                    'text="重新发送"',
                    'text="获取验证码"',
                ]
            send_code_selector = None
            for selector in selectors:
                send_code_btn = await self.page.query_selector(selector)
                if send_code_btn:
                    logger.success(f"✅ 找到发送验证码按钮: {selector}")
                    send_code_selector = selector
                    break

            if send_code_selector:
                # Re-query so the handle is known to be valid.
                send_code_btn = await self.page.query_selector(send_code_selector)
                if not send_code_btn:
                    return {
                        "success": False,
                        "error": "按钮元素已失效,请重试"
                    }
                btn_text = await send_code_btn.inner_text()
                btn_text = btn_text.strip() if btn_text else ""
                logger.info(f"📝 按钮文本: '{btn_text}'")

                # A countdown label (e.g. "59s", "60秒后重新获取") means a code was sent recently.
                if btn_text and (btn_text[-1] == 's' or '秒' in btn_text or btn_text.isdigit()):
                    logger.warning(f"⚠️ 按钮处于倒计时状态: {btn_text}")
                    return {
                        "success": False,
                        "error": f"验证码发送过于频繁,请{btn_text}后再试"
                    }

                # The label must be one of the expected "send" captions.
                expected_texts = ["获取验证码", "发送验证码", "重新发送"]
                if btn_text not in expected_texts:
                    logger.warning(f"⚠️ 按钮文本不符合预期: '{btn_text}', 期望: {expected_texts}")
                    return {
                        "success": False,
                        "error": f"按钮状态异常(当前文本: {btn_text}),请刷新页面重试"
                    }

                # On the home page the button is only clickable with the "active" class.
                if login_page == "home":
                    class_name = await send_code_btn.get_attribute('class') or ""
                    if 'active' not in class_name:
                        logger.warning(f"⚠️ 按钮未激活状态: class={class_name}")
                        return {
                            "success": False,
                            "error": "按钮未激活,请检查手机号是否正确输入"
                        }
                    logger.success(f"✅ 按钮已激活: class={class_name}")

                try:
                    # Click by selector so a stale element handle cannot get in the way.
                    await self.page.click(send_code_selector, timeout=5000)
                    logger.success("✅ 已点击发送验证码")
                except Exception as click_error:
                    # Fall back to a freshly queried handle.
                    logger.error(f"⚠️ 直接点击失败: {str(click_error)}, 尝试重新获取元素")
                    send_code_btn = await self.page.query_selector(send_code_selector)
                    if send_code_btn:
                        await send_code_btn.click()
                        logger.success("✅ 重新获取元素后点击成功")
                    else:
                        raise Exception("按钮元素已失效,无法点击")

                # Give the page time to react, then look for a verification redirect.
                await asyncio.sleep(1.5)
                current_url = self.page.url
                # Case-insensitive, matching the pre-click risk-control check.
                if 'captcha' in current_url.lower() or 'verify' in current_url.lower():
                    logger.warning(f"⚠️ 检测到验证页面: {current_url}")
                    qrcode_data = await self.extract_verification_qrcode()
                    if qrcode_data:
                        logger.success("✅ 成功提取验证二维码")
                        return {
                            "success": False,
                            "need_captcha": True,
                            "captcha_type": "qrcode",
                            "qrcode_image": qrcode_data,
                            "message": "需要扫码验证,请使用小红书APP扫描二维码"
                        }
                    else:
                        return {
                            "success": False,
                            "need_captcha": True,
                            "captcha_type": "unknown",
                            "message": "出现验证码验证,请稍后重试"
                        }

                # No slider detection: report success directly.
                logger.info(f"[发送验证码] 成功 - 手机号: {phone}")
                logger.success("\n✅ 验证码发送流程完成,请查看手机短信")
                logger.info("请在小程序中输入收到的验证码并点击登录\n")
                logger.success("[响应即将返回] success=True, message=验证码发送成功")
                return {
                    "success": True,
                    "message": "验证码发送成功,请查看手机短信"
                }
            else:
                return {
                    "success": False,
                    "error": "未找到发送验证码按钮,请检查页面结构"
                }
        except Exception as e:
            await save_error_screenshot(
                self.page,
                "send_code_click_button_failed",
                f"点击发送验证码失败: {str(e)}"
            )
            return {
                "success": False,
                "error": f"点击发送验证码失败: {str(e)}"
            }
    except Exception as e:
        error_msg = str(e)
        logger.error(f"[发送验证码] 异常 - 手机号: {phone}, 错误: {error_msg}")
        logger.error(f"\n❌ 发送验证码异常: {error_msg}")
        logger.info(f"当前页面URL: {self.page.url if self.page else 'N/A'}")
        # Best-effort screenshot of the failed state.
        if self.page:
            try:
                logger.error("尝试截图保存错误状态...")
                await self.page.screenshot(path='error_screenshot.png')
                logger.error("✅ 错误状态已截图保存到 error_screenshot.png")
            except Exception:
                pass
        return {
            "success": False,
            "error": error_msg
        }
async def login(self, phone: str, code: str, country_code: str = "+86", login_page: str = "creator") -> Dict[str, Any]:
    """
    Log in with an SMS verification code on the login page that is already open.

    Flow: type the code, click the login button, watch for a captcha /
    QR-code verification step, wait until the login is confirmed (a user/me
    API response, or a URL change plus the key cookies), then collect cookies,
    localStorage and sessionStorage and persist them to disk.

    Args:
        phone: Phone number; used to name the saved storage-state file
            (storage_states/xhs_<phone>.json).
        code: SMS verification code typed into the form.
        country_code: Country calling code. Not used by this method.
        login_page: "home" selects the selectors of the xiaohongshu home
            page; any other value (default "creator") selects the creator
            center selectors.

    Returns:
        On success: success=True plus user_info, cookies (name -> value),
        cookies_full (Playwright format), login_state, localStorage,
        sessionStorage, url and storage_state_path (None when the
        storage-state file could not be written).
        When a QR-code verification is required: success=False,
        need_captcha=True, captcha_type="qrcode", qrcode_image, session_id
        and message.
        On any other failure: success=False and an "error" message.
    """
    try:
        if not self.page:
            return {
                "success": False,
                "error": "页面未初始化,请先发送验证码"
            }

        # The session id is needed for the WebSocket push and the captcha
        # responses below. It is not a parameter of this method.
        # NOTE(review): presumably __init__ stores it as self.session_id - confirm.
        session_id = getattr(self, "session_id", None)

        def qrcode_captcha_result(qrcode_image):
            # Response telling the caller that the user must scan a QR code.
            # The route listeners are intentionally left registered so the
            # browser behind session_id keeps running for later polling.
            return {
                "success": False,
                "need_captcha": True,
                "captcha_type": "qrcode",
                "qrcode_image": qrcode_image,
                "session_id": session_id,
                "message": "需要扫码验证,请使用小红书APP扫描二维码"
            }

        # ---- Step 1: type the verification code ----
        try:
            logger.debug("查找验证码输入框...")
            if login_page == "home":
                # Code input on the xiaohongshu home page
                code_input_selectors = [
                    'input[placeholder="输入验证码"]',
                    'label.auth-code input',
                    'input[type="number"]',
                    'input[placeholder*="验证码"]',
                ]
            else:
                # Code input on the creator center
                code_input_selectors = [
                    'input[placeholder="验证码"]',
                    'input.css-1ge5flv',
                    'input[placeholder*="验证码"]',
                    'input[type="text"]:not([placeholder*="手机"])',
                ]
            code_input = await self._login_find_first(code_input_selectors, "验证码输入框")
            if not code_input:
                return {
                    "success": False,
                    "error": "未找到验证码输入框"
                }
            await code_input.click()
            await asyncio.sleep(0.2)
            # Select any existing text so the typed code replaces it
            await code_input.press('Control+A')
            await code_input.type(code, delay=50)
            logger.success(f"✅ 已输入验证码: {code}")
            await asyncio.sleep(0.5)
        except Exception as e:
            return {
                "success": False,
                "error": f"输入验证码失败: {str(e)}"
            }

        # ---- Step 2: click the login button and wait for the outcome ----
        try:
            logger.debug("查找登录按钮...")
            if login_page == "home":
                # Login button on the xiaohongshu home page
                login_btn_selectors = [
                    'button.submit',
                    'button:has-text("登录")',
                    'text="登录"',
                    '.submit',
                ]
            else:
                # Login button on the creator center (note the variant with a space)
                login_btn_selectors = [
                    'button.beer-login-btn',
                    'button.css-y4h4ay',
                    'button:has-text("登 录")',
                    'button:has-text("登录")',
                    'text="登 录"',
                    'text="登录"',
                    '.login-button',
                ]
            login_btn = await self._login_find_first(login_btn_selectors, "登录按钮")
            if not login_btn:
                # Dump the first buttons on the page to help debugging selectors
                logger.warning("⚠️ 未找到登录按钮,打印所有按钮...")
                buttons = await self.page.query_selector_all('button')
                logger.info(f"页面上找到 {len(buttons)} 个按钮")
                for index, btn in enumerate(buttons[:10], start=1):
                    try:
                        text = await btn.inner_text()
                        classes = await btn.get_attribute('class')
                        logger.info(f"按钮 {index}: 文本=[{text.strip()}] class=[{classes}]")
                    except Exception:
                        pass
                return {
                    "success": False,
                    "error": "未找到登录按钮"
                }

            # Register the API listeners BEFORE clicking so that a fast
            # user/me response right after the click is not missed.
            logger.info("[登录检测] 开始监听user/me API...")
            login_success = False
            user_me_data = None

            async def handle_shield_script(route):
                # Let the anti-bot script request through untouched; only log it.
                released = False
                try:
                    await route.continue_()
                    released = True
                    logger.warning(f"[反检测拦截] 监听到小红书反检测脚本请求: {route.request.url}")
                except Exception as e:
                    logger.error(f"[反检测拦截] 处理失败: {str(e)}")
                    # Retry only if the request was not released yet; a route
                    # can be continued at most once.
                    if not released:
                        await route.continue_()

            await self.page.route('**/api/sec/v1/scripting*', handle_shield_script)
            logger.info("[反检测拦截] 已注册小红书反检测脚本监听")

            async def handle_user_me(route):
                # Inspect the user/me response to detect a completed login,
                # then hand the unmodified response back to the page.
                nonlocal login_success, user_me_data
                try:
                    request = route.request
                    logger.info(f"[登录检测] 监听到用户信息API: {request.url}")
                    response = await route.fetch()
                    body = await response.body()
                    try:
                        data = json.loads(body.decode('utf-8'))
                        logger.info(f"[登录检测] API响应: {json.dumps(data, ensure_ascii=False)}")
                        # Logged in when code=0, success=true and guest=false
                        if (data.get('code') == 0 and
                                data.get('success') is True and
                                data.get('data', {}).get('guest') is False):
                            login_success = True
                            user_me_data = data.get('data')
                            logger.success(f"✅ 检测到登录成功,用户: {user_me_data.get('nickname')}")
                            # Push the login result to the client over WebSocket
                            if session_id:
                                try:
                                    from main import ws_manager
                                    await ws_manager.send_message(session_id, {
                                        "type": "login_success",
                                        "user_info": user_me_data
                                    })
                                    logger.info(f"[WebSocket] 已推送登录成功消息: {session_id}")
                                except Exception as ws_error:
                                    logger.error(f"[WebSocket] 推送消息失败: {str(ws_error)}")
                    except Exception as e:
                        logger.error(f"[登录检测] 解析响应失败: {str(e)}")
                    await route.fulfill(response=response)
                except Exception as e:
                    logger.error(f"[登录检测] 处理API请求失败: {str(e)}")
                    try:
                        await route.continue_()
                    except Exception as continue_error:
                        # The route may already have been handled (e.g. fulfill
                        # failed half-way); do not let that escape the handler.
                        logger.error(f"[登录检测] 处理API请求失败: {str(continue_error)}")

            await self.page.route('**/api/sns/web/v2/user/me', handle_user_me)
            logger.info("[登录检测] 已注册用户信息API监听")

            # Click the login button
            logger.info("="*50)
            logger.info("开始点击登录按钮")
            logger.info(f"点击前URL: {self.page.url}")
            await login_btn.click()
            logger.success("✅ 已点击登录按钮")
            await asyncio.sleep(0.2)
            logger.info(f"点击后0.2秒URL: {self.page.url}")
            await asyncio.sleep(0.3)
            logger.info(f"点击后0.5秒URL: {self.page.url}")

            # Accept the agreement popup if one appears after the click
            try:
                popup_btn = await self.page.query_selector('text="同意并继续"')
                if popup_btn:
                    await popup_btn.click()
                    logger.success(f"✅ 已点击登录后的协议弹窗")
                    await asyncio.sleep(0.3)
                    logger.info(f"点击协议后URL: {self.page.url}")
            except Exception:
                pass  # no popup

            # Watch the URL for up to 10 seconds (20 * 0.5s)
            logger.info("="*50)
            logger.info("开始持续检测URL变化...")
            captcha_detected = False
            for i in range(20):
                await asyncio.sleep(0.5)
                current_url = self.page.url
                if i % 2 == 0:  # log once per second
                    logger.info(f"[检测{i+1}/20] 当前URL: {current_url}")
                lowered_url = current_url.lower()

                # 1. Redirected to a captcha / verification page
                if 'captcha' in lowered_url or 'verify' in lowered_url:
                    logger.warning("="*50)
                    logger.warning(f"⚠️ 检测到验证页面跳转!")
                    logger.warning(f"完整URL: {current_url}")
                    logger.warning("="*50)
                    captcha_detected = True
                    logger.info("等待验证页面加载完成...")
                    await asyncio.sleep(1)
                    logger.info(f"等待后URL: {self.page.url}")
                    logger.info("开始提取二维码...")
                    qrcode_data = await self.extract_verification_qrcode()
                    if qrcode_data:
                        logger.success("✅ 成功提取验证二维码,返回给前端")
                        logger.info(f"二维码数据长度: {len(qrcode_data)} 字符")
                        return qrcode_captcha_result(qrcode_data)
                    logger.error("⚠️ 检测到验证页面但未提取到二维码")
                    logger.info("尝试保存截图...")
                    try:
                        await self.page.screenshot(path='logs/captcha_page.png')
                        logger.success("截图已保存到 logs/captcha_page.png")
                    except Exception as e:
                        logger.error(f"保存截图失败: {str(e)}")
                    break

                # 2. Left the login page for explore / creator: login went through.
                # The 'login' guard matters: the creator login URL itself
                # contains "creator", which would otherwise end this watch
                # on the very first poll.
                if 'login' not in lowered_url and ('explore' in current_url or 'creator' in current_url):
                    logger.success("="*50)
                    logger.success(f"✅ 检测到登录成功URL跳转")
                    logger.success(f"完整URL: {current_url}")
                    logger.success("="*50)
                    break

            if not captcha_detected:
                logger.info("="*50)
                logger.info(f"10秒检测结束,未检测到验证页面")
                logger.info(f"最终URL: {self.page.url}")
                logger.info("="*50)

            # Even without a URL change a QR-code dialog may be shown in-page
            logger.info("检测页面上是否出现扫码验证...")
            qrcode_selectors = [
                '.qrcode-img',
                'img.qrcode-img',
                '.qrcode-container img',
                'img[src*="data:image"]',
                'img[src*="qrcode"]',
                'img[alt*="二维码"]',
                'img[alt*="qrcode"]',
            ]
            for selector in qrcode_selectors:
                try:
                    qrcode_elem = await self.page.query_selector(selector)
                    if qrcode_elem:
                        logger.warning(f"⚠️ 检测到页面上出现二维码: {selector}")
                        qrcode_data = await self.extract_verification_qrcode()
                        if qrcode_data:
                            logger.success("✅ 成功提取扫码验证二维码,返回给前端")
                            return qrcode_captcha_result(qrcode_data)
                        break
                except Exception:
                    continue
            logger.info("未检测到扫码验证,继续等待登录...")

            # Wait up to 30 seconds (60 * 0.5s) for the user/me API or a URL change
            logger.info("[登录检测] 等待扫码完成或登录跳转...")
            url_jumped = False
            for _ in range(60):
                await asyncio.sleep(0.5)
                if login_success:
                    logger.success(f"✅ 通过API确认登录成功")
                    break
                current_url = self.page.url
                on_known_host = 'creator.xiaohongshu.com' in current_url or 'www.xiaohongshu.com' in current_url
                if 'login' not in current_url and on_known_host and not url_jumped:
                    logger.success(f"✅ URL已跳转: {current_url}")
                    url_jumped = True
                    # Keep polling: the API confirmation may still arrive
                    logger.info("[登录检测] URL已跳转,继续等待user/me API...")

            # Remove the user/me listener now that waiting is over
            try:
                await self.page.unroute('**/api/sns/web/v2/user/me')
                logger.info("[登录检测] 已移除用户信息API监听")
            except Exception:
                pass

            # No API confirmation but the URL changed: fall back to the cookies
            if not login_success and url_jumped:
                logger.warning("[登录检测] 未捕获到user/me API,尝试通过Cookie判断...")
                cookies = await self.context.cookies()
                cookies_dict = {cookie['name']: cookie['value'] for cookie in cookies}
                key_cookies = ['web_session', 'webId', 'a1']
                if all(key in cookies_dict for key in key_cookies):
                    logger.success(f"✅ 检测到关键Cookie,判定登录成功")
                    # user_me_data stays None; user info is read from localStorage later
                    login_success = True
                else:
                    logger.error(f"❌ 未检测到关键Cookie: {list(cookies_dict.keys())}")

            if not login_success:
                if 'login' in self.page.url:
                    # Still on the login page after the 30s wait: the code was probably wrong
                    await save_error_screenshot(
                        self.page,
                        "login_failed_wrong_code",
                        "登录失败,验证码可能错误"
                    )
                    return {
                        "success": False,
                        "error": "登录失败,请检查验证码是否正确"
                    }
                # The URL changed but the login cookies are missing
                await save_error_screenshot(
                    self.page,
                    "login_failed_no_cookie",
                    "登录失败,未获取到登录Cookie"
                )
                return {
                    "success": False,
                    "error": "登录失败,未获取到有效的登录信息"
                }
        except Exception as e:
            await save_error_screenshot(
                self.page,
                "login_click_button_failed",
                f"点击登录按钮失败: {str(e)}"
            )
            return {
                "success": False,
                "error": f"点击登录按钮失败: {str(e)}"
            }

        # ---- Step 3: make sure the page did not bounce back to the login page ----
        logger.success("✅ 登录成功,正在确认页面稳定性...")
        await asyncio.sleep(1)
        final_url = self.page.url
        if 'login' in final_url:
            logger.warning("⚠️ 检测到页面被重定向回登录页,Cookie可能被小红书拒绝")
            await save_error_screenshot(
                self.page,
                "login_redirect_back",
                "登录后被重定向回登录页"
            )
            return {
                "success": False,
                "error": "登录失败:小红书检测到异常登录行为,请稍后再试或使用手动登录"
            }
        logger.success(f"✅ 页面稳定,最终URL: {final_url}")

        # ---- Step 4: collect cookies, user info and web storage ----
        # Two cookie formats are returned:
        #   cookies_dict - name -> value mapping (API response / display)
        #   cookies      - full Playwright format (files and later reuse)
        cookies = await self.context.cookies()
        cookies_dict = {cookie['name']: cookie['value'] for cookie in cookies}
        logger.info(f"\n========== Cookies 信息 ==========")
        logger.info(f"共获取到 {len(cookies)} 个Cookie")
        logger.info(f"\nCookie名称列表: {list(cookies_dict.keys())}")
        # NOTE(review): this writes full cookie values (session credentials) to the log.
        logger.info(f"\n完整Cookies内容(键值对格式):")
        for name, value in cookies_dict.items():
            logger.info(f" {name}: {value}")
        logger.info(f"\n================================\n")

        if user_me_data:
            # Prefer the user info returned by the user/me API
            user_info = {
                'nickname': user_me_data.get('nickname'),
                'desc': user_me_data.get('desc'),
                'gender': user_me_data.get('gender'),
                'avatar': user_me_data.get('images'),
                'red_id': user_me_data.get('red_id'),
                'user_id': user_me_data.get('user_id'),
                'guest': user_me_data.get('guest')
            }
            logger.success(f"✅ 使用API返回的用户信息: {user_info.get('nickname')}")
        else:
            # Fallback: read what is available from localStorage
            user_info = await self._login_user_info_from_storage()

        current_url = self.page.url
        logger.info(f"当前URL: {current_url}")
        localStorage_data = await self._login_read_storage('localStorage')
        sessionStorage_data = await self._login_read_storage('sessionStorage')

        # ---- Step 5: persist the login state ----
        # login_state is built before any file I/O so it is always available
        # for the return value, even when writing the files fails.
        # _impl_obj/_options are private Playwright attributes; read them defensively.
        context_options = getattr(getattr(self.context, "_impl_obj", None), "_options", None) or {}
        login_state = {
            "cookies": cookies,
            "localStorage": localStorage_data,
            "sessionStorage": sessionStorage_data,
            "url": current_url,
            "timestamp": time.time(),
            "user_agent": context_options.get('userAgent'),
            "viewport": context_options.get('viewport')
        }
        storage_state_path = await self._login_save_state_files(phone, cookies, login_state)

        return {
            "success": True,
            "user_info": user_info,
            "cookies": cookies_dict,
            "cookies_full": cookies,
            "login_state": login_state,
            "localStorage": localStorage_data,
            "sessionStorage": sessionStorage_data,
            "url": current_url,
            "storage_state_path": storage_state_path
        }
    except Exception as e:
        logger.error(f"登录异常: {str(e)}")
        await save_error_screenshot(
            self.page,
            "login_exception",
            f"登录异常: {str(e)}"
        )
        return {
            "success": False,
            "error": str(e)
        }

async def _login_find_first(self, selectors: list, label: str, timeout: int = 2000):
    """Return the first element matched by *selectors* (tried in order), or None when none appears in time."""
    for selector in selectors:
        try:
            element = await self.page.wait_for_selector(selector, timeout=timeout)
        except Exception:
            # This candidate did not match in time; try the next one
            continue
        if element:
            logger.success(f"✅ 找到{label}: {selector}")
            return element
    return None

async def _login_read_storage(self, storage_name: str) -> Dict[str, Any]:
    """Return the page's localStorage/sessionStorage (selected by *storage_name*) as a dict; {} on failure."""
    try:
        raw = await self.page.evaluate(f'() => JSON.stringify({storage_name})')
        data = json.loads(raw)
        logger.success(f"✅ 获取到 {len(data)} 个{storage_name}项")
        return data
    except Exception as e:
        logger.error(f"⚠️ 获取{storage_name}失败: {str(e)}")
        return {}

async def _login_user_info_from_storage(self) -> Dict[str, Any]:
    """Build a best-effort user_info dict from localStorage when no user/me API data was captured."""
    user_info: Dict[str, Any] = {}
    try:
        storage = await self.page.evaluate('() => JSON.stringify(localStorage)')
        storage_dict = json.loads(storage)
        # Copy selected keys, decoding values that look like JSON
        for key in ('b1', 'b1b1', 'p1'):
            if key not in storage_dict:
                continue
            try:
                value = storage_dict[key]
                if value and value.strip():
                    user_info[key] = json.loads(value) if value.startswith(('{', '[')) else value
            except Exception:
                # Not valid JSON (or not a string): keep the raw value
                user_info[key] = storage_dict[key]
        # Use the first entry whose key mentions "user" and whose value parses as JSON
        for key, value in storage_dict.items():
            if 'user' in key.lower():
                try:
                    user_info['user_data'] = json.loads(value)
                    break
                except Exception:
                    pass
        logger.success(f"✅ 从 localStorage 获取到用户信息: {list(user_info.keys())}")
    except Exception as e:
        logger.error(f"⚠️ 获取用户信息失败: {str(e)}")
    return user_info

async def _login_save_state_files(self, phone: str, cookies: list, login_state: Dict[str, Any]) -> Optional[str]:
    """Write login_state.json, cookies.json and the Playwright storage state; return the storage-state path or None."""
    # Each file is written in its own try block so one failure does not
    # prevent the remaining files from being saved.
    try:
        with open('login_state.json', 'w', encoding='utf-8') as f:
            json.dump(login_state, f, ensure_ascii=False, indent=2)
        logger.success("✅ 已保存完整登录状态到 login_state.json 文件")
        logger.info(f" 包含: {len(cookies)} 个Cookies, {len(login_state['localStorage'])} 个localStorage, {len(login_state['sessionStorage'])} 个sessionStorage")
    except Exception as e:
        logger.error(f"保存登录状态文件失败: {str(e)}")

    # Kept for backward compatibility: standalone cookies.json
    try:
        with open('cookies.json', 'w', encoding='utf-8') as f:
            json.dump(cookies, f, ensure_ascii=False, indent=2)
        logger.success("✅ 已保存 Cookies 到 cookies.json 文件(兼容旧版)")
    except Exception as e:
        logger.error(f"保存登录状态文件失败: {str(e)}")

    # Native Playwright storage state, one file per phone number
    try:
        storage_state_dir = 'storage_states'
        os.makedirs(storage_state_dir, exist_ok=True)
        storage_state_path = os.path.join(storage_state_dir, f"xhs_{phone}.json")
        await self.context.storage_state(path=storage_state_path)
        logger.success(f"✅ 已保存 Playwright Storage State 到: {storage_state_path}")
        logger.info(f" 此文件包含完整的浏览器上下文状态,可用于后续免登录恢复")
        return storage_state_path
    except Exception as e:
        logger.error(f"保存登录状态文件失败: {str(e)}")
        return None
async def get_user_profile(self) -> Dict[str, Any]:
    """
    Open the user profile page and return a profile payload.

    Intended to be called after a successful login. The profile dict is
    currently always empty: no data is scraped from the page yet.

    Returns:
        {"success": True, "profile": {}} once the page has been opened, or
        {"success": False, "error": <message>} when the page is not
        initialised or the navigation raises.
    """
    profile_url = 'https://www.xiaohongshu.com/user/profile'
    try:
        if self.page:
            # Navigate and give the page a moment to settle
            await self.page.goto(profile_url, wait_until='networkidle')
            await asyncio.sleep(2)
            # Placeholder: extract nickname, avatar, etc. here when needed
            outcome = {"success": True, "profile": {}}
        else:
            outcome = {"success": False, "error": "页面未初始化"}
    except Exception as exc:
        outcome = {"success": False, "error": str(exc)}
    return outcome
async def verify_login_status(self, url: Optional[str] = None) -> Dict[str, Any]:
    """
    Check whether the current browser context is logged in.

    Navigates to a xiaohongshu page and treats a redirect to a login URL
    (or ending up outside xiaohongshu.com) as "not logged in".

    Args:
        url: Page to open for the check; defaults to the creator platform
            (https://creator.xiaohongshu.com/).

    Returns:
        A dict with "success" (False only when the check itself could not
        run) and "logged_in". When logged in it also carries cookies
        (name -> value), cookies_full (Playwright format), user_info and
        url; when not logged in it carries cookie_expired, message and url.
    """
    try:
        if not self.page:
            return {
                "success": False,
                "logged_in": False,
                "error": "页面未初始化"
            }
        logger.debug("正在验证登录状态...")
        target_url = url or 'https://creator.xiaohongshu.com/'
        page_name = "创作者平台" if "creator" in target_url else "小红书首页"
        logger.info(f"访问{page_name}...")

        # Reset the redirect counters before navigating.
        # NOTE(review): presumably incremented by a navigation listener
        # registered elsewhere in this class - confirm.
        self.redirect_count = 0
        self.last_redirect_time = 0
        try:
            await self.page.goto(target_url, wait_until='domcontentloaded', timeout=60000)
            await asyncio.sleep(2)  # let the page settle
            # Many redirects during a single navigation are treated as an expired cookie
            if self.redirect_count > 5:
                logger.error(f"❌ 检测到无限跳转 ({self.redirect_count}次重定向),Cookie已失效")
                return {
                    "success": True,
                    "logged_in": False,
                    "cookie_expired": True,
                    "infinite_redirect": True,
                    "message": "Cookie已失效,小红书检测到异常登录行为",
                    "url": self.page.url
                }
            logger.success(f"✅ 已访问{page_name},当前URL: {self.page.url}")
        except Exception as e:
            logger.error(f"访问{page_name}失败: {str(e)}")
            return {
                "success": False,
                "logged_in": False,
                "error": f"访问{page_name}失败: {str(e)}"
            }

        # Redirected to the login page: not logged in
        current_url = self.page.url
        if 'login' in current_url.lower():
            logger.error("❌ 未登录状态(被重定向到登录页)")
            return {
                "success": True,
                "logged_in": False,
                "cookie_expired": True,
                "message": "Cookie已失效或未登录",
                "url": current_url
            }

        # Ended up somewhere outside xiaohongshu.com: treat as not logged in
        if 'xiaohongshu.com' not in current_url:
            logger.error("❌ 未登录状态(URL异常)")
            return {
                "success": True,
                "logged_in": False,
                "cookie_expired": True,
                "message": "Cookie已失效或未登录",
                "url": current_url
            }

        # Still on xiaohongshu.com and not on a login URL: logged in
        logger.success(f"✅ 已登录状态(成功访问{page_name})")
        cookies = await self.context.cookies()
        cookies_dict = {cookie['name']: cookie['value'] for cookie in cookies}

        # Best effort: take the first localStorage entry whose key mentions
        # "user" and whose value parses as JSON
        user_info = {}
        try:
            storage = await self.page.evaluate('() => JSON.stringify(localStorage)')
            storage_dict = json.loads(storage)
            for key, value in storage_dict.items():
                if 'user' in key.lower():
                    try:
                        user_info['user_data'] = json.loads(value)
                        break
                    except Exception:
                        # Value is not JSON; try the next matching key
                        pass
        except Exception as e:
            logger.error(f"获取用户信息失败: {str(e)}")

        return {
            "success": True,
            "logged_in": True,
            "message": "Cookie有效,已登录",
            "cookies": cookies_dict,
            "cookies_full": cookies,
            "user_info": user_info,
            "url": current_url
        }
    except Exception as e:
        logger.error(f"验证登录状态异常: {str(e)}")
        return {
            "success": False,
            "logged_in": False,
            "error": str(e)
        }
def _calculate_title_width(self, title: str) -> int:
width = 0
for ch in title:
if unicodedata.east_asian_width(ch) in ("F", "W"):
width += 2
else:
width += 1
return width
async def publish_note(self, title: str, content: str, images: list = None, topics: list = None, cookies: list = None, proxy: str = None, user_agent: str = None) -> Dict[str, Any]:
"""
发布笔记(支持Cookie注入)
Args:
title: 笔记标题
content: 笔记内容
images: 图片路径列表(本地文件路径)
topics: 话题标签列表
cookies: 可选的Cookie列表(Playwright完整格式),用于注入登录态
proxy: 可选的代理地址,例如 http://ip:port
user_agent: 可选的自定义User-Agent,用于防指纹识别
Returns:
Dict containing publish result
"""
try:
# ========== 内容验证 ==========
logger.debug("\n========== 开始验证发布内容 ==========")
# 1. 验证标题长度
if not title or len(title.strip()) == 0:
return {
"success": False,
"error": "标题不能为空",
"error_type": "validation_error"
}
title = title.strip()
title_width = self._calculate_title_width(title)
if title_width > 40:
return {
"success": False,
"error": f"标题超出限制:当前宽度 {title_width},平台限制 40",
"error_type": "validation_error"
}
logger.success(f"✅ 标题验证通过: 宽度 {title_width}/40")
# 2. 验证内容长度
if not content or len(content.strip()) == 0:
return {
"success": False,
"error": "内容不能为空",
"error_type": "validation_error"
}
content_length = len(content)
if content_length > 1000:
return {
"success": False,
"error": f"内容超出限制:当前 {content_length} 个字,最多 1000 个字",
"error_type": "validation_error"
}
logger.success(f"✅ 内容验证通过: {content_length}/1000 个字")
# 3. 验证图片数量
images_count = len(images) if images else 0
if images_count == 0:
return {
"success": False,
"error": "至少需要 1 张图片",
"error_type": "validation_error"
}
if images_count > 18:
return {
"success": False,
"error": f"图片超出限制:当前 {images_count} 张,最多 18 张",
"error_type": "validation_error"
}
logger.success(f"✅ 图片数量验证通过: {images_count}/18 张")
logger.success("✅ 所有验证通过,开始发布\n")
# ========== 开始发布流程 ==========
# 如果提供了Cookie且使用浏览器池,创建独立的context和page
if cookies:
logger.warning("✅ 检测到Cookie,将创建独立的浏览器环境")
# 调试:打印cookies格式
if cookies and len(cookies) > 0:
logger.info(f" Cookie格式检查: 类型={type(cookies).__name__}, 数量={len(cookies)}")
if isinstance(cookies, list) and len(cookies) > 0:
first_cookie = cookies[0]
logger.info(f" 第一个cookie字段: {list(first_cookie.keys()) if isinstance(first_cookie, dict) else 'not dict'}")
if isinstance(first_cookie, dict):
# 检查关键字段的类型
for key in ['name', 'value', 'expires', 'sameSite']:
if key in first_cookie:
val = first_cookie[key]
logger.info(f" {key}: type={type(val).__name__}, value={val}")
# 使用浏览器池模式:复用主浏览器,但为发布创建独立的context
if self.use_pool and self.browser_pool:
logger.info("[浏览器池模式] 复用主浏览器实例")
# 从池中获取浏览器(仅获取browser实例)
self.browser, _, _ = await self.browser_pool.get_browser()
logger.info("[浏览器池] 复用主浏览器实例")
# 为发布任务创建全新的context(不复用预热的context)
context_kwargs = {
"viewport": {'width': 1280, 'height': 720},
"user_agent": user_agent or 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
}
self.context = await self.browser.new_context(**context_kwargs)
logger.info("[浏览器池模式] 为发布创建独立的context(避免污染预热环境)")
# 注入Cookie到新的context
await self.context.add_cookies(cookies)
logger.success(f"✅ 已注入 {len(cookies)} 个Cookie")
# 创建发布页面
logger.info("[浏览器池模式] 创建发布专用页面")
self.page = await self.context.new_page()
logger.success("✅ 发布页面创建成功\n")
elif not self.page:
# 非池模式且页面不存在,初始化浏览器
await self.init_browser(cookies, proxy=proxy, user_agent=user_agent)
else:
# 非池模式但页面已存在,添加Cookie
await self.context.add_cookies(cookies)
logger.success(f"✅ 已注入 {len(cookies)} 个Cookie")
# 如果没有Cookie且没有page,尝试使用池
if not self.page:
if self.use_pool and self.browser_pool:
logger.info("[浏览器池模式] 获取浏览器实例")
self.browser, self.context, self.page = await self.browser_pool.get_browser(proxy=proxy, user_agent=user_agent)
else:
return {
"success": False,
"error": "页面未初始化,请先登录或提供Cookie"
}
logger.debug("\n========== 开始发布笔记 ==========")
logger.info(f"标题: {title}")
logger.info(f"内容: {content[:50]}..." if len(content) > 50 else f"内容: {content}")
logger.info(f"图片数量: {len(images) if images else 0}")
logger.info(f"话题: {topics if topics else []}")
# 优化:直接访问图文发布页面URL,跳过点击tab步骤
logger.info("访问创作者平台图文发布页面...")
publish_url = 'https://creator.xiaohongshu.com/publish/publish?source=official&from=menu&target=image'
# 尝试访问页面(最多重试2次)
page_loaded = False
for attempt in range(2):
try:
if attempt > 0:
logger.info(f"第 {attempt + 1} 次尝试加载页面...")
else:
logger.debug("开始加载页面...")
# 使用更宽松的等待条件,不等待networkidle
await self.page.goto(
publish_url,
wait_until='load', # 从networkidle改为load,更快
timeout=40000 # 增加到40秒
)
# 等待页面稳定
await asyncio.sleep(2)
# 检查是否被跳转回登录页或其他页面
current_url = self.page.url
# 先打印URL信息,但不立即判定为错误
if current_url != publish_url:
logger.warning(f"⚠️ 检测到页面跳转: {current_url}")
logger.warning(f"⚠️ 期望页面: {publish_url}")
# 关键优化:等待5秒,给小红书时间自动重定向回发布页
if 'redirectReason' in current_url or 'login' in current_url:
logger.warning("🔄 检测到重定向参数,等待5秒让小红书自动重定向...")
await asyncio.sleep(5)
# 再次检查最终URL
final_url = self.page.url
logger.info(f"🔍 最终页面URL: {final_url}")
# 如果最终还是在发布页,则认为成功
if 'publish/publish' in final_url:
logger.success("✅ 自动重定向成功,已到达发布页")
current_url = final_url # 更新当前URL
elif 'login' in final_url and 'publish' not in final_url:
# 真的停留在登录页,Cookie失效
return {
"success": False,
"error": "Cookie可能已失效,页面跳转到登录页",
"error_type": "cookie_expired"
}
# 最终检查:只要URL中包含'publish/publish',就认为在发布页
if 'publish/publish' not in current_url:
logger.error(f"❌ 页面最终未到达发布页: {current_url}")
# 其他跳转,重试
if attempt < 1:
logger.info("等待3秒后重试...")
await asyncio.sleep(3)
continue
else:
return {
"success": False,
"error": f"页面跳转到意外地址: {current_url}"
}
# 验证页面是否加载成功(检查是否有上传控件)
upload_check = await self.page.query_selector('input[type="file"]')
if upload_check:
logger.success(f"✅ 已进入图文发布页面: {current_url}")
page_loaded = True
break
else:
logger.warning("⚠️ 页面加载完成但未找到上传控件,可能需要重试")
if attempt < 1: # 还有重试机会
await asyncio.sleep(2)
continue
else:
# 最后一次尝试也失败了,继续执行看看
logger.warning("⚠️ 未找到上传控件,但继续执行")
page_loaded = True
break
except Exception as e:
error_msg = f"访问发布页面失败(尝试{attempt + 1}/2): {str(e)}"
logger.error(f"❌ {error_msg}")
# 保存错误截图
try:
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
error_type = type(e).__name__
screenshot_path = f"error_screenshots/{timestamp}_{error_type}.png"
os.makedirs('error_screenshots', exist_ok=True)
await self.page.screenshot(path=screenshot_path, full_page=True)
logger.error(f"📸 已保存错误截图: {screenshot_path}")
except Exception as screenshot_error:
logger.error(f"⚠️ 保存截图失败: {screenshot_error}")
if attempt < 1: # 还有重试机会
logger.info("等待3秒后重试...")
await asyncio.sleep(3)
continue
else:
# 所有重试都失败了
import traceback
traceback.print_exc()
return {
"success": False,
"error": f"访问发布页面失败(已重试2次): {str(e)}"
}
if not page_loaded:
return {
"success": False,
"error": "页面加载失败"
}
# 上传图片(如果有)
if images and len(images) > 0:
try:
logger.debug(f"开始上传 {len(images)} 张图片...")
# 预处理图片:将网络图片下载到本地
local_images = []
downloaded_files = [] # 用于清理临时文件
# OSS域名前缀(用于补充不完整的图片路径)
oss_prefix = "https://bxmkb-beijing.oss-cn-beijing.aliyuncs.com/Images/"
for img_path in images:
original_path = img_path
# 检查是否需要补充OSS前缀
if not (img_path.startswith('http://') or img_path.startswith('https://')):
# 不是完整URL
if not os.path.isabs(img_path):
# 也不是绝对路径,检查是否需要补充OSS前缀
if '/' in img_path and not img_path.startswith('/'):
# 可能是OSS相对路径(如 20251221/xxx.png),补充前缀
img_path = oss_prefix + img_path
logger.warning(f" 检测到相对路径,补充OSS前缀: {original_path} -> {img_path}")
if img_path.startswith('http://') or img_path.startswith('https://'):
# 网络图片,需要下载
try:
local_path = await download_image(img_path)
local_images.append(local_path)
downloaded_files.append(local_path) # 记录以便后续清理
except Exception as e:
logger.error(f"⚠️ 下载图片 {img_path} 失败: {str(e)}")
return {
"success": False,
"error": f"下载图片失败: {str(e)}"
}
else:
# 本地图片,直接使用
local_images.append(img_path)
logger.success(f"✅ 图片预处理完成,共 {len(local_images)} 张本地图片")
# 优化:减少等待时间
await asyncio.sleep(0.5)
# 优化:直接使用最常见的选择器,先用query_selector快速查找
logger.debug("查找图片上传控件...")
upload_selectors = [
'input[type="file"][accept*="image"]',
'input[type="file"]',
'input[accept*="image"]',
'.upload-input',
'[class*="upload"] input[type="file"]',
]
file_input = None
for selector in upload_selectors:
try:
# 优化:使用query_selector代替wait_for_selector,更快
file_input = await self.page.query_selector(selector)
if file_input:
logger.info(f"找到文件上传控件: {selector}")
break
except Exception:
continue
# 如果快速查找失败,再用wait方式
if not file_input:
for selector in upload_selectors:
try:
file_input = await self.page.wait_for_selector(selector, timeout=3000)
if file_input:
logger.info(f"找到文件上传控件: {selector}")
break
except Exception:
continue
if file_input:
# 批量上传图片(使用本地图片)
images_count = len(local_images)
logger.debug(f"正在上传 {images_count} 张本地图片: {local_images}")
# 验证文件是否存在
for img_path in local_images:
if not os.path.exists(img_path):
logger.warning(f"⚠️ 警告: 图片文件不存在: {img_path}")
else:
file_size = os.path.getsize(img_path) / 1024
logger.success(f" ✅ 文件存在: {img_path} ({file_size:.1f}KB)")
await file_input.set_input_files(local_images)
logger.success(f"已设置文件路径,等待上传...")
# 等待一下让页面处理文件
await asyncio.sleep(1)
# 优化:更快速的图片上传检测(500ms间隔)
upload_success = False
uploaded_count = 0
page_destroyed = False
for i in range(60): # 最多等待30秒(60次 × 500ms)
await asyncio.sleep(0.5) # 优化:从1秒改为500ms
try:
# 检查页面是否还有效
if self.page.is_closed():
logger.warning("检测到页面已关闭")
page_destroyed = True
break
# 查找所有已上传的图片缩略图 - 增加更多选择器
uploaded_images = await self.page.query_selector_all('img[src*="blob:"]')
if not uploaded_images:
# 尝试其他选择器
uploaded_images = await self.page.query_selector_all('[class*="image"][class*="item"] img')
if not uploaded_images:
# 再尝试其他可能的选择器
uploaded_images = await self.page.query_selector_all('.image-item img, .upload-item img, .pic-item img')
if not uploaded_images:
# 最后尝试查找包含图片的元素
uploaded_images = await self.page.query_selector_all('img[src*="data:image"]')
uploaded_count = len(uploaded_images)
if uploaded_count > 0:
# 检查是否所有图片都已上传
if uploaded_count >= images_count:
logger.success(f"✅ 所有图片上传完成!共 {uploaded_count} 张")
upload_success = True
break
# 每秒打印一次进度(避免刷屏)
if i % 2 == 0:
logger.info(f"等待图片上传... {uploaded_count}/{images_count} ({(i+1)*0.5:.1f}/30秒)")
except Exception as e:
error_msg = str(e)
# 检查是否是页面跳转/销毁导致的异常
if 'context was destroyed' in error_msg.lower() or 'navigation' in error_msg.lower():
logger.error(f"检测到页面跳转: {error_msg}")
page_destroyed = True
break
logger.error(f"检测上传状态异常: {e}")
# 连续异常可能说明页面有问题,等待更长时间
if i > 10: # 5秒后还在异常
await asyncio.sleep(1)
# 如果页面被销毁,尝试等待重定向完成
if page_destroyed:
logger.warning("⚠️ 页面发生跳转,检查当前URL...")
await asyncio.sleep(3)
# 检查跳转后的URL
current_url = self.page.url
logger.info(f"跳转后的URL: {current_url}")
# 如果跳转到登录页,说明Cookie失效
if 'login' in current_url:
# 清理临时文件
for temp_file in downloaded_files:
try:
os.remove(temp_file)
except Exception:
pass
return {
"success": False,
"error": "Cookie已失效,上传过程中跳转到登录页",
"error_type": "cookie_expired"
}
# 如果仍然在发布页,重新检查图片
if 'publish/publish' in current_url:
logger.success("✅ 仍在发布页,重新检查图片...")
try:
uploaded_images = await self.page.query_selector_all('img[src*="blob:"], img[src*="data:image"], [class*="image"][class*="item"] img')
uploaded_count = len(uploaded_images)
if uploaded_count >= images_count:
logger.success(f"✅ 页面稳定后确认图片已上传!共 {uploaded_count} 张")
upload_success = True
else:
logger.warning(f"⚠️ 页面稳定后检测到 {uploaded_count}/{images_count} 张图片")
except Exception as e:
logger.error(f"页面稳定后检测失败: {e}")
else:
# 跳转到其他页面
# 清理临时文件
for temp_file in downloaded_files:
try:
os.remove(temp_file)
except Exception:
pass
return {
"success": False,
"error": f"上传过程中页面跳转到: {current_url}"
}
if upload_success:
logger.success(f"✅ 图片上传成功!共 {uploaded_count} 张")
await asyncio.sleep(0.5) # 优化:从2秒减少到0.5秒
# 清理下载的临时文件
for temp_file in downloaded_files:
try:
os.remove(temp_file)
logger.success(f"✅ 已清理临时文件: {temp_file}")
except Exception:
pass
else:
logger.warning(f"⚠️ 仅检测到 {uploaded_count}/{images_count} 张图片,但继续执行...")
else:
logger.warning("未找到隐藏的file input,尝试查找可点击的上传区域...")
# 调试: 打印页面上所有包含upload的元素
try:
all_elements = await self.page.query_selector_all('[class*="upload"], [id*="upload"]')
logger.info(f"\u627e到 {len(all_elements)} 个包含upload的元素")
for i, el in enumerate(all_elements[:10]): # 只看前10个
try:
tag_name = await el.evaluate('el => el.tagName')
class_name = await el.evaluate('el => el.className')
logger.info(f" [{i+1}] {tag_name} class='{class_name}'")
except Exception:
pass
except Exception:
pass
# 尝试点击上传区域或按钮
upload_area_selectors = [
'[class*="upload"][class*="box"]',
'[class*="upload"][class*="area"]',
'[class*="upload"][class*="wrapper"]',
'.upload-zone',
'div:has-text("上传图片")',
'div:has-text("点击上传")',
'button:has-text("上传图片")',
]
clicked = False
for selector in upload_area_selectors:
try:
area = await self.page.wait_for_selector(selector, timeout=2000)
if area:
logger.info(f"找到上传区域: {selector}")
await area.click()
await asyncio.sleep(0.5)
# 点击后再次查找file input
file_input = await self.page.wait_for_selector('input[type="file"]', timeout=2000)
if file_input:
images_count = len(local_images)
logger.debug(f"正在上传 {images_count} 张本地图片: {local_images}")
await file_input.set_input_files(local_images)
logger.success(f"已设置文件路径,等待上传...")
# 等待一下让页面处理文件
await asyncio.sleep(1)
# 优化:更快的图片上传检测
upload_success = False
uploaded_count = 0
page_destroyed = False
for i in range(60): # 最多30秒
await asyncio.sleep(0.5) # 优化:500ms间隔
try:
# 检查页面是否还有效
if self.page.is_closed():
logger.warning("检测到页面已关闭")
page_destroyed = True
break
uploaded_images = await self.page.query_selector_all('img[src*="blob:"]')
if not uploaded_images:
uploaded_images = await self.page.query_selector_all('[class*="image"][class*="item"] img')
if not uploaded_images:
uploaded_images = await self.page.query_selector_all('.image-item img, .upload-item img, .pic-item img')
if not uploaded_images:
uploaded_images = await self.page.query_selector_all('img[src*="data:image"]')
uploaded_count = len(uploaded_images)
if uploaded_count > 0:
if uploaded_count >= images_count:
logger.success(f"✅ 所有图片上传完成!共 {uploaded_count} 张")
upload_success = True
break
# 每秒打印一次进度
if i % 2 == 0:
logger.info(f"等待图片上传... {uploaded_count}/{images_count} ({(i+1)*0.5:.1f}/30秒)")
except Exception as e:
error_msg = str(e)
if 'context was destroyed' in error_msg.lower() or 'navigation' in error_msg.lower():
logger.error(f"检测到页面跳转: {error_msg}")
page_destroyed = True
break
logger.error(f"检测上传状态异常: {e}")
if i > 10:
await asyncio.sleep(1)
# 如果页面被销毁,尝试等待重定向完成
if page_destroyed:
logger.warning("⚠️ 页面发生跳转,等待页面稳定...")
await asyncio.sleep(3)
try:
uploaded_images = await self.page.query_selector_all('img[src*="blob:"], img[src*="data:image"], [class*="image"][class*="item"] img')
uploaded_count = len(uploaded_images)
if uploaded_count >= images_count:
logger.success(f"✅ 页面稳定后确认图片已上传!共 {uploaded_count} 张")
upload_success = True
else:
logger.warning(f"⚠️ 页面稳定后检测到 {uploaded_count}/{images_count} 张图片")
except Exception as e:
logger.error(f"页面稳定后检测失败: {e}")
if upload_success:
logger.success(f"✅ 图片上传成功!共 {uploaded_count} 张")
await asyncio.sleep(0.5) # 优化:0.5秒
# 清理下载的临时文件
for temp_file in downloaded_files:
try:
os.remove(temp_file)
logger.success(f"✅ 已清理临时文件: {temp_file}")
except Exception:
pass
else:
logger.warning(f"⚠️ 仅检测到 {uploaded_count}/{images_count} 张图片,但继续执行...")
clicked = True
break
except Exception:
continue
if not clicked:
logger.warning("⚠️ 未找到任何上传控件,跳过图片上传")
except Exception as e:
logger.error(f"上传图片失败: {str(e)}")
# 不中断流程,继续发布文字
# 输入标题和内容
try:
logger.debug("开始输入文字内容...")
# 查找标题输入框(使用显式等待确保元素可交互)
title_selectors = [
'input[placeholder*="标题"]',
'input[placeholder*="填写标题"]',
'input[placeholder*="曝光"]',
'.title-input',
'[class*="title"] input',
]
title_input = None
# 优化:先用快速query_selector查找
for selector in title_selectors:
try:
title_input = await self.page.query_selector(selector)
if title_input:
# 检查元素是否可见
is_visible = await title_input.is_visible()
if is_visible:
await asyncio.sleep(0.2) # 优化:减少等待时间
logger.info(f"找到标题输入框: {selector}")
break
else:
title_input = None
except Exception:
continue
# 如果快速查找失败,再用wait方式
if not title_input:
for selector in title_selectors:
try:
title_input = await self.page.wait_for_selector(
selector,
state='visible',
timeout=3000 # 优化:减少超时时间
)
if title_input:
await asyncio.sleep(0.2)
logger.info(f"找到标题输入框: {selector}")
break
except Exception:
continue
if title_input:
await title_input.click()
await asyncio.sleep(0.3)
await title_input.fill(title)
logger.success(f"已输入标题: {title}")
else:
logger.warning("未找到标题输入框,可能不需要单独标题")
# 查找内容输入框(正文)(使用显式等待确保元素可交互)
content_selectors = [
'div[contenteditable="true"]',
'div[placeholder*="正文"]',
'div[placeholder*="输入正文"]',
'textarea[placeholder*="输入正文"]',
'textarea[placeholder*="填写笔记内容"]',
'textarea[placeholder*="笔记内容"]',
'[class*="content"] div[contenteditable="true"]',
'[class*="editor"] div[contenteditable="true"]',
'textarea',
]
content_input = None
# 优化:先用快速query_selector查找
for selector in content_selectors:
try:
content_input = await self.page.query_selector(selector)
if content_input:
is_visible = await content_input.is_visible()
if is_visible:
await asyncio.sleep(0.2) # 优化:减少等待时间
logger.info(f"找到内容输入框: {selector}")
break
else:
content_input = None
except Exception:
continue
# 如果快速查找失败,再用wait方式
if not content_input:
for selector in content_selectors:
try:
content_input = await self.page.wait_for_selector(
selector,
state='visible',
timeout=3000 # 优化:减少超时时间
)
if content_input:
await asyncio.sleep(0.2)
logger.info(f"找到内容输入框: {selector}")
break
except Exception:
continue
if content_input:
# 清空并输入内容
await content_input.click()
await asyncio.sleep(0.2) # 优化:减少等待时间
# 检查是否是contenteditable元素
try:
is_contenteditable = await content_input.evaluate('el => el.getAttribute("contenteditable") === "true"')
if is_contenteditable:
# 使用innerText设置内容
await content_input.evaluate(f'el => el.innerText = {json.dumps(content)}')
else:
# 普通textarea
await content_input.fill(content)
except Exception:
# 如果判断失败,尝试直接fill
await content_input.fill(content)
logger.success("已输入笔记内容")
await asyncio.sleep(0.2) # 优化:减少等待时间
# 添加话题标签
if topics:
logger.info(f"添加话题标签: {topics}")
for topic in topics:
# 在内容末尾添加话题
topic_text = f" #{topic}"
try:
is_contenteditable = await content_input.evaluate('el => el.getAttribute("contenteditable") === "true"')
if is_contenteditable:
await content_input.evaluate(f'el => el.innerText += {json.dumps(topic_text)}')
else:
current_value = await content_input.evaluate('el => el.value')
await content_input.fill(current_value + topic_text)
except Exception:
# 如果添加失败,继续下一个
pass
logger.success(f"已添加 {len(topics)} 个话题标签")
await asyncio.sleep(0.5) # 优化:减少等待时间
# 单独在话题输入框中模拟人类方式输入标签
if topics:
logger.info("尝试在话题输入框中逐个输入标签...")
tag_input_selectors = [
'input[placeholder*="话题"]',
'input[placeholder*="#"]',
'input[placeholder*="添加标签"]',
'[class*="tag"] input',
'[class*="topic"] input',
]
tag_input = None
# 优化:先用query_selector快速查找
for selector in tag_input_selectors:
try:
tag_input = await self.page.query_selector(selector)
if tag_input:
logger.info(f"找到话题输入框: {selector}")
break
except Exception:
continue
# 快速查找失败再用wait
if not tag_input:
for selector in tag_input_selectors:
try:
tag_input = await self.page.wait_for_selector(selector, timeout=2000)
if tag_input:
logger.info(f"找到话题输入框: {selector}")
break
except Exception:
continue
if tag_input:
for topic in topics:
try:
await tag_input.click()
await asyncio.sleep(0.2) # 优化:减少等待时间
# 清空已有内容
try:
await tag_input.fill("")
except Exception:
pass
# 优化:使用fill代替type,更快
await tag_input.fill("#" + topic)
await asyncio.sleep(0.5) # 优化:减少等待时间
# 等待联想列表并选择第一项
suggestion = None
suggestion_selectors = [
'[class*="suggest"] li',
'[role="listbox"] li',
'[class*="dropdown"] li',
]
for s_selector in suggestion_selectors:
try:
suggestion = await self.page.query_selector(s_selector)
if suggestion:
break
except Exception:
continue
if suggestion:
await suggestion.click()
logger.success(f"✅ 已选择联想话题: {topic}")
else:
# 没有联想列表时,通过回车确认
await tag_input.press("Enter")
logger.warning(f"✅ 未找到联想列表,使用回车确认话题: {topic}")
await asyncio.sleep(0.3) # 优化:减少等待时间
except Exception as e:
logger.error(f"添加话题 {topic} 到输入框失败: {str(e)}")
else:
logger.warning("⚠️ 未找到话题输入框,已退回到在正文中追加 #话题 的方式")
else:
return {
"success": False,
"error": "未找到内容输入框"
}
except Exception as e:
return {
"success": False,
"error": f"输入内容失败: {str(e)}"
}
# 模拟简单的人类滚动行为
try:
for _ in range(3):
await self.page.mouse.wheel(0, random.randint(200, 500))
await asyncio.sleep(random.uniform(0.3, 0.8))
except Exception:
pass
# 点击发布按钮
try:
logger.debug("查找发布按钮...")
submit_selectors = [
'button:has-text("发布笔记")',
'button:has-text("发布")',
'text="发布笔记"',
'text="发布"',
'.publish-btn',
'.submit-btn',
]
submit_btn = None
for selector in submit_selectors:
try:
submit_btn = await self.page.wait_for_selector(selector, timeout=3000)
if submit_btn:
# 检查按钮是否可点击
is_disabled = await submit_btn.evaluate('el => el.disabled')
if not is_disabled:
logger.info(f"找到发布按钮: {selector}")
break
else:
submit_btn = None
except Exception:
continue
if submit_btn:
# 设置网络监听,捕获发布接口响应
note_id = None
share_link = None
async def handle_response(response):
nonlocal note_id, share_link
try:
# 监听发布笔记的API响应
if '/web_api/sns/v2/note' in response.url:
logger.success(f"✅ 捕获到发布API响应: {response.url}")
if response.status == 200:
try:
data = await response.json()
logger.info(f"API响应数据: {json.dumps(data, ensure_ascii=False)}")
if data.get('success') and data.get('data'):
note_id = data['data'].get('id')
# 优先使用share_link,如果没有则使用note_id拼接
if 'share_link' in data:
share_link = data['share_link']
logger.success(f"✅ 获取到笔记链接: {share_link}")
elif note_id:
share_link = f"https://www.xiaohongshu.com/discovery/item/{note_id}"
logger.success(f"✅ 根据ID生成笔记链接: {share_link}")
except Exception as e:
logger.error(f"解析API响应失败: {str(e)}")
except Exception as e:
logger.error(f"处理响应失败: {str(e)}")
# 添加响应监听器
self.page.on('response', handle_response)
await submit_btn.click()
logger.success("✅ 已点击发布按钮")
await asyncio.sleep(3) # 等待更长时间以捕获API响应
# 检查是否出现社区规范限制提示
logger.info("检查是否有社区规范限制...")
try:
# 尝试查找各种可能的错误提示
error_selectors = [
'text="因违反社区规范禁止发笔记"',
'text*="违反社区规范"',
'text*="禁止发布"',
'text*="账号被限制"',
'text*="账号异常"',
'.error-tip',
'.warning-tip',
'[class*="error"]',
'[class*="warning"]',
]
for selector in error_selectors:
try:
error_el = await self.page.wait_for_selector(selector, timeout=2000)
if error_el:
error_text = await error_el.inner_text()
logger.error(f"❌ 检测到错误提示: {error_text}")
return {
"success": False,
"error": f"发布失败: {error_text}",
"error_type": "community_violation", # 标记错误类型
"message": error_text
}
except Exception:
continue
except Exception as e:
logger.error(f"检查错误提示异常: {str(e)}")
# 检查是否发布成功
logger.info("检查发布结果...")
try:
await asyncio.sleep(2) # 等待发布完成
# 如果捕获到了真实的笔记链接,直接返回
if share_link:
logger.success(f"✅ 发布成功,获取到笔记链接: {share_link}")
# 如果是浏览器池模式且使用了Cookie,关闭发布专用页面
if self.use_pool and self.browser_pool and cookies:
try:
logger.info("[浏览器池模式] 关闭发布专用页面")
await self.page.close()
self.page = None
logger.success("✅ 发布页面已关闭")
except Exception as e:
logger.error(f"⚠️ 关闭页面失败: {str(e)}")
return {
"success": True,
"message": "笔记发布成功",
"data": {
"note_id": note_id,
"note_url": share_link
},
"url": share_link # 保持兼容性
}
# 如果没有捕获到,使用原来的逻辑
# 等待发布成功的提示或页面跳转
success_selectors = [
'text="发布成功"',
'text="发布完成"',
'text*="成功"',
'.success-tip',
'.success-message',
]
publish_success = False
for selector in success_selectors:
try:
success_el = await self.page.wait_for_selector(selector, timeout=3000)
if success_el:
success_text = await success_el.inner_text()
logger.warning(f"✅ 检测到发布成功提示: {success_text}")
publish_success = True
break
except Exception:
continue
# 如果没有明确的成功提示,检查URL是否变化
current_url = self.page.url
if not publish_success:
# 如果还在发布页面,可能是发布失败
if 'publish' in current_url.lower():
logger.warning("⚠️ 未检测到成功提示,但继续执行")
else:
logger.success("✅ URL已变化,似乎发布成功")
publish_success = True
logger.info(f"发布后URL: {current_url}")
# 如果是浏览器池模式且使用了Cookie,关闭发布专用页面和context
if self.use_pool and self.browser_pool and cookies:
try:
logger.info("[浏览器池模式] 关闭发布专用环境")
if self.page:
await self.page.close()
self.page = None
logger.success("✅ 发布页面已关闭")
if self.context:
await self.context.close()
self.context = None
logger.success("✅ 发布context已关闭(预热环境保持不受影响)")
except Exception as e:
logger.error(f"⚠️ 关闭发布环境失败: {str(e)}")
return {
"success": True,
"message": "笔记发布成功",
"url": current_url
}
except Exception as e:
logger.error(f"检查发布结果异常: {str(e)}")
# 如果是浏览器池模式且使用了Cookie,关闭发布专用页面和context
if self.use_pool and self.browser_pool and cookies:
try:
logger.info("[浏览器池模式] 关闭发布专用环境")
if self.page:
await self.page.close()
self.page = None
logger.success("✅ 发布页面已关闭")
if self.context:
await self.context.close()
self.context = None
logger.success("✅ 发布context已关闭(预热环境保持不受影响)")
except Exception as e2:
logger.error(f"⚠️ 关闭发布环境失败: {str(e2)}")
# 即使检查异常,也返回成功(因为按钮已点击)
return {
"success": True,
"message": "笔记已提交发布,但未能确认结果",
"url": self.page.url if self.page else ""
}
else:
return {
"success": False,
"error": "未找到可用的发布按钮,可能内容不完整"
}
except Exception as e:
return {
"success": False,
"error": f"点击发布按钮失败: {str(e)}"
}
except Exception as e:
logger.error(f"发布笔记异常: {str(e)}")
return {
"success": False,
"error": str(e)
}
async def start_qrcode_login(self, login_page: str = "home") -> Dict[str, Any]:
    """
    Start the QR-code login flow on the Xiaohongshu home page.

    Navigates ``self.page`` to the home page, temporarily intercepts the
    QR-code creation API to capture its JSON payload, then delegates to
    ``extract_qrcode_with_status`` to read the QR image and status.

    Args:
        login_page: Login page type, defaults to "home". Not read by this
            method; the home page URL is always visited.

    Returns:
        The dict returned by
        ``extract_qrcode_with_status(check_login_success=False)``, extended
        with ``qr_id``, ``qr_code``, ``qr_url`` and ``multi_flag`` when the
        creation API response was captured. On failure (including a captcha
        page that does not go away) ``{"success": False, "error": ...}``.
    """
    qrcode_api_pattern = '**/api/sns/web/v1/login/qrcode/create'

    def on_captcha_page(url):
        # URL markers used to recognise the risk-control verification page.
        return '/website-login/captcha' in url or 'verifyUuid=' in url

    try:
        if not self.page:
            await self.init_browser()

        login_url = 'https://www.xiaohongshu.com'
        logger.info("[扫码登录] 正在访问小红书首页...")

        # Always navigate to the home page, whatever page is currently open.
        try:
            # 'domcontentloaded' instead of 'networkidle': do not wait for
            # every resource to finish loading.
            await self.page.goto(login_url, wait_until='domcontentloaded', timeout=10000)
            current_url = self.page.url
            logger.success(f"[扫码登录] 页面加载完成, 当前URL: {current_url}")

            if on_captcha_page(current_url):
                logger.warning("[扫码登录] 检测到风控验证页面,尝试等待或跳过...")
                # Give the page 30 seconds to leave the captcha on its own.
                await asyncio.sleep(30)
                current_url = self.page.url
                logger.info(f"[扫码登录] 等待30秒后当前URL: {current_url}")
                if on_captcha_page(current_url):
                    return {
                        "success": False,
                        "error": "当前IP被风控,需要验证。请稍后再试或启用代理。"
                    }
        except Exception as e:
            # A timeout is tolerated as long as the page did reach the site.
            current_url = self.page.url
            if 'xiaohongshu.com' in current_url:
                logger.warning(f"[扫码登录] 页面加载超时但已到达小红书页面: {current_url}")
            else:
                logger.error(f"[扫码登录] 页面加载失败: {str(e)}, 当前URL: {current_url}")
                raise

        # Payload of the QR-code creation API, filled in by the route handler.
        qrcode_create_data = None

        async def handle_qrcode_create(route):
            nonlocal qrcode_create_data
            try:
                request = route.request
                logger.info(f"[扫码登录] API请求: {request.method} {request.url}")
                response = await route.fetch()
                body = await response.body()
                try:
                    data = json.loads(body.decode('utf-8'))
                    logger.info(f"[扫码登录] API响应: {json.dumps(data, ensure_ascii=False)}")
                    if data.get('code') == 0 and data.get('success') and data.get('data'):
                        qrcode_create_data = data.get('data')
                        logger.success(f"[扫码登录] 获取到二维码 qr_id={qrcode_create_data.get('qr_id')}")
                except Exception as e:
                    logger.error(f"[扫码登录] 解析响应失败: {str(e)}")
                # Hand the untouched upstream response back to the page.
                await route.fulfill(response=response)
            except Exception as e:
                logger.error(f"[扫码登录] 处理API请求失败: {str(e)}")
                # The route may already be handled (e.g. fulfill failed half-way),
                # in which case continue_() raises; never let that escape the handler.
                try:
                    await route.continue_()
                except Exception as continue_error:
                    logger.error(f"[扫码登录] 放行API请求失败: {str(continue_error)}")

        await self.page.route(qrcode_api_pattern, handle_qrcode_create)
        logger.info("[扫码登录] 已注册 API路由监听,等待页面自动触发二维码创建...")

        try:
            # Passively wait for the page to create the QR code (300 * 0.1s = 30s max).
            for _ in range(300):
                if qrcode_create_data:
                    break
                await asyncio.sleep(0.1)
            if not qrcode_create_data:
                logger.warning("[扫码登录] 30秒内未捕获到二维码创建 API请求,尝试从页面提取二维码")

            # Initial extraction only: do not evaluate login success here.
            qrcode_result = await self.extract_qrcode_with_status(check_login_success=False)

            # The handler stays registered during extraction, so a late API
            # response is still picked up here.
            if qrcode_create_data:
                qrcode_result["qr_id"] = qrcode_create_data.get('qr_id')
                qrcode_result["qr_code"] = qrcode_create_data.get('code')
                qrcode_result["qr_url"] = qrcode_create_data.get('url')
                qrcode_result["multi_flag"] = qrcode_create_data.get('multi_flag')
            return qrcode_result
        finally:
            # Remove the interceptor: its captured data is local to this call,
            # and leaving it registered would stack one more handler per call.
            try:
                await self.page.unroute(qrcode_api_pattern, handle_qrcode_create)
            except Exception as unroute_error:
                logger.warning(f"[扫码登录] 移除API路由监听失败: {str(unroute_error)}")
    except Exception as e:
        logger.error(f"启动扫码登录失败: {str(e)}")
        return {
            "success": False,
            "error": str(e)
        }
async def extract_qrcode_with_status(self, check_login_success: bool = True) -> Dict[str, Any]:
    """
    Extract the QR-code image and its status, optionally detecting a finished login.

    When ``check_login_success`` is true the method first asks the user-info
    API from inside the page; a non-guest answer means the scan succeeded.
    Otherwise it falls back to DOM heuristics (QR code gone and login modal
    closed or user element visible). If no login is detected, or detection is
    disabled, the QR image and the status overlay are read from the page.

    Args:
        check_login_success: Whether to check for a completed login. Defaults
            to True; the initial call from ``start_qrcode_login`` passes False.

    Returns:
        On success a dict with the keys ``success``, ``qrcode_image``,
        ``status_text``, ``status_desc``, ``is_expired``, ``login_success``,
        ``user_info``, ``cookies``, ``cookies_full`` and ``login_state``.
        On failure ``{"success": False, "error": <message>}``.
    """
    # Imported locally: base64 is not among the imports visible at the top of the file.
    import base64

    def pick_storage_extras(storage_dict):
        """Return the b1 / b1b1 / p1 localStorage entries, JSON-decoded when they look like JSON."""
        extras = {}
        for key in ('b1', 'b1b1', 'p1'):
            if key not in storage_dict:
                continue
            try:
                value = storage_dict[key]
                if value and value.strip():
                    extras[key] = json.loads(value) if value.startswith(('{', '[')) else value
            except Exception:
                # Undecodable value: keep it as stored.
                extras[key] = storage_dict[key]
        return extras

    async def read_web_storage(storage_name):
        """Return the page's localStorage/sessionStorage as a dict; raises if evaluation fails."""
        raw = await self.page.evaluate(f'() => JSON.stringify({storage_name})')
        return json.loads(raw)

    async def store_cookies():
        """Copy the context cookies into ``result`` and return how many were found."""
        cookies = await self.context.cookies()
        result["cookies"] = {cookie['name']: cookie['value'] for cookie in cookies}
        result["cookies_full"] = cookies
        return len(cookies)

    async def store_login_state(current_url=None):
        """Snapshot cookies, web storage, URL and timestamp into ``result["login_state"]``."""
        try:
            if current_url is None:
                current_url = self.page.url
            snapshots = {}
            for storage_name in ('localStorage', 'sessionStorage'):
                try:
                    snapshots[storage_name] = await read_web_storage(storage_name)
                except Exception as e:
                    snapshots[storage_name] = {}
                    logger.info(f"\u26a0\ufe0f \u83b7\u53d6{storage_name}\u5931\u8d25: {str(e)}")
            result["login_state"] = {
                "cookies": result["cookies_full"],
                "localStorage": snapshots['localStorage'],
                "sessionStorage": snapshots['sessionStorage'],
                "url": current_url,
                "timestamp": time.time()
            }
            logger.info("\u2705 \u5df2\u6784\u5efa\u5b8c\u6574\u767b\u5f55\u72b6\u6001")
        except Exception as e:
            logger.info(f"\u26a0\ufe0f \u6784\u5efa\u767b\u5f55\u72b6\u6001\u5931\u8d25: {str(e)}")

    try:
        if not self.page:
            return {
                "success": False,
                "error": "浏览器未初始化"
            }

        result = {
            "success": True,
            "qrcode_image": "",
            "status_text": "",
            "status_desc": "",
            "is_expired": False,
            "login_success": False,  # whether the QR-code login has completed
            "user_info": None,
            "cookies": None,
            "cookies_full": None,
            "login_state": None
        }

        # Login detection only runs when polling, not on the initial extraction.
        if check_login_success:
            # Method 1: ask the user-info API from inside the page.
            user_me_data = None
            try:
                response = await self.page.evaluate('''
                    async () => {
                        try {
                            const response = await fetch('https://edith.xiaohongshu.com/api/sns/web/v2/user/me', {
                                method: 'GET',
                                credentials: 'include'
                            });
                            const data = await response.json();
                            return data;
                        } catch (error) {
                            return { error: error.message };
                        }
                    }
                ''')
                if response and not response.get('error'):
                    if response.get('code') == 0 and response.get('success') and response.get('data'):
                        data = response.get('data')
                        is_guest = data.get('guest', False)
                        # Only a non-guest answer with id and nickname counts as logged in.
                        if not is_guest and data.get('user_id') and data.get('nickname'):
                            user_me_data = data
                            logger.success(f"[扫码登录] 登录成功! user_id={user_me_data.get('user_id')}, nickname={user_me_data.get('nickname')}")
            except Exception as e:
                logger.error(f"[扫码登录] 请求用户信息 API异常: {str(e)}")

            if user_me_data:
                result["login_success"] = True
                # Let the page settle before reading state.
                await asyncio.sleep(1)

                try:
                    await store_cookies()
                except Exception as e:
                    logger.error(f"[扫码登录] 获取Cookie失败: {str(e)}")

                # User info: API fields first, localStorage extras on top when readable.
                user_info = {
                    'user_id': user_me_data.get('user_id'),
                    'red_id': user_me_data.get('red_id'),
                    'nickname': user_me_data.get('nickname'),
                    'desc': user_me_data.get('desc'),
                    'gender': user_me_data.get('gender'),
                    'avatar_small': user_me_data.get('images'),  # small avatar
                    'avatar_large': user_me_data.get('imageb'),  # large avatar
                    'is_guest': user_me_data.get('guest', False)
                }
                try:
                    user_info.update(pick_storage_extras(await read_web_storage('localStorage')))
                except Exception as e:
                    # Even if localStorage is unreadable, the API data is still returned.
                    logger.error(f"[扫码登录] 构建用户信息失败: {str(e)}")
                result["user_info"] = user_info

                await store_login_state()
                return result

            # Method 1 gave no logged-in user: fall back to DOM heuristics.
            logger.info("\u26a0\ufe0f API\u68c0\u6d4b\u5931\u8d25,\u4f7f\u7528\u9875\u9762\u5143\u7d20\u68c0\u6d4b")
            current_url = self.page.url
            logger.info(f"\u5f53\u524dURL: {current_url}")

            # Method 2: is the QR code still shown?
            qrcode_exists = False
            try:
                qrcode_img = await self.page.query_selector('.qrcode-img')
                if qrcode_img:
                    qrcode_exists = await qrcode_img.is_visible()
            except Exception as e:
                # A failed probe is "unknown", not "gone": assume the QR code is
                # still there so an error cannot be mistaken for a login.
                qrcode_exists = True
                logger.warning(f"[扫码登录] 检测二维码元素异常,视为二维码仍存在: {str(e)}")

            # Method 3: has the login modal been closed?
            login_modal_closed = True
            try:
                for selector in (
                    '.login-container',
                    '.reds-modal',
                    '[class*="login-modal"]',
                    '[class*="LoginModal"]',
                ):
                    modal = await self.page.query_selector(selector)
                    if modal and await modal.is_visible():
                        login_modal_closed = False
                        break
            except Exception as e:
                # Same reasoning: an unknown modal state must not count as "closed".
                login_modal_closed = False
                logger.warning(f"[扫码登录] 检测登录弹窗异常,视为弹窗未关闭: {str(e)}")

            # Method 4: is a logged-in user element visible?
            has_user_info = False
            try:
                for selector in ('.user-info', '.avatar', '[class*="user"]'):
                    user_el = await self.page.query_selector(selector)
                    if user_el and await user_el.is_visible():
                        has_user_info = True
                        break
            except Exception as e:
                logger.debug(f"[扫码登录] 检测用户信息元素异常: {str(e)}")

            logger.info(f"\u767b\u5f55\u72b6\u6001\u68c0\u6d4b: \u4e8c\u7ef4\u7801\u5b58\u5728={qrcode_exists}, \u767b\u5f55\u6846\u5173\u95ed={login_modal_closed}, \u6709\u7528\u6237\u4fe1\u606f={has_user_info}")

            # Verdict: QR code gone AND (modal closed OR user element visible).
            if not qrcode_exists and (login_modal_closed or has_user_info):
                logger.info("\u2705 \u68c0\u6d4b\u5230\u626b\u7801\u767b\u5f55\u6210\u529f!(\u4e8c\u7ef4\u7801\u5df2\u6d88\u5931)")
                result["login_success"] = True
                # Let the page settle before reading state.
                await asyncio.sleep(1)

                try:
                    cookie_count = await store_cookies()
                    logger.info(f"\u2705 \u5df2\u83b7\u53d6 {cookie_count} \u4e2aCookie")
                except Exception as e:
                    logger.info(f"\u26a0\ufe0f \u83b7\u53d6Cookie\u5931\u8d25: {str(e)}")

                try:
                    user_info = pick_storage_extras(await read_web_storage('localStorage'))
                    result["user_info"] = user_info
                    logger.info(f"\u2705 \u5df2\u83b7\u53d6\u7528\u6237\u4fe1\u606f: {list(user_info.keys())}")
                except Exception as e:
                    logger.info(f"\u26a0\ufe0f \u83b7\u53d6\u7528\u6237\u4fe1\u606f\u5931\u8d25: {str(e)}")

                await store_login_state(current_url)
                return result

        # Not logged in (or detection disabled): extract the QR image and status.
        qrcode_selectors = [
            '.qrcode-img',
            'img.qrcode-img',
            '.qrcode img',
            'img[src*="data:image"]',
            'img[alt*="二维码"]',
        ]
        for selector in qrcode_selectors:
            try:
                qrcode_img = await self.page.wait_for_selector(selector, timeout=3000)
                if not qrcode_img:
                    continue

                src = await qrcode_img.get_attribute('src')
                if src:
                    if src.startswith('data:image'):
                        result["qrcode_image"] = src
                    else:
                        # Remote URL: download it and convert to a data URI.
                        try:
                            async with aiohttp.ClientSession() as session:
                                async with session.get(src, timeout=aiohttp.ClientTimeout(total=10)) as img_response:
                                    if img_response.status == 200:
                                        img_data = await img_response.read()
                                        img_base64 = base64.b64encode(img_data).decode('utf-8')
                                        content_type = img_response.headers.get('Content-Type', 'image/png')
                                        result["qrcode_image"] = f"data:{content_type};base64,{img_base64}"
                                        logger.success("✅ 成功下载并转换二维码")
                        except Exception as e:
                            logger.error(f"⚠️ 下载二维码失败: {str(e)}")

                # Still no image: screenshot the element instead.
                if not result["qrcode_image"]:
                    try:
                        screenshot_bytes = await qrcode_img.screenshot()
                        if screenshot_bytes:
                            img_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
                            result["qrcode_image"] = f"data:image/png;base64,{img_base64}"
                            logger.success("✅ 成功截取二维码")
                    except Exception as e:
                        logger.error(f"⚠️ 截取二维码失败: {str(e)}")
                # The first selector that matches an element ends the search.
                break
            except Exception:
                continue

        if not result["qrcode_image"]:
            return {
                "success": False,
                "error": "未找到二维码图片"
            }

        # Read the status overlay, if any.
        logger.debug("正在提取二维码状态...")
        status_selectors = [
            '.status',
            '.qrcode-status',
            '[class*="status"]',
        ]
        for selector in status_selectors:
            try:
                status_el = await self.page.query_selector(selector)
                if not status_el:
                    continue

                if not await status_el.is_visible():
                    logger.info("二维码状态元素不可见,说明二维码有效")
                    result["status_text"] = ""  # empty string means "normal"
                    result["is_expired"] = False
                    break

                logger.success(f"✅ 找到状态元素: {selector}")

                status_text_el = await status_el.query_selector('.status-text')
                if status_text_el:
                    status_text = await status_text_el.inner_text()
                    result["status_text"] = status_text.strip()
                    logger.info(f"状态文本: {result['status_text']}")

                status_desc_el = await status_el.query_selector('.status-desc')
                if status_desc_el:
                    status_desc = await status_desc_el.inner_text()
                    result["status_desc"] = status_desc.strip()
                    logger.info(f"状态描述: {result['status_desc']}")

                if "过期" in result["status_text"] or "过期" in result["status_desc"]:
                    result["is_expired"] = True
                    logger.warning("⚠️ 二维码已过期")
                break
            except Exception:
                continue

        # With no status element found, status_text stays "" and is_expired stays
        # False from initialisation (the caller shows no overlay). is_expired is
        # deliberately NOT reset here: it may have been set from status_desc
        # alone while status_text is empty.

        logger.success(f"✅ 二维码提取完成: 状态={result['status_text']}, 过期={result['is_expired']}, 登录成功={result['login_success']}")
        return result
    except Exception as e:
        logger.error(f"提取二维码状态失败: {str(e)}")
        return {
            "success": False,
            "error": str(e)
        }
async def refresh_qrcode(self) -> Dict[str, Any]:
    """
    Refresh the login QR code (click "refresh" after it has expired).

    Temporarily intercepts the QR-code creation API to capture the new
    payload, makes sure the login modal and the QR-code tab are showing,
    clicks the refresh control, then re-reads the QR code through
    ``extract_qrcode_with_status``.

    Returns:
        The dict returned by
        ``extract_qrcode_with_status(check_login_success=False)``, extended
        with ``qr_id``, ``qr_code``, ``qr_url`` and ``multi_flag`` when the
        creation API response was captured. ``{"success": False, "error": ...}``
        when the browser is not initialised, no refresh control is found, or
        an exception occurs.
    """
    qrcode_api_pattern = '**/api/sns/web/v1/login/qrcode/create'
    try:
        if not self.page:
            return {
                "success": False,
                "error": "浏览器未初始化"
            }

        # A blank page has no login UI: navigate to the explore page first.
        try:
            current_url = self.page.url
            logger.info(f"[刷新二维码] 当前URL: {current_url}")
            if current_url in ('about:blank', ''):
                logger.warning("[刷新二维码] 检测到空白页,重新导航到explore页面")
                await self.page.goto('https://www.xiaohongshu.com/explore', wait_until='networkidle')
                await asyncio.sleep(1)
        except Exception as e:
            logger.error(f"[刷新二维码] 检查page状态异常: {str(e)}")

        logger.info("[刷新二维码] 正在刷新...")

        # Payload of the QR-code creation API, filled in by the route handler.
        qrcode_create_data = None

        async def handle_qrcode_create(route):
            nonlocal qrcode_create_data
            try:
                request = route.request
                logger.info(f"[刷新二维码] API请求: {request.method} {request.url}")
                response = await route.fetch()
                body = await response.body()
                try:
                    data = json.loads(body.decode('utf-8'))
                    logger.info(f"[刷新二维码] API响应: {json.dumps(data, ensure_ascii=False)}")
                    if data.get('code') == 0 and data.get('success') and data.get('data'):
                        qrcode_create_data = data.get('data')
                        logger.success(f"[刷新二维码] 获取到新二维码 qr_id={qrcode_create_data.get('qr_id')}")
                except Exception as e:
                    logger.error(f"[刷新二维码] 解析响应失败: {str(e)}")
                # Hand the untouched upstream response back to the page.
                await route.fulfill(response=response)
            except Exception as e:
                logger.error(f"[刷新二维码] 处理API请求失败: {str(e)}")
                # The route may already be handled, in which case continue_()
                # raises; never let that escape the handler.
                try:
                    await route.continue_()
                except Exception as continue_error:
                    logger.error(f"[刷新二维码] 放行API请求失败: {str(continue_error)}")

        # Register the interceptor before touching the login UI.
        await self.page.route(qrcode_api_pattern, handle_qrcode_create)
        logger.info("[刷新二维码] 已注册 API路由监听")

        try:
            # Open the login modal when the current URL is not a login page.
            current_url = self.page.url
            if 'login' not in current_url.lower():
                logger.info("[刷新二维码] 不在登录页,先打开登录页")
                try:
                    login_btn = await self.page.wait_for_selector('text="登录"', timeout=3000)
                    if login_btn:
                        await login_btn.click()
                        await asyncio.sleep(1)
                except Exception as e:
                    logger.warning(f"[刷新二维码] 打开登录页失败: {str(e)}")

            # Make sure the QR-code login tab is selected.
            qrcode_tab_selectors = [
                'text="扫码登录"',
                'div:has-text("扫码登录")',
                'text="二维码登录"',
                'div:has-text("二维码登录")',
                '.qrcode-tab',
                '[data-type="qrcode"]',
            ]
            for selector in qrcode_tab_selectors:
                try:
                    qrcode_tab = await self.page.query_selector(selector)
                    if qrcode_tab:
                        logger.info("[刷新二维码] 切换到扫码登录模式")
                        await qrcode_tab.click()
                        await asyncio.sleep(0.5)
                        break
                except Exception:
                    continue

            # Find and click the refresh control.
            refresh_selectors = [
                '.status-desc.refresh',
                'text="点击刷新"',
                '.refresh',
                '[class*="refresh"]',
            ]
            refresh_clicked = False
            for selector in refresh_selectors:
                try:
                    refresh_el = await self.page.query_selector(selector)
                    if refresh_el:
                        logger.info(f"[刷新二维码] 找到刷新按钮: {selector}")
                        await refresh_el.click()
                        logger.success("[刷新二维码] 已点击刷新")
                        await asyncio.sleep(1)
                        refresh_clicked = True
                        break
                except Exception:
                    continue

            if not refresh_clicked:
                return {
                    "success": False,
                    "error": "未找到刷新按钮"
                }

            # Wait for the creation API response (30 * 0.1s = 3s max).
            for _ in range(30):
                if qrcode_create_data:
                    break
                await asyncio.sleep(0.1)
            if not qrcode_create_data:
                logger.warning("[刷新二维码] 未捕获到二维码创建 API请求")

            # Re-read the QR code from the page.
            qrcode_result = await self.extract_qrcode_with_status(check_login_success=False)

            if qrcode_create_data:
                qrcode_result["qr_id"] = qrcode_create_data.get('qr_id')
                qrcode_result["qr_code"] = qrcode_create_data.get('code')
                qrcode_result["qr_url"] = qrcode_create_data.get('url')
                qrcode_result["multi_flag"] = qrcode_create_data.get('multi_flag')
                logger.success("[刷新二维码] 已将二维码创建信息添加到返回结果")
            return qrcode_result
        finally:
            # Remove the interceptor on every exit path: this method is called
            # repeatedly on the same page and would otherwise stack one more
            # handler per refresh.
            try:
                await self.page.unroute(qrcode_api_pattern, handle_qrcode_create)
            except Exception as unroute_error:
                logger.warning(f"[刷新二维码] 移除API路由监听失败: {str(unroute_error)}")
    except Exception as e:
        logger.error(f"[刷新二维码] 失败: {str(e)}")
        return {
            "success": False,
            "error": str(e)
        }