Files
ai_wht_wechat/backend/xhs_login.py
2026-01-07 12:18:55 +08:00

2522 lines
129 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
小红书登录服务
使用 Playwright 模拟浏览器登录小红书
"""
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
from typing import Dict, Any, Optional
import asyncio
import json
import random
import unicodedata
import sys
import os
import tempfile
import aiohttp
import time
from datetime import datetime
from pathlib import Path
from browser_pool import get_browser_pool
from error_screenshot import save_error_screenshot, save_screenshot_with_html
async def download_image(url: str) -> str:
    """Download a remote image into a uniquely named temporary file.

    The file is written under ``<system temp dir>/xhs_images``. Its extension
    is chosen from the response ``Content-Type`` header (``.png``, ``.jpg`` or
    ``.webp``), falling back to ``.jpg`` when the type is not recognised.

    Args:
        url: URL of the image to fetch.

    Returns:
        Path of the downloaded file, as a string. The file is not deleted by
        this function.

    Raises:
        Exception: when the server answers with a status other than 200, or
            when the request / file write fails. The error is logged to
            stderr and re-raised.
    """
    try:
        print(f"下载网络图片: {url}", file=sys.stderr)
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                if response.status != 200:
                    raise Exception(f"HTTP {response.status}")
                # Derive the file extension from the declared content type.
                content_type = response.headers.get('Content-Type', '')
                ext = '.jpg'  # default when the type is unknown
                if 'png' in content_type:
                    ext = '.png'
                elif 'jpeg' in content_type or 'jpg' in content_type:
                    ext = '.jpg'
                elif 'webp' in content_type:
                    ext = '.webp'
                payload = await response.read()
        temp_dir = Path(tempfile.gettempdir()) / 'xhs_images'
        temp_dir.mkdir(parents=True, exist_ok=True)
        # mkstemp creates the file atomically with a unique name, so parallel
        # downloads can never overwrite each other (a 5-digit random number
        # could collide).
        fd, temp_path = tempfile.mkstemp(prefix='img_', suffix=ext, dir=str(temp_dir))
        with os.fdopen(fd, 'wb') as f:
            f.write(payload)
        print(f"✅ 图片下载成功: {temp_path}", file=sys.stderr)
        return str(temp_path)
    except Exception as e:
        print(f"⚠️ 下载图片失败: {str(e)}", file=sys.stderr)
        raise
class XHSLoginService:
"""小红书登录服务"""
def __init__(self, use_pool: bool = True, headless: bool = True, session_id: Optional[str] = None):
    """Create the login service without starting a browser.

    Args:
        use_pool: When True, browser instances are obtained from the shared
            browser pool returned by ``get_browser_pool``; when False a
            dedicated browser is launched on demand.
        headless: Whether the browser runs headless (False shows a window,
            which is convenient for debugging).
        session_id: Optional session identifier forwarded to the browser
            pool so that concurrent sessions are isolated from each other.
    """
    self.use_pool = use_pool
    self.headless = headless
    self.session_id = session_id  # kept for per-session isolation in the pool
    self.browser_pool = get_browser_pool(headless=headless) if use_pool else None
    self.playwright = None
    self.browser: Optional[Browser] = None
    self.context: Optional[BrowserContext] = None
    self.page: Optional[Page] = None
    self.current_phone = None
    # Redirect-loop bookkeeping. Declared here so the attributes exist on
    # every instance, not only after a traditional-mode init_browser() call.
    self.redirect_count = 0
    self.last_redirect_time = 0
async def init_browser(self, cookies: Optional[list] = None, proxy: Optional[str] = None, user_agent: Optional[str] = None, restore_state: bool = False):
    """Initialise the browser, context and page used by this service.

    In pool mode the browser/context/page triple is requested from the
    browser pool. Otherwise a new Chromium instance is launched, an
    anti-detection init script is injected, cookies are added, detection
    requests are blocked through a route handler and a response listener
    counts rapid redirects.

    Args:
        cookies: Optional list of cookies to inject into the context.
        proxy: Optional proxy address, e.g. ``http://user:pass@ip:port``.
        user_agent: Optional custom User-Agent string.
        restore_state: When True and ``login_state.json`` exists in the
            working directory, cookies / user agent / viewport / web storage
            are restored from that file.

    Raises:
        Exception: any error raised during initialisation is logged and
            re-raised. In traditional mode, a browser or Playwright driver
            started by this call is shut down before the error propagates.
    """
    # Track only what THIS call launches, so that a failure never tears down
    # resources created by an earlier, successful initialisation.
    started_playwright = None
    started_browser = None
    try:
        # Optionally load the previously saved login state.
        login_state = None
        if restore_state and os.path.exists('login_state.json'):
            try:
                with open('login_state.json', 'r', encoding='utf-8') as f:
                    login_state = json.load(f)
                print("✅ 加载到保存的登录状态", file=sys.stderr)
                # Saved values take precedence over the cookies argument; the
                # saved user agent is only used when none was passed in.
                cookies = login_state.get('cookies', cookies)
                if not user_agent and login_state.get('user_agent'):
                    user_agent = login_state['user_agent']
            except Exception as e:
                print(f"⚠️ 加载登录状态失败: {str(e)}", file=sys.stderr)
        # Pool mode: borrow an instance from the browser pool.
        if self.use_pool and self.browser_pool:
            print(f"[浏览器池模式] 从浏览器池获取实例 (session_id={self.session_id}, headless={self.headless})", file=sys.stderr)
            self.browser, self.context, self.page = await self.browser_pool.get_browser(
                cookies=cookies, proxy=proxy, user_agent=user_agent, session_id=self.session_id,
                headless=self.headless  # forward the headless setting
            )
            # Restore localStorage/sessionStorage when a saved state was loaded.
            if login_state:
                await self._restore_storage(login_state)
            print("浏览器初始化成功(池模式)", file=sys.stderr)
            return
        # Traditional mode: launch a fresh browser for this service.
        print("[传统模式] 创建新浏览器实例", file=sys.stderr)
        # On Windows an explicit event loop policy is configured first.
        if sys.platform == 'win32':
            try:
                asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
            except Exception as e:
                print(f"警告: 设置事件循环策略失败: {str(e)}", file=sys.stderr)
        started_playwright = await async_playwright().start()
        self.playwright = started_playwright
        # Launch Chromium with the configured headless flag.
        launch_kwargs = {
            "headless": self.headless,
            "args": [
                '--disable-blink-features=AutomationControlled',
                '--disable-infobars',
                '--no-first-run',
                '--no-default-browser-check',
            ],
        }
        if proxy:
            launch_kwargs["proxy"] = {"server": proxy}
        started_browser = await started_playwright.chromium.launch(**launch_kwargs)
        self.browser = started_browser
        # Create a context that looks like a regular desktop user.
        context_kwargs = {
            "viewport": login_state.get('viewport') if login_state else {'width': 1280, 'height': 720},
            "user_agent": user_agent or 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        }
        self.context = await self.browser.new_context(**context_kwargs)
        # Init script that hides automation traits. The script text is
        # runtime data and is kept exactly as it was.
        await self.context.add_init_script("""
// 移除webdriver标记
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// 阻止检测自动化调试端口
window.chrome = {
runtime: {}
};
// 阻止检测Chrome DevTools Protocol
const originalFetch = window.fetch;
window.fetch = function(...args) {
const url = args[0];
// 阻止小红书检测本地调试端口
if (typeof url === 'string' && (
url.includes('127.0.0.1:9222') ||
url.includes('127.0.0.1:54345') ||
url.includes('localhost:9222') ||
url.includes('chrome-extension://invalid')
)) {
return Promise.reject(new Error('blocked'));
}
return originalFetch.apply(this, args);
};
// 阻止XMLHttpRequest检测
const originalXHROpen = XMLHttpRequest.prototype.open;
XMLHttpRequest.prototype.open = function(...args) {
const url = args[1];
if (typeof url === 'string' && (
url.includes('127.0.0.1:9222') ||
url.includes('127.0.0.1:54345') ||
url.includes('localhost:9222') ||
url.includes('chrome-extension://invalid')
)) {
throw new Error('blocked');
}
return originalXHROpen.apply(this, args);
};
// 添加chrome.app
Object.defineProperty(window, 'chrome', {
get: () => ({
app: {
isInstalled: false,
},
webstore: {
onInstallStageChanged: {},
onDownloadProgress: {},
},
runtime: {
PlatformOs: {
MAC: 'mac',
WIN: 'win',
ANDROID: 'android',
CROS: 'cros',
LINUX: 'linux',
OPENBSD: 'openbsd',
},
PlatformArch: {
ARM: 'arm',
X86_32: 'x86-32',
X86_64: 'x86-64',
},
PlatformNaclArch: {
ARM: 'arm',
X86_32: 'x86-32',
X86_64: 'x86-64',
},
RequestUpdateCheckStatus: {
THROTTLED: 'throttled',
NO_UPDATE: 'no_update',
UPDATE_AVAILABLE: 'update_available',
},
OnInstalledReason: {
INSTALL: 'install',
UPDATE: 'update',
CHROME_UPDATE: 'chrome_update',
SHARED_MODULE_UPDATE: 'shared_module_update',
},
OnRestartRequiredReason: {
APP_UPDATE: 'app_update',
OS_UPDATE: 'os_update',
PERIODIC: 'periodic',
},
},
}),
configurable: true,
});
// 模拟permissions
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
// 添加plugins
Object.defineProperty(navigator, 'plugins', {
get: () => [
{
0: {type: "application/x-google-chrome-pdf", suffixes: "pdf", description: "Portable Document Format"},
description: "Portable Document Format",
filename: "internal-pdf-viewer",
length: 1,
name: "Chrome PDF Plugin"
},
{
0: {type: "application/pdf", suffixes: "pdf", description: ""},
description: "",
filename: "mhjfbmdgcfjbbpaeojofohoefgiehjai",
length: 1,
name: "Chrome PDF Viewer"
},
{
0: {type: "application/x-nacl", suffixes: "", description: "Native Client Executable"},
1: {type: "application/x-pnacl", suffixes: "", description: "Portable Native Client Executable"},
description: "",
filename: "internal-nacl-plugin",
length: 2,
name: "Native Client"
}
],
});
""")
        print("✅ 已注入反检测脚本", file=sys.stderr)
        # Inject cookies into the context when provided.
        if cookies:
            await self.context.add_cookies(cookies)
            print(f"已注入 {len(cookies)} 个Cookie", file=sys.stderr)
        # Create the page.
        self.page = await self.context.new_page()

        # Route handler that aborts requests probing local debugging ports
        # or dummy extension URLs and lets everything else through.
        async def block_detection_requests(route, request):
            url = request.url
            if any([
                '127.0.0.1:9222' in url,
                '127.0.0.1:54345' in url,
                'localhost:9222' in url,
                'chrome-extension://invalid' in url,
                'chrome-extension://bla' in url,
            ]):
                await route.abort()
            else:
                await route.continue_()

        # Register the handler for every request of this page.
        await self.page.route('**/*', block_detection_requests)
        print("✅ 已启用请求拦截,阻止检测自动化", file=sys.stderr)
        # Redirect monitoring state, used to spot redirect loops.
        self.redirect_count = 0
        self.last_redirect_time = 0

        async def on_response(response):
            """Count redirects that follow each other within one second."""
            if response.status in [301, 302, 303, 307, 308]:
                current_time = time.time()
                if current_time - self.last_redirect_time < 1:
                    self.redirect_count += 1
                    if self.redirect_count > 5:
                        print(f"⚠️ 检测到频繁重定向 ({self.redirect_count}次),可能是无限跳转", file=sys.stderr)
                else:
                    self.redirect_count = 0
                self.last_redirect_time = current_time

        self.page.on('response', on_response)
        # Restore localStorage/sessionStorage when a saved state was loaded.
        if login_state:
            await self._restore_storage(login_state)
        print("浏览器初始化成功(传统模式)", file=sys.stderr)
    except Exception as e:
        print(f"浏览器初始化失败: {str(e)}", file=sys.stderr)
        # Best-effort teardown of whatever this call launched, so a failed
        # initialisation does not leave a browser process or driver running.
        for resource, closer in ((started_browser, 'close'), (started_playwright, 'stop')):
            if resource is None:
                continue
            try:
                await getattr(resource, closer)()
            except Exception as cleanup_error:
                print(f"警告: 清理浏览器资源失败: {str(cleanup_error)}", file=sys.stderr)
        if started_playwright is not None:
            # The references now point at closed objects; drop them.
            self.page = None
            self.context = None
            self.browser = None
            self.playwright = None
        raise
async def _restore_storage(self, login_state: dict):
    """Restore saved localStorage and sessionStorage entries into the page.

    The page is first navigated to ``login_state['url']`` (default
    ``https://www.xiaohongshu.com``). If the navigation fails or ends on a
    URL containing ``login``, restoration is skipped. Errors are logged and
    never raised.

    Args:
        login_state: Saved state; the optional keys ``url``, ``localStorage``
            and ``sessionStorage`` are read.
    """
    try:
        # Web storage is per-origin, so a page has to be loaded first.
        target_url = login_state.get('url', 'https://www.xiaohongshu.com')
        print(f"正在访问 {target_url} 以注入storage...", file=sys.stderr)
        # Short timeout to avoid waiting long on a slow page.
        try:
            await self.page.goto(target_url, wait_until='domcontentloaded', timeout=15000)
            await asyncio.sleep(1)
            # Being bounced to the login page means the state cannot be used.
            current_url = self.page.url
            if 'login' in current_url.lower():
                print("⚠️ 检测到被重定向到登录页跳过storage恢复", file=sys.stderr)
                return
        except Exception as e:
            print(f"⚠️ 访问页面失败: {str(e)}跳过storage恢复", file=sys.stderr)
            return
        for storage_name in ('localStorage', 'sessionStorage'):
            entries = login_state.get(storage_name)
            if not entries:
                continue
            for key, value in entries.items():
                try:
                    # Key and value travel as arguments instead of being
                    # spliced into the script text, so quotes or backslashes
                    # in a key cannot break (or inject into) the JavaScript.
                    await self.page.evaluate(
                        '([storageName, key, value]) => window[storageName].setItem(key, value)',
                        [storage_name, key, value],
                    )
                except Exception as e:
                    print(f"⚠️ 设置{storage_name} {key} 失败: {str(e)}", file=sys.stderr)
            print(f"✅ 已恢复 {len(entries)} 个{storage_name}项", file=sys.stderr)
    except Exception as e:
        print(f"⚠️ 恢复storage失败: {str(e)}", file=sys.stderr)
async def init_browser_with_storage_state(self, storage_state_path: str, proxy: Optional[str] = None):
    """Start a dedicated browser whose context is loaded from a storage_state file.

    Launches Chromium, creates the context with Playwright's native
    ``storage_state`` option, injects a small anti-detection script, opens a
    page and installs a route handler that aborts detection requests.

    Args:
        storage_state_path: Path of the storage_state file to load.
        proxy: Optional proxy address.

    Raises:
        Exception: when the storage_state file does not exist, or when any
            initialisation step fails (logged and re-raised).
    """
    try:
        state_file_present = os.path.exists(storage_state_path)
        if not state_file_present:
            raise Exception(f"storage_state文件不存在: {storage_state_path}")
        print(f"✅ 使用 storage_state 初始化浏览器: {storage_state_path}", file=sys.stderr)
        # On Windows an explicit event loop policy is configured first.
        if sys.platform == 'win32':
            try:
                policy = asyncio.WindowsProactorEventLoopPolicy()
                asyncio.set_event_loop_policy(policy)
            except Exception as policy_error:
                print(f"警告: 设置事件循环策略失败: {str(policy_error)}", file=sys.stderr)
        self.playwright = await async_playwright().start()
        # Assemble the Chromium launch options.
        chromium_flags = [
            '--disable-blink-features=AutomationControlled',
            '--disable-infobars',
            '--no-first-run',
            '--no-default-browser-check',
        ]
        launch_options = dict(headless=self.headless, args=chromium_flags)
        if proxy:
            launch_options["proxy"] = {"server": proxy}
        chromium = self.playwright.chromium
        self.browser = await chromium.launch(**launch_options)
        # Context created directly from the saved storage state.
        self.context = await self.browser.new_context(storage_state=storage_state_path)
        print("✅ 已使用 storage_state 创建浏览器上下文", file=sys.stderr)
        # Anti-detection script; the text is runtime data, kept exactly as is.
        await self.context.add_init_script("""
// 移除webdriver标记
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// 阻止检测自动化调试端口
window.chrome = {
runtime: {}
};
""")
        print("✅ 已注入反检测脚本", file=sys.stderr)
        self.page = await self.context.new_page()
        # URL fragments that identify automation-detection probes.
        probe_markers = (
            '127.0.0.1:9222',
            '127.0.0.1:54345',
            'localhost:9222',
            'chrome-extension://invalid',
        )

        async def block_detection_requests(route, request):
            requested = request.url
            is_probe = any(marker in requested for marker in probe_markers)
            if is_probe:
                await route.abort()
                return
            await route.continue_()

        await self.page.route('**/*', block_detection_requests)
        print("✅ 已启用请求拦截,阻止检测自动化", file=sys.stderr)
        print("✅ 浏览器初始化成功storage_state模式", file=sys.stderr)
    except Exception as init_error:
        print(f"浏览器初始化失败: {str(init_error)}", file=sys.stderr)
        raise
async def close_browser(self):
    """Release the browser resources held by this service.

    Pool mode: the pooled browser is left running for reuse; only this
    service's ``browser`` / ``context`` / ``page`` references are cleared.

    Traditional mode: page, context, browser and the Playwright driver are
    closed in that order. Each resource is closed independently, so one
    failing ``close()`` no longer prevents the remaining resources from being
    released, and all references are cleared so later ``if not self.page``
    checks do not treat a closed page as usable. Errors are logged, never
    raised.
    """
    # Pool mode: keep the browser alive, just forget our references.
    if self.use_pool and self.browser_pool:
        print("[浏览器池模式] 保留浏览器实例供下次复用", file=sys.stderr)
        self.browser = None
        self.context = None
        self.page = None
        return
    # Traditional mode: shut everything down.
    print("[传统模式] 完全关闭浏览器", file=sys.stderr)
    teardown_plan = (
        (self.page, 'close'),
        (self.context, 'close'),
        (self.browser, 'close'),
        (self.playwright, 'stop'),
    )
    # Clear the references up front; whatever happens below, these objects
    # must not be reused.
    self.page = None
    self.context = None
    self.browser = None
    self.playwright = None
    all_closed = True
    for resource, method_name in teardown_plan:
        if not resource:
            continue
        try:
            await getattr(resource, method_name)()
        except Exception as e:
            all_closed = False
            print(f"关闭浏览器异常: {str(e)}", file=sys.stderr)
    if all_closed:
        print("浏览器已关闭", file=sys.stderr)
async def extract_verification_qrcode(self) -> Optional[str]:
    """Extract the QR code shown on the verification page as a data URI.

    Candidate selectors are tried in order (up to 3 seconds each). For the
    first matching element the ``src`` attribute is used: a ``data:image``
    value is returned as is, any other value is downloaded and base64
    encoded. If that yields nothing, a screenshot of the element is encoded
    instead.

    Returns:
        A ``data:...;base64,...`` string, or None when no page is available,
        no QR code could be obtained, or an error occurred.
    """
    import base64

    try:
        if not self.page:
            return None
        print("正在提取验证二维码...", file=sys.stderr)
        # Ordered from most specific to most generic.
        candidate_selectors = (
            '.qrcode-img',
            'img.qrcode-img',
            '.qrcode-container img',
            'img[src*="data:image"]',  # inline base64 image
            'img[src*="qrcode"]',
            'img[alt*="二维码"]',
            'img[alt*="qrcode"]',
        )
        for css in candidate_selectors:
            try:
                element = await self.page.wait_for_selector(css, timeout=3000)
                if not element:
                    continue
                print(f"✅ 找到二维码图片: {css}", file=sys.stderr)
                image_src = await element.get_attribute('src')
                if image_src:
                    # Already a data URI: nothing to convert.
                    if image_src.startswith('data:image'):
                        print("✅ 二维码已是base64格式直接返回", file=sys.stderr)
                        return image_src
                    # Otherwise treat it as a URL and fetch the bytes.
                    print(f"二维码是URL格式: {image_src[:100]}...", file=sys.stderr)
                    try:
                        async with aiohttp.ClientSession() as http:
                            async with http.get(image_src, timeout=aiohttp.ClientTimeout(total=10)) as reply:
                                if reply.status == 200:
                                    raw_bytes = await reply.read()
                                    encoded = base64.b64encode(raw_bytes).decode('utf-8')
                                    # The MIME type comes from the response header.
                                    mime = reply.headers.get('Content-Type', 'image/png')
                                    data_uri = f"data:{mime};base64,{encoded}"
                                    print("✅ 成功下载并转换为base64", file=sys.stderr)
                                    return data_uri
                    except Exception as download_error:
                        print(f"⚠️ 下载二维码图片失败: {str(download_error)}", file=sys.stderr)
                # Fallback: screenshot the element itself.
                print("尝试截取二维码区域...", file=sys.stderr)
                shot = await element.screenshot()
                if shot:
                    encoded_shot = base64.b64encode(shot).decode('utf-8')
                    data_uri = f"data:image/png;base64,{encoded_shot}"
                    print("✅ 成功截取二维码并转换为base64", file=sys.stderr)
                    return data_uri
                # An element was found but produced no image: stop searching.
                break
            except Exception as selector_error:
                print(f"尝试选择器 {css} 失败: {str(selector_error)}", file=sys.stderr)
                continue
        print("⚠️ 未找到二维码图片", file=sys.stderr)
        return None
    except Exception as failure:
        print(f"⚠️ 提取二维码失败: {str(failure)}", file=sys.stderr)
        return None
async def send_verification_code(self, phone: str, country_code: str = "+86", login_page: str = "creator") -> Dict[str, Any]:
    """Open the login page, enter the phone number and request an SMS code.

    Args:
        phone: Phone number to type into the login form.
        country_code: Country calling code. Accepted for interface
            compatibility; it is not used by this method.
        login_page: ``"home"`` for the Xiaohongshu home page login dialog;
            any other value (default ``"creator"``) uses the creator centre
            login page.

    Returns:
        A dict with ``success`` (bool). On success it carries ``message``.
        On failure it carries ``error``, or, when a verification page
        appeared, ``need_captcha`` / ``captcha_type`` / ``message`` and,
        when it could be extracted, ``qrcode_image``.
    """
    try:
        if not self.page:
            await self.init_browser()
        self.current_phone = phone
        # Pick the login URL for the requested entry point.
        if login_page == "home":
            login_url = 'https://www.xiaohongshu.com'
            page_name = "小红书首页"
        else:
            login_url = 'https://creator.xiaohongshu.com/login'
            page_name = "创作者中心"
        # When the pooled browser is preheated and already on the login
        # page, reuse it without navigating again.
        current_url = self.page.url if self.page else ""
        if self.use_pool and self.browser_pool and self.browser_pool.is_preheated:
            if login_url in current_url:
                print(f"✅ 浏览器已预热在{page_name}登录页,直接使用!", file=sys.stderr)
            else:
                # The page changed since preheating: load the login page again.
                print(f"[预热] 页面已变更 ({current_url}),重新访问{page_name}登录页...", file=sys.stderr)
                await self.page.goto(login_url, wait_until='networkidle', timeout=30000)
                await asyncio.sleep(0.5)
        else:
            # Not preheated, or not in pool mode: regular navigation.
            print(f"正在访问{page_name}登录页...", file=sys.stderr)
            try:
                await self.page.goto(login_url, wait_until='networkidle', timeout=30000)
                print("✅ 页面加载完成", file=sys.stderr)
            except Exception as e:
                print(f"页面加载超时,尝试继续: {str(e)}", file=sys.stderr)
                # Give key elements a moment to appear after a timeout.
                await asyncio.sleep(0.5)
        print(f"✅ 已进入{page_name}登录页面", file=sys.stderr)
        # Entry-point specific preparation (login dialog / agreement).
        if login_page == "home":
            # The home page needs the login dialog to be opened explicitly.
            print("处理小红书首页登录流程...", file=sys.stderr)
            try:
                # Click a visible login trigger to open the dialog.
                print("查找并点击登录按钮以弹出登录框...", file=sys.stderr)
                login_trigger_selectors = [
                    '.login',
                    'text="登录"',
                    'button:has-text("登录")',
                    'a:has-text("登录")',
                    '.header-login',
                    '[class*="login"]',
                ]
                login_triggered = False
                for selector in login_trigger_selectors:
                    try:
                        login_btn = await self.page.query_selector(selector)
                        if login_btn:
                            is_visible = await login_btn.is_visible()
                            if is_visible:
                                print(f"✅ 找到登录触发按钮: {selector}", file=sys.stderr)
                                await login_btn.click()
                                print("✅ 已点击登录按钮,等待登录框弹出...", file=sys.stderr)
                                await asyncio.sleep(0.5)
                                login_triggered = True
                                break
                    except Exception as e:
                        print(f"尝试选择器 {selector} 失败: {str(e)}", file=sys.stderr)
                        continue
                if not login_triggered:
                    print("⚠️ 未找到登录触发按钮,假设登录框已存在", file=sys.stderr)
                # The phone input appearing means the dialog is open.
                print("等待登录弹窗中的元素加载...", file=sys.stderr)
                try:
                    await self.page.wait_for_selector('input[placeholder="输入手机号"]', timeout=3000)
                    print("✅ 登录弹窗已弹出,手机号输入框就绪", file=sys.stderr)
                except Exception:
                    print("⚠️ 等待登录弹窗超时,尝试继续...", file=sys.stderr)
                # Switch to the "phone login" tab when several login methods exist.
                phone_login_tab_selectors = [
                    'text="手机号登录"',
                    'div:has-text("手机号登录")',
                    '.title:has-text("手机号登录")',
                ]
                phone_login_tab = None
                for selector in phone_login_tab_selectors:
                    try:
                        phone_login_tab = await self.page.query_selector(selector)
                        if phone_login_tab:
                            # Only click when the tab is not already active.
                            is_active = await phone_login_tab.evaluate('el => el.classList.contains("active") || el.parentElement.classList.contains("active")')
                            if not is_active:
                                print(f"✅ 找到手机号登录选项卡: {selector}", file=sys.stderr)
                                await phone_login_tab.click()
                                print("✅ 已点击手机号登录选项卡", file=sys.stderr)
                                await asyncio.sleep(0.3)
                            else:
                                print("✅ 手机号登录选项卡已选中", file=sys.stderr)
                            break
                    except Exception:
                        continue
                if not phone_login_tab:
                    print("✅ 未找到手机号登录选项卡,可能已经是手机号登录界面", file=sys.stderr)
                # Tick the agreement checkbox (home page only).
                agreement_selectors = [
                    '.agree-icon',
                    '.agreements .icon-wrapper',
                    'span.agree-icon',
                    '.icon-wrapper',
                ]
                agreement_checkbox = None
                for selector in agreement_selectors:
                    agreement_checkbox = await self.page.query_selector(selector)
                    if agreement_checkbox:
                        is_checked = await agreement_checkbox.evaluate('el => el.classList.contains("checked") || el.querySelector(".checked") !== null')
                        if not is_checked:
                            print(f"✅ 找到协议复选框: {selector}", file=sys.stderr)
                            await agreement_checkbox.click()
                            print("✅ 已勾选协议", file=sys.stderr)
                            await asyncio.sleep(0.2)
                        else:
                            print("✅ 协议已勾选", file=sys.stderr)
                        break
                if not agreement_checkbox:
                    print("⚠️ 未找到协议复选框,尝试继续...", file=sys.stderr)
            except Exception as e:
                print(f"处理首页登录流程失败: {str(e)}", file=sys.stderr)
        else:
            # Creator centre: dismiss the agreement popup when it is shown.
            try:
                agreement_btn = await self.page.query_selector('text="同意并继续"')
                if agreement_btn:
                    await agreement_btn.click()
                    print("✅ 已点击协议按钮", file=sys.stderr)
                    await asyncio.sleep(0.3)
            except Exception:
                pass  # no agreement popup (the normal case)
        # ---- Enter the phone number ----
        try:
            print("查找手机号输入框...", file=sys.stderr)
            if login_page == "home":
                phone_input_selectors = [
                    'input[placeholder="输入手机号"]',
                    'label.phone input',
                    'input[name="blur"]',
                    'input[type="text"]',
                ]
            else:
                phone_input_selectors = [
                    'input[placeholder="手机号"]',
                    'input.css-nt440g',
                    'input[placeholder*="手机号"]',
                    'input[type="tel"]',
                    'input[type="text"]',
                ]
            # Single pass, no retries.
            phone_input = None
            for selector in phone_input_selectors:
                phone_input = await self.page.query_selector(selector)
                if phone_input:
                    print(f"✅ 找到手机号输入框: {selector}", file=sys.stderr)
                    # Set the value through page-side JS. The phone number is
                    # passed as an argument rather than formatted into the
                    # script, so its content can never alter the JavaScript.
                    await self.page.evaluate(
                        '''
                        ([selector, phone]) => {
                            const input = document.querySelector(selector);
                            if (input) {
                                input.value = '';
                                input.focus();
                                input.value = phone;
                                input.dispatchEvent(new Event('input', { bubbles: true }));
                                input.dispatchEvent(new Event('change', { bubbles: true }));
                            }
                        }
                        ''',
                        [selector, phone],
                    )
                    print(f"✅ 已输入手机号: {phone}", file=sys.stderr)
                    await asyncio.sleep(0.3)
                    break
            if not phone_input:
                # Dump some page information to help debugging.
                print("⚠️ 未找到手机号输入框,打印页面信息...", file=sys.stderr)
                print(f"页面URL: {self.page.url}", file=sys.stderr)
                inputs = await self.page.query_selector_all('input')
                print(f"页面上找到 {len(inputs)} 个input元素", file=sys.stderr)
                for i, inp in enumerate(inputs[:5]):
                    try:
                        placeholder = await inp.get_attribute('placeholder')
                        input_type = await inp.get_attribute('type')
                        name = await inp.get_attribute('name')
                        class_name = await inp.get_attribute('class')
                        print(f"Input {i+1}: type={input_type}, placeholder={placeholder}, name={name}, class={class_name}", file=sys.stderr)
                    except Exception:
                        pass
                return {
                    "success": False,
                    "error": "未找到手机号输入框,请检查页面是否正确加载"
                }
        except Exception as e:
            await save_error_screenshot(
                self.page,
                "send_code_input_phone_failed",
                f"输入手机号失败: {str(e)}"
            )
            return {
                "success": False,
                "error": f"输入手机号失败: {str(e)}"
            }
        # ---- Click the "send code" button ----
        try:
            print("查找发送验证码按钮...", file=sys.stderr)
            # Let dynamic rendering triggered by the input settle.
            await asyncio.sleep(0.3)
            if login_page == "home":
                selectors = [
                    'span.code-button',
                    '.code-button',
                    'text="获取验证码"',
                    'span:has-text("获取验证码")',
                ]
            else:
                selectors = [
                    'div.css-uyobdj',
                    'text="发送验证码"',
                    'div:has-text("发送验证码")',
                    'text="重新发送"',
                    'text="获取验证码"',
                ]
            send_code_btn = None
            for selector in selectors:
                send_code_btn = await self.page.query_selector(selector)
                if send_code_btn:
                    print(f"✅ 找到发送验证码按钮: {selector}", file=sys.stderr)
                    break
            if send_code_btn:
                btn_text = await send_code_btn.inner_text()
                btn_text = btn_text.strip() if btn_text else ""
                print(f"📝 按钮文本: '{btn_text}'", file=sys.stderr)
                # Countdown state shows texts such as "59s", "58s" or
                # "60秒后重新获取". The middle test must look for the character
                # '秒': testing for an empty string is always true and would
                # classify every button text as a countdown.
                if btn_text and (btn_text.endswith('s') or '秒' in btn_text or btn_text.isdigit()):
                    print(f"⚠️ 按钮处于倒计时状态: {btn_text}", file=sys.stderr)
                    return {
                        "success": False,
                        "error": f"验证码发送过于频繁,请{btn_text}后再试"
                    }
                # Only proceed when the button shows one of the expected labels.
                expected_texts = ["获取验证码", "发送验证码", "重新发送"]
                if btn_text not in expected_texts:
                    print(f"⚠️ 按钮文本不符合预期: '{btn_text}', 期望: {expected_texts}", file=sys.stderr)
                    return {
                        "success": False,
                        "error": f"按钮状态异常(当前文本: {btn_text}),请刷新页面重试"
                    }
                # On the home page the button must carry the "active" class.
                if login_page == "home":
                    class_name = await send_code_btn.get_attribute('class') or ""
                    if 'active' not in class_name:
                        print(f"⚠️ 按钮未激活状态: class={class_name}", file=sys.stderr)
                        return {
                            "success": False,
                            "error": "按钮未激活,请检查手机号是否正确输入"
                        }
                    print(f"✅ 按钮已激活: class={class_name}", file=sys.stderr)
                await send_code_btn.click()
                print("✅ 已点击发送验证码", file=sys.stderr)
                # Wait for the page to react, then check for a verification page.
                await asyncio.sleep(1.5)
                current_url = self.page.url
                if 'captcha' in current_url or 'verify' in current_url:
                    print(f"⚠️ 检测到验证页面: {current_url}", file=sys.stderr)
                    qrcode_data = await self.extract_verification_qrcode()
                    if qrcode_data:
                        print("✅ 成功提取验证二维码", file=sys.stderr)
                        return {
                            "success": False,
                            "need_captcha": True,
                            "captcha_type": "qrcode",
                            "qrcode_image": qrcode_data,
                            "message": "需要扫码验证请使用小红书APP扫描二维码"
                        }
                    else:
                        return {
                            "success": False,
                            "need_captcha": True,
                            "captcha_type": "unknown",
                            "message": "出现验证码验证,请稍后重试"
                        }
                # No verification page: report success.
                print("\n✅ 验证码发送流程完成,请查看手机短信", file=sys.stderr)
                print("请在小程序中输入收到的验证码并点击登录\n", file=sys.stderr)
                print("[响应即将返回] success=True, message=验证码发送成功", file=sys.stderr)
                return {
                    "success": True,
                    "message": "验证码发送成功,请查看手机短信"
                }
            else:
                return {
                    "success": False,
                    "error": "未找到发送验证码按钮,请检查页面结构"
                }
        except Exception as e:
            await save_error_screenshot(
                self.page,
                "send_code_click_button_failed",
                f"点击发送验证码失败: {str(e)}"
            )
            return {
                "success": False,
                "error": f"点击发送验证码失败: {str(e)}"
            }
    except Exception as e:
        error_msg = str(e)
        print(f"\n❌ 发送验证码异常: {error_msg}", file=sys.stderr)
        print(f"当前页面URL: {self.page.url if self.page else 'N/A'}", file=sys.stderr)
        # Best-effort screenshot for debugging.
        if self.page:
            try:
                print("尝试截图保存错误状态...", file=sys.stderr)
                await self.page.screenshot(path='error_screenshot.png')
                print("✅ 错误状态已截图保存到 error_screenshot.png", file=sys.stderr)
            except Exception:
                pass
        return {
            "success": False,
            "error": error_msg
        }
async def login(self, phone: str, code: str, country_code: str = "+86", login_page: str = "creator") -> Dict[str, Any]:
    """Submit the SMS verification code and collect the resulting session.

    Must be called after ``send_verification_code`` on the same instance,
    because it continues on the already opened login page.

    Args:
        phone: Phone number; used to name the saved storage_state file.
        code: SMS verification code to type.
        country_code: Country calling code. Accepted for interface
            compatibility; it is not used by this method.
        login_page: ``"home"`` or ``"creator"`` (default); selects which set
            of element selectors is used.

    Returns:
        On success a dict with ``success=True`` plus ``user_info``,
        ``cookies`` (name -> value), ``cookies_full`` (Playwright format),
        ``login_state``, ``localStorage``, ``sessionStorage``, ``url`` and
        ``storage_state_path`` (None when the storage_state file could not
        be written). On failure ``{"success": False, "error": ...}``.
    """
    try:
        if not self.page:
            return {
                "success": False,
                "error": "页面未初始化,请先发送验证码"
            }
        # ---- Type the verification code ----
        try:
            print("查找验证码输入框...", file=sys.stderr)
            if login_page == "home":
                code_input_selectors = [
                    'input[placeholder="输入验证码"]',
                    'label.auth-code input',
                    'input[type="number"]',
                    'input[placeholder*="验证码"]',
                ]
            else:
                code_input_selectors = [
                    'input[placeholder="验证码"]',
                    'input.css-1ge5flv',
                    'input[placeholder*="验证码"]',
                    'input[type="text"]:not([placeholder*="手机"])',
                ]
            code_input = None
            for selector in code_input_selectors:
                try:
                    code_input = await self.page.wait_for_selector(selector, timeout=2000)
                    if code_input:
                        print(f"✅ 找到验证码输入框: {selector}", file=sys.stderr)
                        break
                except Exception:
                    continue
            if not code_input:
                return {
                    "success": False,
                    "error": "未找到验证码输入框"
                }
            await code_input.click()
            await asyncio.sleep(0.2)
            # Select any existing content so that typing replaces it.
            await code_input.press('Control+A')
            await code_input.type(code, delay=50)
            print(f"✅ 已输入验证码: {code}", file=sys.stderr)
            await asyncio.sleep(0.5)
        except Exception as e:
            return {
                "success": False,
                "error": f"输入验证码失败: {str(e)}"
            }
        # ---- Click the login button and wait for the redirect ----
        try:
            print("查找登录按钮...", file=sys.stderr)
            if login_page == "home":
                login_btn_selectors = [
                    'button.submit',
                    'button:has-text("登录")',
                    'text="登录"',
                    '.submit',
                ]
            else:
                login_btn_selectors = [
                    'button.beer-login-btn',
                    'button.css-y4h4ay',
                    'button:has-text("登 录")',  # label contains a space
                    'button:has-text("登录")',
                    'text="登 录"',
                    'text="登录"',
                    '.login-button',
                ]
            login_btn = None
            for selector in login_btn_selectors:
                try:
                    login_btn = await self.page.wait_for_selector(selector, timeout=2000)
                    if login_btn:
                        print(f"✅ 找到登录按钮: {selector}", file=sys.stderr)
                        break
                except Exception:
                    continue
            if not login_btn:
                # List the buttons on the page to help debugging.
                print("⚠️ 未找到登录按钮,打印所有按钮...", file=sys.stderr)
                buttons = await self.page.query_selector_all('button')
                print(f"页面上找到 {len(buttons)} 个按钮", file=sys.stderr)
                for i, btn in enumerate(buttons[:10]):
                    try:
                        text = await btn.inner_text()
                        classes = await btn.get_attribute('class')
                        print(f"按钮 {i+1}: 文本=[{text.strip()}] class=[{classes}]", file=sys.stderr)
                    except Exception:
                        pass
                return {
                    "success": False,
                    "error": "未找到登录按钮"
                }
            await login_btn.click()
            print("✅ 已点击登录按钮", file=sys.stderr)
            # Dismiss the agreement popup when one shows up after the click.
            await asyncio.sleep(0.5)
            try:
                popup_btn = await self.page.query_selector('text="同意并继续"')
                if popup_btn:
                    await popup_btn.click()
                    print("✅ 已点击登录后的协议弹窗", file=sys.stderr)
                    await asyncio.sleep(0.3)
            except Exception:
                pass  # no popup
            # Poll the URL (16 x 0.5s = up to 8 seconds) for the redirect
            # away from the login page.
            # NOTE(review): the home-page login URL used by
            # send_verification_code contains no "login", so for
            # login_page="home" this check may pass immediately - confirm.
            print("正在等待登录跳转...", file=sys.stderr)
            for i in range(16):
                await asyncio.sleep(0.5)
                current_url = self.page.url
                if 'login' not in current_url:
                    if 'creator.xiaohongshu.com' in current_url or 'www.xiaohongshu.com' in current_url:
                        print(f"✅ 登录成功,跳转到: {current_url}", file=sys.stderr)
                        await asyncio.sleep(0.5)
                        break
            else:
                # Loop finished without a redirect: still on the login page
                # usually means the code was wrong.
                if 'login' in self.page.url:
                    await save_error_screenshot(
                        self.page,
                        "login_failed_wrong_code",
                        "登录失败,验证码可能错误"
                    )
                    return {
                        "success": False,
                        "error": "登录失败,请检查验证码是否正确"
                    }
        except Exception as e:
            await save_error_screenshot(
                self.page,
                "login_click_button_failed",
                f"点击登录按钮失败: {str(e)}"
            )
            return {
                "success": False,
                "error": f"点击登录按钮失败: {str(e)}"
            }
        # ---- Confirm that the page stays logged in ----
        print("✅ 登录成功,正在确认页面稳定性...", file=sys.stderr)
        await asyncio.sleep(1)
        final_url = self.page.url
        if 'login' in final_url:
            print("⚠️ 检测到页面被重定向回登录页Cookie可能被小红书拒绝", file=sys.stderr)
            await save_error_screenshot(
                self.page,
                "login_redirect_back",
                "登录后被重定向回登录页"
            )
            return {
                "success": False,
                "error": "登录失败:小红书检测到异常登录行为,请稍后再试或使用手动登录"
            }
        print(f"✅ 页面稳定最终URL: {final_url}", file=sys.stderr)
        # ---- Collect cookies ----
        cookies = await self.context.cookies()
        # Two representations are returned:
        #   cookies_dict - name -> value mapping for the API response
        #   cookies      - full Playwright format for persistence / reuse
        cookies_dict = {cookie['name']: cookie['value'] for cookie in cookies}
        # NOTE(review): the lines below write complete cookie values (session
        # credentials) to stderr; consider masking them in production logs.
        print("\n========== Cookies 信息 ==========", file=sys.stderr)
        print(f"共获取到 {len(cookies)} 个Cookie", file=sys.stderr)
        print(f"\nCookie名称列表: {list(cookies_dict.keys())}", file=sys.stderr)
        print("\n完整Cookies内容键值对格式:", file=sys.stderr)
        for name, value in cookies_dict.items():
            print(f" {name}: {value}", file=sys.stderr)
        print("\n================================\n", file=sys.stderr)
        # ---- Collect user information from localStorage ----
        user_info = {}
        try:
            storage = await self.page.evaluate('() => JSON.stringify(localStorage)')
            storage_dict = json.loads(storage)
            useful_keys = ['b1', 'b1b1', 'p1']
            for key in useful_keys:
                if key in storage_dict:
                    try:
                        value = storage_dict[key]
                        if value and value.strip():
                            # Values that look like JSON are decoded.
                            user_info[key] = json.loads(value) if value.startswith('{') or value.startswith('[') else value
                    except Exception:
                        # Not decodable: keep the raw value.
                        user_info[key] = storage_dict[key]
            # The first entry whose key mentions "user" and holds valid JSON
            # is exposed as user_data.
            for key, value in storage_dict.items():
                if 'user' in key.lower():
                    try:
                        user_info['user_data'] = json.loads(value)
                        break
                    except Exception:
                        pass
            print(f"✅ 获取到用户信息: {list(user_info.keys())}", file=sys.stderr)
        except Exception as e:
            print(f"⚠️ 获取用户信息失败: {str(e)}", file=sys.stderr)
        current_url = self.page.url
        print(f"当前URL: {current_url}", file=sys.stderr)
        # ---- Full localStorage / sessionStorage snapshots ----
        localStorage_data = {}
        try:
            storage = await self.page.evaluate('() => JSON.stringify(localStorage)')
            localStorage_data = json.loads(storage)
            print(f"✅ 获取到 {len(localStorage_data)} 个localStorage项", file=sys.stderr)
        except Exception as e:
            print(f"⚠️ 获取localStorage失败: {str(e)}", file=sys.stderr)
        sessionStorage_data = {}
        try:
            session_storage = await self.page.evaluate('() => JSON.stringify(sessionStorage)')
            sessionStorage_data = json.loads(session_storage)
            print(f"✅ 获取到 {len(sessionStorage_data)} 个sessionStorage项", file=sys.stderr)
        except Exception as e:
            print(f"⚠️ 获取sessionStorage失败: {str(e)}", file=sys.stderr)

        def _context_option(option_name):
            # Reads a context creation option through Playwright's private
            # implementation object; returns None when it is not reachable.
            try:
                return self.context._impl_obj._options.get(option_name)
            except AttributeError:
                return None

        # Built outside the best-effort persistence below so that the return
        # statement can always reference it, even when saving files fails.
        login_state = {
            "cookies": cookies,  # full Playwright format
            "localStorage": localStorage_data,
            "sessionStorage": sessionStorage_data,
            "url": current_url,
            "timestamp": time.time(),
            "user_agent": _context_option('userAgent'),
            "viewport": _context_option('viewport'),
        }
        storage_state_path = None
        # ---- Persist the state (best effort) ----
        try:
            with open('login_state.json', 'w', encoding='utf-8') as f:
                json.dump(login_state, f, ensure_ascii=False, indent=2)
            print("✅ 已保存完整登录状态到 login_state.json 文件", file=sys.stderr)
            print(f" 包含: {len(cookies)} 个Cookies, {len(localStorage_data)} 个localStorage, {len(sessionStorage_data)} 个sessionStorage", file=sys.stderr)
            # Separate cookies.json kept for older consumers.
            with open('cookies.json', 'w', encoding='utf-8') as f:
                json.dump(cookies, f, ensure_ascii=False, indent=2)
            print("✅ 已保存 Cookies 到 cookies.json 文件(兼容旧版)", file=sys.stderr)
        except Exception as e:
            print(f"保存登录状态文件失败: {str(e)}", file=sys.stderr)
        try:
            storage_state_dir = 'storage_states'
            os.makedirs(storage_state_dir, exist_ok=True)
            # The phone number becomes part of a file name: replace anything
            # that is not alphanumeric or one of "+-_" so it cannot escape
            # the directory.
            safe_phone = ''.join(ch if (ch.isalnum() or ch in '+-_') else '_' for ch in str(phone))
            candidate_path = os.path.join(storage_state_dir, f"xhs_{safe_phone}.json")
            await self.context.storage_state(path=candidate_path)
            storage_state_path = candidate_path
            print(f"✅ 已保存 Playwright Storage State 到: {storage_state_path}", file=sys.stderr)
            print(" 此文件包含完整的浏览器上下文状态,可用于后续免登录恢复", file=sys.stderr)
        except Exception as e:
            print(f"保存登录状态文件失败: {str(e)}", file=sys.stderr)
        return {
            "success": True,
            "user_info": user_info,
            "cookies": cookies_dict,  # name -> value mapping
            "cookies_full": cookies,  # full Playwright format
            "login_state": login_state,
            "localStorage": localStorage_data,
            "sessionStorage": sessionStorage_data,
            "url": current_url,
            "storage_state_path": storage_state_path
        }
    except Exception as e:
        print(f"登录异常: {str(e)}", file=sys.stderr)
        await save_error_screenshot(
            self.page,
            "login_exception",
            f"登录异常: {str(e)}"
        )
        return {
            "success": False,
            "error": str(e)
        }
async def get_user_profile(self) -> Dict[str, Any]:
    """
    Navigate to the user's profile page and return a profile payload.

    Intended to be called after a successful login. The scraping step is
    still a placeholder, so the returned ``profile`` dict is always empty.

    Returns:
        ``{"success": True, "profile": {}}`` once the page has been visited,
        or ``{"success": False, "error": <message>}`` when no page has been
        initialised or when navigation raises.
    """
    profile_url = 'https://www.xiaohongshu.com/user/profile'
    try:
        page = self.page
        if page:
            # Visit the profile page and give it a moment to settle.
            await page.goto(profile_url, wait_until='networkidle')
            await asyncio.sleep(2)
            # Placeholder: nickname / avatar extraction would go here.
            outcome = {
                "success": True,
                "profile": {}
            }
        else:
            outcome = {
                "success": False,
                "error": "页面未初始化"
            }
    except Exception as exc:
        outcome = {
            "success": False,
            "error": str(exc)
        }
    return outcome
async def verify_login_status(self, url: Optional[str] = None) -> Dict[str, Any]:
    """
    Verify whether the current browser session is logged in.

    Navigates to a Xiaohongshu page and decides from the resulting URL
    whether the session cookies are still valid.

    Args:
        url: Optional page to visit for the check. Defaults to the creator
            platform home page.

    Returns:
        A dict that always carries ``success`` and ``logged_in``:

        * page missing / navigation failed / unexpected exception:
          ``success=False`` plus ``error``.
        * not logged in: ``success=True, logged_in=False,
          cookie_expired=True`` plus ``message`` and ``url``
          (and ``infinite_redirect=True`` when more than 5 redirects
          were counted).
        * logged in: ``success=True, logged_in=True`` plus ``cookies``
          (name -> value), ``cookies_full`` (Playwright format),
          ``user_info`` and ``url``.
    """
    try:
        if not self.page:
            return {
                "success": False,
                "logged_in": False,
                "error": "页面未初始化"
            }

        print("正在验证登录状态...", file=sys.stderr)

        # Decide which page to probe.
        target_url = url or 'https://creator.xiaohongshu.com/'
        page_name = "创作者平台" if "creator" in target_url else "小红书首页"
        print(f"访问{page_name}...", file=sys.stderr)

        # Reset the redirect counters before navigating.
        # NOTE(review): presumably incremented by a navigation listener
        # registered elsewhere in this class - confirm.
        self.redirect_count = 0
        self.last_redirect_time = 0

        try:
            await self.page.goto(target_url, wait_until='domcontentloaded', timeout=60000)
            await asyncio.sleep(2)  # let the page settle

            # A burst of redirects is treated as an invalidated cookie.
            if self.redirect_count > 5:
                print(f"❌ 检测到无限跳转 ({self.redirect_count}次重定向)Cookie已失效", file=sys.stderr)
                return {
                    "success": True,
                    "logged_in": False,
                    "cookie_expired": True,
                    "infinite_redirect": True,
                    "message": "Cookie已失效小红书检测到异常登录行为",
                    "url": self.page.url
                }
            print(f"✅ 已访问{page_name}当前URL: {self.page.url}", file=sys.stderr)
        except Exception as e:
            print(f"访问{page_name}失败: {str(e)}", file=sys.stderr)
            return {
                "success": False,
                "logged_in": False,
                "error": f"访问{page_name}失败: {str(e)}"
            }

        current_url = self.page.url

        # Redirected to the login page -> not logged in.
        if 'login' in current_url.lower():
            print("❌ 未登录状态(被重定向到登录页)", file=sys.stderr)
            return {
                "success": True,
                "logged_in": False,
                "cookie_expired": True,  # marks the cookie as expired
                "message": "Cookie已失效或未登录",
                "url": current_url
            }

        # Ended up outside xiaohongshu.com -> treat as not logged in.
        # (The 'login' case has already returned above.)
        if 'xiaohongshu.com' not in current_url:
            print("❌ 未登录状态URL异常", file=sys.stderr)
            return {
                "success": True,
                "logged_in": False,
                "cookie_expired": True,  # marks the cookie as expired
                "message": "Cookie已失效或未登录",
                "url": current_url
            }

        print(f"✅ 已登录状态(成功访问{page_name}", file=sys.stderr)

        # Current cookies, in Playwright format and as a name -> value map.
        cookies = await self.context.cookies()
        cookies_dict = {cookie['name']: cookie['value'] for cookie in cookies}

        # Best effort: take the first localStorage entry whose key mentions
        # "user" and whose value parses as JSON.
        user_info = {}
        try:
            storage = await self.page.evaluate('() => JSON.stringify(localStorage)')
            storage_dict = json.loads(storage)
            for key, value in storage_dict.items():
                if 'user' not in key.lower():
                    continue
                try:
                    user_info['user_data'] = json.loads(value)
                except (ValueError, TypeError):
                    # Not JSON; try the next candidate. Deliberately narrow so
                    # task cancellation is never swallowed here.
                    continue
                break
        except Exception as e:
            print(f"获取用户信息失败: {str(e)}", file=sys.stderr)

        return {
            "success": True,
            "logged_in": True,
            "message": "Cookie有效已登录",
            "cookies": cookies_dict,  # name -> value map (front-end display)
            "cookies_full": cookies,  # full Playwright format (storage / scripts)
            "user_info": user_info,
            "url": current_url
        }
    except Exception as e:
        print(f"验证登录状态异常: {str(e)}", file=sys.stderr)
        return {
            "success": False,
            "logged_in": False,
            "error": str(e)
        }
def _calculate_title_width(self, title: str) -> int:
    """
    Return the display width of ``title``.

    Characters whose East Asian width class is Fullwidth ("F") or
    Wide ("W") count as 2; every other character counts as 1.
    """
    double_width = ("F", "W")
    return sum(
        2 if unicodedata.east_asian_width(char) in double_width else 1
        for char in title
    )
async def publish_note(self, title: str, content: str, images: list = None, topics: list = None, cookies: list = None, proxy: str = None, user_agent: str = None) -> Dict[str, Any]:
    """
    Publish an image note on the creator platform (supports cookie injection).

    Flow: validate input -> prepare a page (optionally in a dedicated
    browser context with the given cookies) -> open the publish page ->
    upload images -> fill title/body/topics -> click publish -> confirm.

    Args:
        title: Note title; display width (see ``_calculate_title_width``)
            must not exceed 40.
        content: Note body; at most 1000 characters.
        images: 1-18 image references. Each may be an http(s) URL, a local
            path, or a relative path containing "/" which is treated as an
            OSS object key and prefixed with the OSS base URL.
        topics: Optional topic tags.
        cookies: Optional cookies in Playwright format used to inject the
            login state.
        proxy: Optional proxy address, e.g. ``http://ip:port``.
            NOTE(review): not applied when a dedicated context is created
            in pool mode - confirm whether that is intended.
        user_agent: Optional custom User-Agent.

    Returns:
        On success ``{"success": True, "message": ..., "url": ...}`` (plus
        ``data.note_id`` / ``data.note_url`` when the publish API response
        was captured). On failure ``{"success": False, "error": ...}`` with
        an optional ``error_type`` of ``validation_error``,
        ``cookie_expired`` or ``community_violation``.

    Notes:
        * Temporary copies of downloaded images are always removed before
          returning, including on failure.
        * In pool mode with cookies, the dedicated page and context are
          closed on every success path. Failure paths leave them open
          (unchanged behaviour).
    """
    # Temp copies of remote images created by this call; cleaned in `finally`.
    downloaded_files = []
    try:
        # ========== Content validation ==========
        print("\n========== 开始验证发布内容 ==========", file=sys.stderr)

        # 1. Title must be non-empty and fit the platform width limit.
        if not title or len(title.strip()) == 0:
            return {
                "success": False,
                "error": "标题不能为空",
                "error_type": "validation_error"
            }
        title = title.strip()
        title_width = self._calculate_title_width(title)
        if title_width > 40:
            return {
                "success": False,
                "error": f"标题超出限制:当前宽度 {title_width},平台限制 40",
                "error_type": "validation_error"
            }
        print(f"✅ 标题验证通过: 宽度 {title_width}/40", file=sys.stderr)

        # 2. Body must be non-empty and at most 1000 characters.
        if not content or len(content.strip()) == 0:
            return {
                "success": False,
                "error": "内容不能为空",
                "error_type": "validation_error"
            }
        content_length = len(content)
        if content_length > 1000:
            return {
                "success": False,
                "error": f"内容超出限制:当前 {content_length} 个字,最多 1000 个字",
                "error_type": "validation_error"
            }
        print(f"✅ 内容验证通过: {content_length}/1000 个字", file=sys.stderr)

        # 3. Between 1 and 18 images are required.
        images_count = len(images) if images else 0
        if images_count == 0:
            return {
                "success": False,
                "error": "至少需要 1 张图片",
                "error_type": "validation_error"
            }
        if images_count > 18:
            return {
                "success": False,
                "error": f"图片超出限制:当前 {images_count} 张,最多 18 张",
                "error_type": "validation_error"
            }
        print(f"✅ 图片数量验证通过: {images_count}/18 张", file=sys.stderr)
        print("✅ 所有验证通过,开始发布\n", file=sys.stderr)

        # ========== Publish flow ==========
        setup_error = await self._publish_prepare_page(cookies, proxy, user_agent)
        if setup_error:
            return setup_error

        print("\n========== 开始发布笔记 ==========", file=sys.stderr)
        print(f"标题: {title}", file=sys.stderr)
        print(f"内容: {content[:50]}..." if len(content) > 50 else f"内容: {content}", file=sys.stderr)
        print(f"图片数量: {len(images) if images else 0}", file=sys.stderr)
        print(f"话题: {topics if topics else []}", file=sys.stderr)

        # Go straight to the image-note publish URL (skips clicking the tab).
        print("访问创作者平台图文发布页面...", file=sys.stderr)
        publish_url = 'https://creator.xiaohongshu.com/publish/publish?source=official&from=menu&target=image'
        nav_error = await self._publish_open_page(publish_url)
        if nav_error:
            return nav_error

        # Upload images. Unexpected errors are logged and the flow goes on.
        if images:
            try:
                upload_error = await self._publish_upload_images(images, downloaded_files)
            except Exception as e:
                print(f"上传图片失败: {str(e)}", file=sys.stderr)
                upload_error = None
            if upload_error:
                return upload_error

        # Fill in title, body and topics.
        try:
            text_error = await self._publish_fill_text(title, content, topics)
        except Exception as e:
            return {
                "success": False,
                "error": f"输入内容失败: {str(e)}"
            }
        if text_error:
            return text_error

        # Simulate a little human-like scrolling (best effort).
        try:
            for _ in range(3):
                await self.page.mouse.wheel(0, random.randint(200, 500))
                await asyncio.sleep(random.uniform(0.3, 0.8))
        except Exception:
            pass

        # Click publish and confirm the outcome.
        try:
            return await self._publish_submit(cookies)
        except Exception as e:
            return {
                "success": False,
                "error": f"点击发布按钮失败: {str(e)}"
            }
    except Exception as e:
        print(f"发布笔记异常: {str(e)}", file=sys.stderr)
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        # Never leave downloaded temp images behind, whatever the outcome.
        if downloaded_files:
            self._publish_remove_temp_files(downloaded_files)

def _publish_remove_temp_files(self, paths: list, verbose: bool = False) -> None:
    """Delete the temp files in ``paths`` (best effort) and empty the list."""
    for temp_file in paths:
        try:
            os.remove(temp_file)
            if verbose:
                print(f"✅ 已清理临时文件: {temp_file}", file=sys.stderr)
        except OSError:
            # Already gone or not removable; nothing useful to do.
            pass
    paths.clear()

async def _publish_find_element(self, selectors: list, label: str, timeout: int, require_visible: bool = False, settle: float = 0.0):
    """
    Return the first element matching any of ``selectors``, or ``None``.

    A fast ``query_selector`` pass runs first; if nothing matches, each
    selector is retried with ``wait_for_selector`` using ``timeout`` ms.
    """
    # Fast pass: no waiting.
    for selector in selectors:
        try:
            handle = await self.page.query_selector(selector)
            if handle and (not require_visible or await handle.is_visible()):
                if settle:
                    await asyncio.sleep(settle)
                print(f"找到{label}: {selector}", file=sys.stderr)
                return handle
        except Exception:
            continue

    # Slow pass: wait for each selector in turn.
    wait_kwargs = {"timeout": timeout}
    if require_visible:
        wait_kwargs["state"] = 'visible'
    for selector in selectors:
        try:
            handle = await self.page.wait_for_selector(selector, **wait_kwargs)
            if handle:
                if settle:
                    await asyncio.sleep(settle)
                print(f"找到{label}: {selector}", file=sys.stderr)
                return handle
        except Exception:
            continue
    return None

async def _publish_prepare_page(self, cookies, proxy, user_agent) -> Optional[Dict[str, Any]]:
    """Make sure ``self.page`` is ready; return an error dict if that is impossible."""
    if cookies:
        print("✅ 检测到Cookie将创建独立的浏览器环境", file=sys.stderr)

        # Debug: log the shape of the cookies argument.
        # NOTE(review): this prints the first cookie's raw value to stderr.
        print(f" Cookie格式检查: 类型={type(cookies).__name__}, 数量={len(cookies)}", file=sys.stderr)
        if isinstance(cookies, list):
            first_cookie = cookies[0]
            print(f" 第一个cookie字段: {list(first_cookie.keys()) if isinstance(first_cookie, dict) else 'not dict'}", file=sys.stderr)
            if isinstance(first_cookie, dict):
                for key in ['name', 'value', 'expires', 'sameSite']:
                    if key in first_cookie:
                        val = first_cookie[key]
                        print(f" {key}: type={type(val).__name__}, value={val}", file=sys.stderr)

        if self.use_pool and self.browser_pool:
            # Pool mode: reuse the pooled browser but publish from a brand
            # new context, so the pre-warmed context is not touched.
            print("[浏览器池模式] 复用主浏览器实例", file=sys.stderr)
            self.browser, _, _ = await self.browser_pool.get_browser()
            print("[浏览器池] 复用主浏览器实例", file=sys.stderr)
            context_kwargs = {
                "viewport": {'width': 1280, 'height': 720},
                "user_agent": user_agent or 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            }
            self.context = await self.browser.new_context(**context_kwargs)
            print("[浏览器池模式] 为发布创建独立的context避免污染预热环境", file=sys.stderr)
            await self.context.add_cookies(cookies)
            print(f"✅ 已注入 {len(cookies)} 个Cookie", file=sys.stderr)
            print("[浏览器池模式] 创建发布专用页面", file=sys.stderr)
            self.page = await self.context.new_page()
            print("✅ 发布页面创建成功\n", file=sys.stderr)
        elif not self.page:
            # Non-pool mode without a page: start a browser with the cookies.
            await self.init_browser(cookies, proxy=proxy, user_agent=user_agent)
        else:
            # Non-pool mode with an existing page: just add the cookies.
            await self.context.add_cookies(cookies)
            print(f"✅ 已注入 {len(cookies)} 个Cookie", file=sys.stderr)

    # Still no page: fall back to the pool if one is configured.
    if not self.page:
        if self.use_pool and self.browser_pool:
            print("[浏览器池模式] 获取浏览器实例", file=sys.stderr)
            self.browser, self.context, self.page = await self.browser_pool.get_browser(proxy=proxy, user_agent=user_agent)
        else:
            return {
                "success": False,
                "error": "页面未初始化请先登录或提供Cookie"
            }
    return None

async def _publish_open_page(self, publish_url: str) -> Optional[Dict[str, Any]]:
    """Open the publish page (two attempts); return an error dict on failure."""
    for attempt in range(2):
        last_attempt = attempt >= 1
        try:
            if attempt > 0:
                print(f"{attempt + 1} 次尝试加载页面...", file=sys.stderr)
            else:
                print("开始加载页面...", file=sys.stderr)

            # 'load' instead of 'networkidle': faster and less flaky.
            await self.page.goto(
                publish_url,
                wait_until='load',
                timeout=40000
            )
            await asyncio.sleep(2)  # let the page settle

            current_url = self.page.url
            if current_url != publish_url:
                print(f"⚠️ 检测到页面跳转: {current_url}", file=sys.stderr)
                print(f"⚠️ 期望页面: {publish_url}", file=sys.stderr)

                # Give the site 5s to bounce back to the publish page
                # before concluding anything.
                if 'redirectReason' in current_url or 'login' in current_url:
                    print("🔄 检测到重定向参数等待5秒让小红书自动重定向...", file=sys.stderr)
                    await asyncio.sleep(5)
                    final_url = self.page.url
                    print(f"🔍 最终页面URL: {final_url}", file=sys.stderr)
                    if 'publish/publish' in final_url:
                        print("✅ 自动重定向成功,已到达发布页", file=sys.stderr)
                        current_url = final_url
                    elif 'login' in final_url and 'publish' not in final_url:
                        # Really stuck on the login page: cookie is invalid.
                        return {
                            "success": False,
                            "error": "Cookie可能已失效,页面跳转到登录页",
                            "error_type": "cookie_expired"
                        }

                # Final check: anything containing 'publish/publish' counts.
                if 'publish/publish' not in current_url:
                    print(f"❌ 页面最终未到达发布页: {current_url}", file=sys.stderr)
                    if last_attempt:
                        return {
                            "success": False,
                            "error": f"页面跳转到意外地址: {current_url}"
                        }
                    print("等待3秒后重试...", file=sys.stderr)
                    await asyncio.sleep(3)
                    continue

            # The presence of a file input is the "page is ready" signal.
            if await self.page.query_selector('input[type="file"]'):
                print(f"✅ 已进入图文发布页面: {current_url}", file=sys.stderr)
                return None

            print("⚠️ 页面加载完成但未找到上传控件,可能需要重试", file=sys.stderr)
            if last_attempt:
                # Out of retries: carry on and let the upload step decide.
                print("⚠️ 未找到上传控件,但继续执行", file=sys.stderr)
                return None
            await asyncio.sleep(2)
        except Exception as e:
            error_msg = f"访问发布页面失败(尝试{attempt + 1}/2): {str(e)}"
            print(f"{error_msg}", file=sys.stderr)

            # Save an error screenshot (best effort).
            try:
                timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
                error_type = type(e).__name__
                screenshot_path = f"error_screenshots/{timestamp}_{error_type}.png"
                os.makedirs('error_screenshots', exist_ok=True)
                await self.page.screenshot(path=screenshot_path, full_page=True)
                print(f"📸 已保存错误截图: {screenshot_path}", file=sys.stderr)
            except Exception as screenshot_error:
                print(f"⚠️ 保存截图失败: {screenshot_error}", file=sys.stderr)

            if last_attempt:
                import traceback
                traceback.print_exc()
                return {
                    "success": False,
                    "error": f"访问发布页面失败(已重试2次): {str(e)}"
                }
            print("等待3秒后重试...", file=sys.stderr)
            await asyncio.sleep(3)

    # Defensive: every path above returns or retries.
    return {
        "success": False,
        "error": "页面加载失败"
    }

async def _publish_poll_uploads(self, images_count: int):
    """
    Poll for uploaded thumbnails for up to 30s (60 x 500ms).

    Returns:
        ``(upload_success, uploaded_count, page_destroyed)``.
    """
    upload_success = False
    uploaded_count = 0
    page_destroyed = False
    thumbnail_selectors = (
        'img[src*="blob:"]',
        '[class*="image"][class*="item"] img',
        '.image-item img, .upload-item img, .pic-item img',
        'img[src*="data:image"]',
    )
    for i in range(60):
        await asyncio.sleep(0.5)
        try:
            if self.page.is_closed():
                print("检测到页面已关闭", file=sys.stderr)
                page_destroyed = True
                break

            # Try each thumbnail selector until one yields elements.
            uploaded_images = []
            for thumb_selector in thumbnail_selectors:
                uploaded_images = await self.page.query_selector_all(thumb_selector)
                if uploaded_images:
                    break
            uploaded_count = len(uploaded_images)

            if uploaded_count > 0 and uploaded_count >= images_count:
                print(f"✅ 所有图片上传完成!共 {uploaded_count}", file=sys.stderr)
                upload_success = True
                break

            # Report progress once per second to avoid flooding the log.
            if i % 2 == 0:
                print(f"等待图片上传... {uploaded_count}/{images_count} ({(i+1)*0.5:.1f}/30秒)", file=sys.stderr)
        except Exception as e:
            error_msg = str(e)
            # Navigation / destroyed execution context means the page moved.
            if 'context was destroyed' in error_msg.lower() or 'navigation' in error_msg.lower():
                print(f"检测到页面跳转: {error_msg}", file=sys.stderr)
                page_destroyed = True
                break
            print(f"检测上传状态异常: {e}", file=sys.stderr)
            # Still failing after ~5s: back off a little.
            if i > 10:
                await asyncio.sleep(1)
    return upload_success, uploaded_count, page_destroyed

async def _publish_settle_upload(self, images_count: int, downloaded_files: list, check_redirect_url: bool) -> Optional[Dict[str, Any]]:
    """
    Wait for the upload to finish and handle a page jump during it.

    With ``check_redirect_url`` the post-jump URL is inspected and an error
    dict is returned when the page left the publish flow; otherwise the
    thumbnails are simply re-counted after a pause.
    """
    upload_success, uploaded_count, page_destroyed = await self._publish_poll_uploads(images_count)

    if page_destroyed:
        if check_redirect_url:
            print("⚠️ 页面发生跳转检查当前URL...", file=sys.stderr)
            await asyncio.sleep(3)
            current_url = self.page.url
            print(f"跳转后的URL: {current_url}", file=sys.stderr)
            if 'login' in current_url:
                # Landed on the login page: the cookie is no longer valid.
                return {
                    "success": False,
                    "error": "Cookie已失效上传过程中跳转到登录页",
                    "error_type": "cookie_expired"
                }
            if 'publish/publish' not in current_url:
                return {
                    "success": False,
                    "error": f"上传过程中页面跳转到: {current_url}"
                }
            print("✅ 仍在发布页,重新检查图片...", file=sys.stderr)
        else:
            print("⚠️ 页面发生跳转,等待页面稳定...", file=sys.stderr)
            await asyncio.sleep(3)

        # Re-count thumbnails now that the page has settled.
        try:
            uploaded_images = await self.page.query_selector_all('img[src*="blob:"], img[src*="data:image"], [class*="image"][class*="item"] img')
            uploaded_count = len(uploaded_images)
            if uploaded_count >= images_count:
                print(f"✅ 页面稳定后确认图片已上传!共 {uploaded_count}", file=sys.stderr)
                upload_success = True
            else:
                print(f"⚠️ 页面稳定后检测到 {uploaded_count}/{images_count} 张图片", file=sys.stderr)
        except Exception as e:
            print(f"页面稳定后检测失败: {e}", file=sys.stderr)

    if upload_success:
        print(f"✅ 图片上传成功!共 {uploaded_count}", file=sys.stderr)
        await asyncio.sleep(0.5)
        self._publish_remove_temp_files(downloaded_files, verbose=True)
    else:
        print(f"⚠️ 仅检测到 {uploaded_count}/{images_count} 张图片,但继续执行...", file=sys.stderr)
    return None

async def _publish_upload_images(self, images: list, downloaded_files: list) -> Optional[Dict[str, Any]]:
    """
    Resolve ``images`` to local files and feed them to the page's file input.

    Paths of downloaded temp copies are appended to ``downloaded_files`` so
    the caller can clean them up. Returns an error dict when publishing
    must stop, otherwise ``None``.
    """
    print(f"开始上传 {len(images)} 张图片...", file=sys.stderr)

    # Pre-process: remote images are downloaded to local temp files.
    local_images = []
    # OSS base URL used to complete bare object keys.
    oss_prefix = "https://bxmkb-beijing.oss-cn-beijing.aliyuncs.com/Images/"
    for img_path in images:
        original_path = img_path
        is_url = img_path.startswith(('http://', 'https://'))
        if not is_url and not os.path.isabs(img_path):
            # Relative path with a "/" (e.g. 20251221/xxx.png): treat it
            # as an OSS object key and complete it.
            if '/' in img_path and not img_path.startswith('/'):
                img_path = oss_prefix + img_path
                print(f" 检测到相对路径补充OSS前缀: {original_path} -> {img_path}", file=sys.stderr)

        if img_path.startswith(('http://', 'https://')):
            try:
                local_path = await download_image(img_path)
                local_images.append(local_path)
                downloaded_files.append(local_path)  # remembered for cleanup
            except Exception as e:
                print(f"⚠️ 下载图片 {img_path} 失败: {str(e)}", file=sys.stderr)
                return {
                    "success": False,
                    "error": f"下载图片失败: {str(e)}"
                }
        else:
            local_images.append(img_path)

    print(f"✅ 图片预处理完成,共 {len(local_images)} 张本地图片", file=sys.stderr)
    await asyncio.sleep(0.5)

    print("查找图片上传控件...", file=sys.stderr)
    upload_selectors = [
        'input[type="file"][accept*="image"]',
        'input[type="file"]',
        'input[accept*="image"]',
        '.upload-input',
        '[class*="upload"] input[type="file"]',
    ]
    file_input = await self._publish_find_element(upload_selectors, "文件上传控件", timeout=3000)
    images_count = len(local_images)

    if file_input:
        print(f"正在上传 {images_count} 张本地图片: {local_images}", file=sys.stderr)
        # Log whether each file exists (diagnostic only).
        for img_path in local_images:
            if not os.path.exists(img_path):
                print(f"⚠️ 警告: 图片文件不存在: {img_path}", file=sys.stderr)
            else:
                file_size = os.path.getsize(img_path) / 1024
                print(f" ✅ 文件存在: {img_path} ({file_size:.1f}KB)", file=sys.stderr)
        await file_input.set_input_files(local_images)
        print(f"已设置文件路径,等待上传...", file=sys.stderr)
        await asyncio.sleep(1)  # let the page pick the files up
        return await self._publish_settle_upload(images_count, downloaded_files, check_redirect_url=True)

    print("未找到隐藏的file input尝试查找可点击的上传区域...", file=sys.stderr)

    # Debug: list elements whose class or id mentions "upload".
    try:
        all_elements = await self.page.query_selector_all('[class*="upload"], [id*="upload"]')
        print(f"\u627e{len(all_elements)} 个包含upload的元素", file=sys.stderr)
        for i, el in enumerate(all_elements[:10]):  # first 10 only
            try:
                tag_name = await el.evaluate('el => el.tagName')
                class_name = await el.evaluate('el => el.className')
                print(f" [{i+1}] {tag_name} class='{class_name}'", file=sys.stderr)
            except Exception:
                pass
    except Exception:
        pass

    # Click an upload area or button, then look for the file input again.
    upload_area_selectors = [
        '[class*="upload"][class*="box"]',
        '[class*="upload"][class*="area"]',
        '[class*="upload"][class*="wrapper"]',
        '.upload-zone',
        'div:has-text("上传图片")',
        'div:has-text("点击上传")',
        'button:has-text("上传图片")',
    ]
    clicked = False
    for selector in upload_area_selectors:
        try:
            area = await self.page.wait_for_selector(selector, timeout=2000)
            if not area:
                continue
            print(f"找到上传区域: {selector}", file=sys.stderr)
            await area.click()
            await asyncio.sleep(0.5)
            file_input = await self.page.wait_for_selector('input[type="file"]', timeout=2000)
            if file_input:
                print(f"正在上传 {images_count} 张本地图片: {local_images}", file=sys.stderr)
                await file_input.set_input_files(local_images)
                print(f"已设置文件路径,等待上传...", file=sys.stderr)
                await asyncio.sleep(1)
                await self._publish_settle_upload(images_count, downloaded_files, check_redirect_url=False)
                clicked = True
                break
        except Exception:
            continue

    if not clicked:
        print("⚠️ 未找到任何上传控件,跳过图片上传", file=sys.stderr)
    return None

async def _publish_fill_text(self, title: str, content: str, topics) -> Optional[Dict[str, Any]]:
    """Fill in title, body and topics; return an error dict if the body editor is missing."""
    print("开始输入文字内容...", file=sys.stderr)

    # ---- Title ----
    title_selectors = [
        'input[placeholder*="标题"]',
        'input[placeholder*="填写标题"]',
        'input[placeholder*="曝光"]',
        '.title-input',
        '[class*="title"] input',
    ]
    title_input = await self._publish_find_element(
        title_selectors, "标题输入框", timeout=3000, require_visible=True, settle=0.2
    )
    if title_input:
        await title_input.click()
        await asyncio.sleep(0.3)
        await title_input.fill(title)
        print(f"已输入标题: {title}", file=sys.stderr)
    else:
        print("未找到标题输入框,可能不需要单独标题", file=sys.stderr)

    # ---- Body ----
    content_selectors = [
        'div[contenteditable="true"]',
        'div[placeholder*="正文"]',
        'div[placeholder*="输入正文"]',
        'textarea[placeholder*="输入正文"]',
        'textarea[placeholder*="填写笔记内容"]',
        'textarea[placeholder*="笔记内容"]',
        '[class*="content"] div[contenteditable="true"]',
        '[class*="editor"] div[contenteditable="true"]',
        'textarea',
    ]
    content_input = await self._publish_find_element(
        content_selectors, "内容输入框", timeout=3000, require_visible=True, settle=0.2
    )
    if not content_input:
        return {
            "success": False,
            "error": "未找到内容输入框"
        }

    await content_input.click()
    await asyncio.sleep(0.2)
    contenteditable_probe = 'el => el.getAttribute("contenteditable") === "true"'
    try:
        if await content_input.evaluate(contenteditable_probe):
            # contenteditable element: set the text through innerText.
            await content_input.evaluate(f'el => el.innerText = {json.dumps(content)}')
        else:
            # Plain textarea.
            await content_input.fill(content)
    except Exception:
        # Probe failed: fall back to fill().
        await content_input.fill(content)
    print("已输入笔记内容", file=sys.stderr)
    await asyncio.sleep(0.2)

    # ---- Topics appended to the body as " #topic" ----
    if topics:
        print(f"添加话题标签: {topics}", file=sys.stderr)
        for topic in topics:
            topic_text = f" #{topic}"
            try:
                if await content_input.evaluate(contenteditable_probe):
                    await content_input.evaluate(f'el => el.innerText += {json.dumps(topic_text)}')
                else:
                    current_value = await content_input.evaluate('el => el.value')
                    await content_input.fill(current_value + topic_text)
            except Exception:
                # Best effort: skip this topic and move on.
                pass
        print(f"已添加 {len(topics)} 个话题标签", file=sys.stderr)
    await asyncio.sleep(0.5)

    # ---- Topics typed one by one into the dedicated tag input ----
    if topics:
        print("尝试在话题输入框中逐个输入标签...", file=sys.stderr)
        tag_input_selectors = [
            'input[placeholder*="话题"]',
            'input[placeholder*="#"]',
            'input[placeholder*="添加标签"]',
            '[class*="tag"] input',
            '[class*="topic"] input',
        ]
        suggestion_selectors = [
            '[class*="suggest"] li',
            '[role="listbox"] li',
            '[class*="dropdown"] li',
        ]
        tag_input = await self._publish_find_element(tag_input_selectors, "话题输入框", timeout=2000)
        if tag_input:
            for topic in topics:
                try:
                    await tag_input.click()
                    await asyncio.sleep(0.2)
                    # Clear whatever is already in the input.
                    try:
                        await tag_input.fill("")
                    except Exception:
                        pass
                    await tag_input.fill("#" + topic)
                    await asyncio.sleep(0.5)

                    # Pick the first suggestion if a list shows up.
                    suggestion = None
                    for s_selector in suggestion_selectors:
                        try:
                            suggestion = await self.page.query_selector(s_selector)
                            if suggestion:
                                break
                        except Exception:
                            continue
                    if suggestion:
                        await suggestion.click()
                        print(f"✅ 已选择联想话题: {topic}", file=sys.stderr)
                    else:
                        # No suggestion list: confirm with Enter.
                        await tag_input.press("Enter")
                        print(f"✅ 未找到联想列表,使用回车确认话题: {topic}", file=sys.stderr)
                    await asyncio.sleep(0.3)
                except Exception as e:
                    print(f"添加话题 {topic} 到输入框失败: {str(e)}", file=sys.stderr)
        else:
            print("⚠️ 未找到话题输入框,已退回到在正文中追加 #话题 的方式", file=sys.stderr)
    return None

async def _publish_detect_rejection(self) -> Optional[Dict[str, Any]]:
    """Scan the page for an error / restriction banner; return a failure dict if found."""
    print("检查是否有社区规范限制...", file=sys.stderr)
    try:
        error_selectors = [
            'text="因违反社区规范禁止发笔记"',
            'text*="违反社区规范"',
            'text*="禁止发布"',
            'text*="账号被限制"',
            'text*="账号异常"',
            '.error-tip',
            '.warning-tip',
            '[class*="error"]',
            '[class*="warning"]',
        ]
        for selector in error_selectors:
            try:
                error_el = await self.page.wait_for_selector(selector, timeout=2000)
                if error_el:
                    error_text = await error_el.inner_text()
                    print(f"❌ 检测到错误提示: {error_text}", file=sys.stderr)
                    return {
                        "success": False,
                        "error": f"发布失败: {error_text}",
                        "error_type": "community_violation",
                        "message": error_text
                    }
            except Exception:
                continue
    except Exception as e:
        print(f"检查错误提示异常: {str(e)}", file=sys.stderr)
    return None

async def _publish_close_environment(self) -> None:
    """Close the dedicated publish page and context (pool mode); never raises."""
    try:
        print("[浏览器池模式] 关闭发布专用环境", file=sys.stderr)
        if self.page:
            await self.page.close()
            self.page = None
            print("✅ 发布页面已关闭", file=sys.stderr)
        if self.context:
            await self.context.close()
            self.context = None
            print("✅ 发布context已关闭预热环境保持不受影响", file=sys.stderr)
    except Exception as e:
        print(f"⚠️ 关闭发布环境失败: {str(e)}", file=sys.stderr)

async def _publish_submit(self, cookies) -> Dict[str, Any]:
    """Click the publish button, watch the publish API response and report the result."""
    print("查找发布按钮...", file=sys.stderr)
    submit_selectors = [
        'button:has-text("发布笔记")',
        'button:has-text("发布")',
        'text="发布笔记"',
        'text="发布"',
        '.publish-btn',
        '.submit-btn',
    ]
    submit_btn = None
    for selector in submit_selectors:
        try:
            candidate = await self.page.wait_for_selector(selector, timeout=3000)
            # Only accept a button that is not disabled.
            if candidate and not await candidate.evaluate('el => el.disabled'):
                print(f"找到发布按钮: {selector}", file=sys.stderr)
                submit_btn = candidate
                break
        except Exception:
            continue

    if not submit_btn:
        return {
            "success": False,
            "error": "未找到可用的发布按钮,可能内容不完整"
        }

    # A dedicated page/context exists only in pool mode with cookies.
    dedicated_env = bool(self.use_pool and self.browser_pool and cookies)

    note_id = None
    share_link = None

    async def handle_response(response):
        """Capture the note id / link from the publish API response."""
        nonlocal note_id, share_link
        try:
            if '/web_api/sns/v2/note' in response.url:
                print(f"✅ 捕获到发布API响应: {response.url}", file=sys.stderr)
                if response.status == 200:
                    try:
                        data = await response.json()
                        print(f"API响应数据: {json.dumps(data, ensure_ascii=False)}", file=sys.stderr)
                        if data.get('success') and data.get('data'):
                            note_id = data['data'].get('id')
                            # Prefer share_link; otherwise build it from the id.
                            if 'share_link' in data:
                                share_link = data['share_link']
                                print(f"✅ 获取到笔记链接: {share_link}", file=sys.stderr)
                            elif note_id:
                                share_link = f"https://www.xiaohongshu.com/discovery/item/{note_id}"
                                print(f"✅ 根据ID生成笔记链接: {share_link}", file=sys.stderr)
                    except Exception as e:
                        print(f"解析API响应失败: {str(e)}", file=sys.stderr)
        except Exception as e:
            print(f"处理响应失败: {str(e)}", file=sys.stderr)

    # Keep a reference: self.page may be set to None before the listener
    # is detached in `finally`.
    page = self.page
    page.on('response', handle_response)
    try:
        await submit_btn.click()
        print("✅ 已点击发布按钮", file=sys.stderr)
        await asyncio.sleep(3)  # give the API response time to arrive

        # Once the API has confirmed the note, skip the heuristic DOM scan:
        # its broad selectors could otherwise turn a real success into a
        # reported failure and invite a duplicate publish.
        if not share_link:
            rejection = await self._publish_detect_rejection()
            if rejection:
                return rejection

        print("检查发布结果...", file=sys.stderr)
        try:
            await asyncio.sleep(2)  # wait for publishing to finish

            # Real note link captured: report it directly.
            if share_link:
                print(f"✅ 发布成功,获取到笔记链接: {share_link}", file=sys.stderr)
                if dedicated_env:
                    await self._publish_close_environment()
                return {
                    "success": True,
                    "message": "笔记发布成功",
                    "data": {
                        "note_id": note_id,
                        "note_url": share_link
                    },
                    "url": share_link  # kept for compatibility
                }

            # Otherwise look for a success toast.
            success_selectors = [
                'text="发布成功"',
                'text="发布完成"',
                'text*="成功"',
                '.success-tip',
                '.success-message',
            ]
            publish_success = False
            for selector in success_selectors:
                try:
                    success_el = await self.page.wait_for_selector(selector, timeout=3000)
                    if success_el:
                        success_text = await success_el.inner_text()
                        print(f"✅ 检测到发布成功提示: {success_text}", file=sys.stderr)
                        publish_success = True
                        break
                except Exception:
                    continue

            # No toast: fall back to whether the URL left the publish page.
            current_url = self.page.url
            if not publish_success:
                if 'publish' in current_url.lower():
                    print("⚠️ 未检测到成功提示,但继续执行", file=sys.stderr)
                else:
                    print("✅ URL已变化似乎发布成功", file=sys.stderr)
                    publish_success = True
            print(f"发布后URL: {current_url}", file=sys.stderr)

            if dedicated_env:
                await self._publish_close_environment()

            # NOTE(review): success is reported even when no confirmation
            # was detected (publish_success is False) - unchanged behaviour.
            return {
                "success": True,
                "message": "笔记发布成功",
                "url": current_url
            }
        except Exception as e:
            print(f"检查发布结果异常: {str(e)}", file=sys.stderr)
            if dedicated_env:
                await self._publish_close_environment()
            # The button was clicked, so this is still reported as success.
            return {
                "success": True,
                "message": "笔记已提交发布,但未能确认结果",
                "url": self.page.url if self.page else ""
            }
    finally:
        # Detach the listener so a reused page does not accumulate handlers.
        try:
            page.remove_listener('response', handle_response)
        except Exception:
            pass