Files
ai_wht_wechat/backend/xhs_login.py
2025-12-19 22:36:48 +08:00

1415 lines
68 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
小红书登录服务
使用 Playwright 模拟浏览器登录小红书
"""
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
from typing import Dict, Any, Optional
import asyncio
import json
import random
import unicodedata
import sys
class XHSLoginService:
"""小红书登录服务"""
def __init__(self):
self.playwright = None
self.browser: Optional[Browser] = None
self.context: Optional[BrowserContext] = None
self.page: Optional[Page] = None
self.current_phone = None
async def init_browser(self, cookies: Optional[list] = None, proxy: Optional[str] = None, user_agent: Optional[str] = None):
    """
    Start Playwright, launch headless Chromium and open a fresh page.

    Args:
        cookies: Optional list of cookies added to the new browser context
            to restore a previous login.
        proxy: Optional proxy URL, e.g. http://user:pass@ip:port. Credentials
            embedded in the URL are split out and passed through the
            separate ``username`` / ``password`` proxy fields.
        user_agent: Optional custom User-Agent; a desktop Chrome UA string is
            used when omitted.

    Raises:
        Exception: whatever Playwright raised is re-raised after every
            resource created by this call has been released again.
    """
    # Function-scope import so the method does not depend on file-level imports.
    from urllib.parse import unquote

    # (attribute name, resource, closer method) for everything this call
    # created, in creation order, so a failure can undo exactly that.
    created = []
    try:
        self.playwright = await async_playwright().start()
        created.append(("playwright", self.playwright, "stop"))

        # Chromium in headless mode (server environment); switch headless to
        # False for local debugging.
        launch_kwargs = {
            "headless": True,
            "args": ['--disable-blink-features=AutomationControlled'],
        }
        if proxy:
            proxy_settings = {"server": proxy}
            scheme, sep, rest = proxy.partition("://")
            if sep and "@" in rest:
                # scheme://user:pass@host:port -> server + username + password
                credentials, _, host = rest.rpartition("@")
                username, _, password = credentials.partition(":")
                proxy_settings = {
                    "server": f"{scheme}://{host}",
                    "username": unquote(username),
                    "password": unquote(password),
                }
            launch_kwargs["proxy"] = proxy_settings
        self.browser = await self.playwright.chromium.launch(**launch_kwargs)
        created.append(("browser", self.browser, "close"))

        # Browser context that mimics a regular desktop user.
        context_kwargs = {
            "viewport": {'width': 1280, 'height': 720},
            "user_agent": user_agent or 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        }
        self.context = await self.browser.new_context(**context_kwargs)
        created.append(("context", self.context, "close"))

        # Inject the supplied cookies, if any, into the context.
        if cookies:
            await self.context.add_cookies(cookies)
            print(f"已注入 {len(cookies)} 个Cookie", file=sys.stderr)

        self.page = await self.context.new_page()
        created.append(("page", self.page, "close"))
        print("浏览器初始化成功", file=sys.stderr)
    except Exception as e:
        print(f"浏览器初始化失败: {str(e)}", file=sys.stderr)
        # Release whatever was created before the failure (innermost first)
        # so a failed init does not leave a browser process behind.
        for attr_name, resource, closer in reversed(created):
            try:
                await getattr(resource, closer)()
            except Exception as cleanup_error:
                print(f"清理浏览器资源失败: {str(cleanup_error)}", file=sys.stderr)
            setattr(self, attr_name, None)
        raise
async def close_browser(self):
"""关闭浏览器"""
try:
if self.page:
await self.page.close()
if self.context:
await self.context.close()
if self.browser:
await self.browser.close()
if self.playwright:
await self.playwright.stop()
print("浏览器已关闭", file=sys.stderr)
except Exception as e:
print(f"关闭浏览器异常: {str(e)}", file=sys.stderr)
async def send_verification_code(self, phone: str, country_code: str = "+86") -> Dict[str, Any]:
    """
    Request an SMS verification code via the creator-platform login page.

    Initialises the browser when no page exists yet, opens the login page,
    clicks an agreement popup button if one shows up, types the phone
    number, clicks the send-code button and finally looks for a slider
    captcha. Progress is logged to stderr.

    Args:
        phone: Phone number typed into the login form.
        country_code: Country calling code. NOTE(review): accepted but not
            used anywhere in this method.

    Returns:
        Dict with "success" plus "message" on success or "error" on failure.
    """
    try:
        if not self.page:
            await self.init_browser()
        self.current_phone = phone
        # Open the creator-platform login page (the dedicated login page).
        print(f"正在访问小红书创作者平台登录页...", file=sys.stderr)
        # Navigate directly to the login page with a 60 second timeout.
        try:
            await self.page.goto('https://creator.xiaohongshu.com/login', wait_until='domcontentloaded', timeout=60000)
            print("✅ 页面加载完成", file=sys.stderr)
        except Exception as e:
            # A navigation error is only logged; the flow carries on.
            print(f"访问页面超时,但继续尝试: {str(e)}", file=sys.stderr)
        # Give the login form time to render.
        await asyncio.sleep(2)
        print("✅ 已进入创作者平台登录页面", file=sys.stderr)
        # Per earlier notes the login flow does not need the agreement
        # checkbox; the popup handling is kept in case the page changes.
        try:
            await asyncio.sleep(0.5)
            agreement_selectors = [
                'text="同意并继续"',
                'text="已阅读并同意"',
                'button:has-text("同意")',
                'button:has-text("继续")',
            ]
            for selector in agreement_selectors:
                try:
                    agreement_btn = await self.page.wait_for_selector(selector, timeout=1000)
                    if agreement_btn:
                        await agreement_btn.click()
                        print(f"✅ 已点击协议按钮: {selector}", file=sys.stderr)
                        await asyncio.sleep(0.5)
                        break
                except Exception:
                    # Selector did not match within the timeout; try the next one.
                    continue
        except Exception as e:
            print(f"无协议弹窗(正常情况)", file=sys.stderr)
        # Type the phone number.
        try:
            # Candidate selectors for the phone input on the login page,
            # tried in order; the first one that resolves wins.
            print("查找手机号输入框...", file=sys.stderr)
            phone_input_selectors = [
                'input[placeholder="手机号"]',  # exact match taken from the page HTML
                'input.css-nt440g',  # class name taken from the page HTML
                'input[placeholder*="手机号"]',
                'input[autocomplete="on"][autofocus]',
                'input[type="tel"]',
                'input[type="text"]',
                'input[name*="phone"]',
                'input[name*="mobile"]',
            ]
            phone_input = None
            for selector in phone_input_selectors:
                try:
                    phone_input = await self.page.wait_for_selector(selector, timeout=2000)
                    if phone_input:
                        print(f"✅ 找到手机号输入框: {selector}", file=sys.stderr)
                        break
                except Exception:
                    continue
            if not phone_input:
                # Dump page details to help debugging, then give up.
                print("⚠️ 未找到手机号输入框,打印页面信息...", file=sys.stderr)
                print(f"页面URL: {self.page.url}", file=sys.stderr)
                # List the first five input elements on the page.
                inputs = await self.page.query_selector_all('input')
                print(f"页面上找到 {len(inputs)} 个input元素", file=sys.stderr)
                for i, inp in enumerate(inputs[:5]):
                    try:
                        placeholder = await inp.get_attribute('placeholder')
                        input_type = await inp.get_attribute('type')
                        name = await inp.get_attribute('name')
                        class_name = await inp.get_attribute('class')
                        print(f"Input {i+1}: type={input_type}, placeholder={placeholder}, name={name}, class={class_name}", file=sys.stderr)
                    except Exception:
                        pass
                return {
                    "success": False,
                    "error": "未找到手机号输入框,请检查页面是否正确加载"
                }
            # Focus the field, select any existing text and type over it.
            await phone_input.click()
            await asyncio.sleep(0.2)
            # Ctrl+A selects the current content so typing replaces it.
            await phone_input.press('Control+A')
            await phone_input.type(phone, delay=50)  # 50 ms per character to mimic real typing
            print(f"✅ 已输入手机号: {phone}", file=sys.stderr)
            await asyncio.sleep(0.3)
        except Exception as e:
            return {
                "success": False,
                "error": f"输入手机号失败: {str(e)}"
            }
        # Click the send-code button.
        try:
            print("查找发送验证码按钮...", file=sys.stderr)
            # Candidate selectors for the send-code button, tried in order.
            send_code_btn = None
            selectors = [
                'text="发送验证码"',  # taken from a screenshot
                'text="重新发送"',  # taken from the page HTML
                'div.css-uyobdj',  # class name taken from the page HTML
                'button:has-text("发送验证码")',
                'button:has-text("重新发送")',
                'div:has-text("重新发送")',
                'text="获取验证码"',
                'button:has-text("获取验证码")',
            ]
            for selector in selectors:
                try:
                    send_code_btn = await self.page.wait_for_selector(selector, timeout=1500)
                    if send_code_btn:
                        print(f"✅ 找到发送验证码按钮: {selector}", file=sys.stderr)
                        break
                except Exception:
                    continue
            if not send_code_btn:
                # Fallback: scan all buttons and css-* divs on the page.
                print("⚠️ 未找到预定选择器,查找所有可点击元素...", file=sys.stderr)
                buttons = await self.page.query_selector_all('button, div[class*="css-"]')
                print(f"页面上找到 {len(buttons)} 个可能的元素", file=sys.stderr)
                for i, btn in enumerate(buttons[:20]):  # log the first 20 only
                    try:
                        text = await btn.inner_text()
                        if text and len(text.strip()) > 0:  # only log elements that have text
                            classes = await btn.get_attribute('class')
                            print(f"元素 {i+1}: 文本=[{text.strip()}] class=[{classes}]", file=sys.stderr)
                    except Exception:
                        pass
                # Pick the first element whose text contains one of the keywords.
                print("尝试根据文本内容查找验证码按钮...", file=sys.stderr)
                for btn in buttons:
                    try:
                        text = await btn.inner_text()
                        if text and ('验证码' in text or '发送' in text or '获取' in text or '重新' in text):
                            send_code_btn = btn
                            print(f"✅ 通过文本找到按钮: {text.strip()}", file=sys.stderr)
                            break
                    except Exception:
                        continue
            if send_code_btn:
                await send_code_btn.click()
                print("✅ 已点击发送验证码", file=sys.stderr)
                await asyncio.sleep(2)  # wait for the code to be sent
                # The agreement popup may show up again after the click.
                try:
                    await asyncio.sleep(0.5)
                    agreement_selectors = [
                        'text="同意并继续"',
                        'text="已阅读并同意"',
                        'button:has-text("同意")',
                    ]
                    for selector in agreement_selectors:
                        try:
                            agreement_btn = await self.page.wait_for_selector(selector, timeout=1000)
                            if agreement_btn:
                                await agreement_btn.click()
                                print(f"✅ 再次点击协议按钮: {selector}", file=sys.stderr)
                                await asyncio.sleep(0.5)
                                break
                        except Exception:
                            continue
                except Exception as e:
                    print(f"无二次协议弹窗(正常)", file=sys.stderr)
            else:
                return {
                    "success": False,
                    "error": "未找到发送验证码按钮,请检查页面结构"
                }
        except Exception as e:
            return {
                "success": False,
                "error": f"点击发送验证码失败: {str(e)}"
            }
        # Check whether a slider captcha appeared.
        try:
            await asyncio.sleep(1)
            # A slider has to be solved manually or by an external tool;
            # this code only detects it and then waits.
            slider_selectors = [
                '.slider',
                '.captcha',
                '[class*="captcha"]',
                '[class*="slider"]',
                '[id*="captcha"]',
            ]
            slider_found = False
            for selector in slider_selectors:
                try:
                    slider = await self.page.query_selector(selector)
                    if slider:
                        slider_found = True
                        print("⚠️ 检测到滑块验证,请手动完成...", file=sys.stderr)
                        # Fixed 15 second pause for the slider to be completed.
                        await asyncio.sleep(15)
                        break
                except Exception:
                    pass
            if not slider_found:
                print("✅ 未检测到滑块验证", file=sys.stderr)
        except Exception as e:
            print(f"滑块检测异常: {str(e)}", file=sys.stderr)
        print("\n✅ 验证码发送流程完成,请查看手机短信", file=sys.stderr)
        print("请在小程序中输入收到的验证码并点击登录\n", file=sys.stderr)
        return {
            "success": True,
            "message": "验证码发送成功,请查看手机短信"
        }
    except Exception as e:
        error_msg = str(e)
        print(f"\n\u274c 发送验证码异常: {error_msg}", file=sys.stderr)
        print(f"当前页面URL: {self.page.url if self.page else 'N/A'}", file=sys.stderr)
        # Best-effort screenshot of the failing page for debugging.
        if self.page:
            try:
                print("尝试截图保存错误状态...", file=sys.stderr)
                await self.page.screenshot(path='error_screenshot.png')
                print("✅ 错误状态已截图保存到 error_screenshot.png", file=sys.stderr)
            except Exception:
                pass
        return {
            "success": False,
            "error": error_msg
        }
async def login(self, phone: str, code: str, country_code: str = "+86") -> Dict[str, Any]:
"""
使用验证码登录
Args:
phone: 手机号
code: 验证码
country_code: 国家区号
Returns:
Dict containing login result, user info and cookies
"""
try:
if not self.page:
return {
"success": False,
"error": "页面未初始化,请先发送验证码"
}
# 输入验证码
try:
print("查找验证码输入框...", file=sys.stderr)
code_input_selectors = [
'input[placeholder="验证码"]', # 根据HTML精确匹配
'input.css-1ge5flv', # 根据HTML中的class
'input[placeholder*="验证码"]',
'input[type="text"]:not([placeholder*="手机"])',
]
code_input = None
for selector in code_input_selectors:
try:
code_input = await self.page.wait_for_selector(selector, timeout=2000)
if code_input:
print(f"✅ 找到验证码输入框: {selector}", file=sys.stderr)
break
except Exception:
continue
if not code_input:
return {
"success": False,
"error": "未找到验证码输入框"
}
await code_input.click()
await asyncio.sleep(0.2)
await code_input.press('Control+A')
await code_input.type(code, delay=50)
print(f"✅ 已输入验证码: {code}", file=sys.stderr)
await asyncio.sleep(0.5)
except Exception as e:
return {
"success": False,
"error": f"输入验证码失败: {str(e)}"
}
# 点击登录按钮
try:
print("查找登录按钮...", file=sys.stderr)
login_btn_selectors = [
'button.beer-login-btn', # 根据HTML中的class
'button.css-y4h4ay', # 根据HTML
'button:has-text("登 录")', # 注意有空格
'button:has-text("登录")',
'text="登 录"',
'text="登录"',
'.login-button',
]
login_btn = None
for selector in login_btn_selectors:
try:
login_btn = await self.page.wait_for_selector(selector, timeout=2000)
if login_btn:
print(f"✅ 找到登录按钮: {selector}", file=sys.stderr)
break
except Exception:
continue
if not login_btn:
# 打印所有按钮用于调试
print("⚠️ 未找到登录按钮,打印所有按钮...", file=sys.stderr)
buttons = await self.page.query_selector_all('button')
print(f"页面上找到 {len(buttons)} 个按钮", file=sys.stderr)
for i, btn in enumerate(buttons[:10]):
try:
text = await btn.inner_text()
classes = await btn.get_attribute('class')
print(f"按钮 {i+1}: 文本=[{text.strip()}] class=[{classes}]", file=sys.stderr)
except Exception:
pass
return {
"success": False,
"error": "未找到登录按钮"
}
await login_btn.click()
print("✅ 已点击登录按钮", file=sys.stderr)
# 等待一下,检查是否出现协议弹窗
await asyncio.sleep(1)
# 处理登录后可能出现的协议弹窗
try:
agreement_popup_selectors = [
'text="同意并继续"',
'button:has-text("同意并继续")',
'text="已阅读并同意"',
]
for selector in agreement_popup_selectors:
try:
popup_btn = await self.page.wait_for_selector(selector, timeout=2000)
if popup_btn:
await popup_btn.click()
print(f"✅ 已点击登录后的协议弹窗: {selector}", file=sys.stderr)
await asyncio.sleep(1)
break
except Exception:
continue
except Exception as e:
print(f"无登录后协议弹窗(正常)", file=sys.stderr)
# 等待登录完成
await asyncio.sleep(3)
except Exception as e:
return {
"success": False,
"error": f"点击登录按钮失败: {str(e)}"
}
# 检查是否登录成功
try:
# 等待页面跳转或出现用户信息
await self.page.wait_for_selector('.user-info, .avatar, [class*="user"]', timeout=10000)
print("登录成功", file=sys.stderr)
except Exception as e:
return {
"success": False,
"error": f"登录验证失败,可能验证码错误: {str(e)}"
}
# 获取Cookies
cookies = await self.context.cookies()
# 注意:这里返回两种格式
# 1. cookies_dict: 键值对格式(用于 API 返回,方便前端展示)
# 2. cookies: Playwright 完整格式(用于保存文件和后续使用)
cookies_dict = {cookie['name']: cookie['value'] for cookie in cookies}
# 打印重要的Cookies
print(f"\n========== Cookies 信息 ==========", file=sys.stderr)
print(f"共获取到 {len(cookies)} 个Cookie", file=sys.stderr)
# 打印所有Cookie名称
print(f"\nCookie名称列表: {list(cookies_dict.keys())}", file=sys.stderr)
# 完整打印所有Cookies键值对格式
print(f"\n完整Cookies内容键值对格式:", file=sys.stderr)
for name, value in cookies_dict.items():
print(f" {name}: {value}", file=sys.stderr)
print(f"\n================================\n", file=sys.stderr)
# 获取用户信息从页面或API
user_info = {}
try:
# 等待页面完全加载
await asyncio.sleep(2)
# 尝试从localStorage获取用户信息
storage = await self.page.evaluate('() => JSON.stringify(localStorage)')
storage_dict = json.loads(storage)
print(f"LocalStorage内容: {list(storage_dict.keys())}", file=sys.stderr)
# 提取有用的localStorage数据
useful_keys = ['b1', 'b1b1', 'p1', 'xhs_context_networkQuality']
for key in useful_keys:
if key in storage_dict:
try:
# 尝试解析JSON
value = storage_dict[key]
if value and value.strip():
user_info[key] = json.loads(value) if value.startswith('{') or value.startswith('[') else value
except:
user_info[key] = storage_dict[key]
# 小红书可能将用户信息存储在特定键中
for key, value in storage_dict.items():
if 'user' in key.lower():
try:
user_data = json.loads(value)
user_info['user_data'] = user_data
print(f"从localStorage获取用户信息 - key: {key}", file=sys.stderr)
break
except:
pass
# 尝试从window对象获取用户信息更完整的方式
try:
# 获取window.__INITIAL_STATE__或其他可能的用户信息对象
window_data = await self.page.evaluate('''
() => {
const result = {};
// 尝试获取常见的用户信息存储位置
if (window.__INITIAL_STATE__) result.initial_state = window.__INITIAL_STATE__;
if (window.user) result.user = window.user;
if (window.userInfo) result.userInfo = window.userInfo;
if (window.__APOLLO_STATE__) result.apollo_state = window.__APOLLO_STATE__;
// 尝试从Redux store获取
if (window.__REDUX_DEVTOOLS_EXTENSION__) {
try {
const state = window.__REDUX_DEVTOOLS_EXTENSION__.extractState();
if (state) result.redux_state = state;
} catch(e) {}
}
return result;
}
''')
if window_data:
user_info['window_data'] = window_data
print(f"从window对象获取数据包含键: {list(window_data.keys())}", file=sys.stderr)
except Exception as e:
print(f"从window对象获取失败: {str(e)}", file=sys.stderr)
# 尝试从页面元素获取用户信息
if not user_info.get('username'):
try:
# 尝试获取用户昵称
username_el = await self.page.query_selector('.user-name, .username, [class*="user"][class*="name"]')
if username_el:
username = await username_el.inner_text()
user_info['username'] = username
print(f"从页面获取用户名: {username}", file=sys.stderr)
except:
pass
print(f"最终获取到用户信息字段: {list(user_info.keys())}", file=sys.stderr)
except Exception as e:
print(f"获取用户信息失败: {str(e)}", file=sys.stderr)
# 获取当前URL可能包含token等信息
current_url = self.page.url
print(f"当前URL: {current_url}", file=sys.stderr)
# 将Cookies保存到文件Playwright 完整格式)
try:
with open('cookies.json', 'w', encoding='utf-8') as f:
json.dump(cookies, f, ensure_ascii=False, indent=2)
print("✅ 已保存 Cookies 到 cookies.json 文件Playwright 格式)", file=sys.stderr)
print(f" 文件包含 {len(cookies)} 个完整的 Cookie 对象", file=sys.stderr)
except Exception as e:
print(f"保存Cookies文件失败: {str(e)}", file=sys.stderr)
return {
"success": True,
"user_info": user_info,
"cookies": cookies_dict, # API 返回:键值对格式(方便前端展示)
"cookies_full": cookies, # API 返回:完整格式(可选,供需要者使用)
"url": current_url
}
except Exception as e:
print(f"登录异常: {str(e)}", file=sys.stderr)
return {
"success": False,
"error": str(e)
}
async def get_user_profile(self) -> Dict[str, Any]:
"""
获取用户详细信息
登录成功后可以调用此方法获取更多用户信息
"""
try:
if not self.page:
return {
"success": False,
"error": "页面未初始化"
}
# 访问用户主页
await self.page.goto('https://www.xiaohongshu.com/user/profile', wait_until='networkidle')
await asyncio.sleep(2)
# 这里可以根据实际需求抓取用户信息
# 示例:获取用户昵称、头像等
return {
"success": True,
"profile": {}
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
async def verify_login_status(self) -> Dict[str, Any]:
"""
验证当前登录状态
访问小红书创作者平台检查是否已登录
Returns:
Dict containing login status and user info if logged in
"""
try:
if not self.page:
return {
"success": False,
"logged_in": False,
"error": "页面未初始化"
}
print("正在验证登录状态...", file=sys.stderr)
# 访问小红书创作者平台(而不是首页)
print("访问创作者平台...", file=sys.stderr)
try:
await self.page.goto('https://creator.xiaohongshu.com/', wait_until='domcontentloaded', timeout=60000)
await asyncio.sleep(2) # 等待页面加载
print(f"✅ 已访问创作者平台当前URL: {self.page.url}", file=sys.stderr)
except Exception as e:
print(f"访问创作者平台失败: {str(e)}", file=sys.stderr)
return {
"success": False,
"logged_in": False,
"error": f"访问创作者平台失败: {str(e)}"
}
# 检查是否被重定向到登录页(未登录状态)
current_url = self.page.url
if 'login' in current_url.lower():
print("❌ 未登录状态(被重定向到登录页)", file=sys.stderr)
return {
"success": True,
"logged_in": False,
"cookie_expired": True, # 标识Cookie已失效
"message": "Cookie已失效或未登录",
"url": current_url
}
# 如果在创作者平台主页,说明已登录
if 'creator.xiaohongshu.com' in current_url and 'login' not in current_url.lower():
print("✅ 已登录状态(成功访问创作者平台)", file=sys.stderr)
# 获取当前的Cookies
cookies = await self.context.cookies()
# 转换为键值对格式(用于 API 返回)
cookies_dict = {cookie['name']: cookie['value'] for cookie in cookies}
# 尝试获取用户信息
user_info = {}
try:
storage = await self.page.evaluate('() => JSON.stringify(localStorage)')
storage_dict = json.loads(storage)
# 提取有用的localStorage数据
for key, value in storage_dict.items():
if 'user' in key.lower():
try:
user_data = json.loads(value)
user_info['user_data'] = user_data
break
except:
pass
except Exception as e:
print(f"获取用户信息失败: {str(e)}", file=sys.stderr)
return {
"success": True,
"logged_in": True,
"message": "Cookie有效已登录",
"cookies": cookies_dict, # 键值对格式(前端展示)
"cookies_full": cookies, # Playwright完整格式数据库存储/脚本使用)
"user_info": user_info,
"url": current_url
}
else:
print("❌ 未登录状态URL异常", file=sys.stderr)
return {
"success": True,
"logged_in": False,
"cookie_expired": True, # 标识Cookie已失效
"message": "Cookie已失效或未登录",
"url": current_url
}
except Exception as e:
print(f"验证登录状态异常: {str(e)}", file=sys.stderr)
return {
"success": False,
"logged_in": False,
"error": str(e)
}
def _calculate_title_width(self, title: str) -> int:
width = 0
for ch in title:
if unicodedata.east_asian_width(ch) in ("F", "W"):
width += 2
else:
width += 1
return width
async def publish_note(self, title: str, content: str, images: list = None, topics: list = None, cookies: list = None) -> Dict[str, Any]:
"""
发布笔记支持Cookie注入
Args:
title: 笔记标题
content: 笔记内容
images: 图片路径列表(本地文件路径)
topics: 话题标签列表
cookies: 可选的Cookie列表Playwright完整格式用于注入登录态
Returns:
Dict containing publish result
"""
try:
# ========== 内容验证 ==========
print("\n========== 开始验证发布内容 ==========", file=sys.stderr)
# 1. 验证标题长度
if not title or len(title.strip()) == 0:
return {
"success": False,
"error": "标题不能为空",
"error_type": "validation_error"
}
title = title.strip()
title_width = self._calculate_title_width(title)
if title_width > 40:
return {
"success": False,
"error": f"标题超出限制:当前宽度 {title_width},平台限制 40",
"error_type": "validation_error"
}
print(f"✅ 标题验证通过: 宽度 {title_width}/40", file=sys.stderr)
# 2. 验证内容长度
if not content or len(content.strip()) == 0:
return {
"success": False,
"error": "内容不能为空",
"error_type": "validation_error"
}
content_length = len(content)
if content_length > 1000:
return {
"success": False,
"error": f"内容超出限制:当前 {content_length} 个字,最多 1000 个字",
"error_type": "validation_error"
}
print(f"✅ 内容验证通过: {content_length}/1000 个字", file=sys.stderr)
# 3. 验证图片数量
images_count = len(images) if images else 0
if images_count == 0:
return {
"success": False,
"error": "至少需要 1 张图片",
"error_type": "validation_error"
}
if images_count > 18:
return {
"success": False,
"error": f"图片超出限制:当前 {images_count} 张,最多 18 张",
"error_type": "validation_error"
}
print(f"✅ 图片数量验证通过: {images_count}/18 张", file=sys.stderr)
print("✅ 所有验证通过,开始发布\n", file=sys.stderr)
# ========== 开始发布流程 ==========
# 如果提供了Cookie初始化浏览器并注入Cookie
if cookies:
print("✅ 检测到Cookie将注入到浏览器", file=sys.stderr)
if not self.page:
await self.init_browser(cookies)
else:
# 如果浏览器已存在添加Cookie
await self.context.add_cookies(cookies)
print(f"✅ 已注入 {len(cookies)} 个Cookie", file=sys.stderr)
if not self.page:
return {
"success": False,
"error": "页面未初始化请先登录或提供Cookie"
}
print("\n========== 开始发布笔记 ==========", file=sys.stderr)
print(f"标题: {title}", file=sys.stderr)
print(f"内容: {content[:50]}..." if len(content) > 50 else f"内容: {content}", file=sys.stderr)
print(f"图片数量: {len(images) if images else 0}", file=sys.stderr)
print(f"话题: {topics if topics else []}", file=sys.stderr)
# 访问官方创作者平台发布页面带有Cookie的状态下直接访问
print("访问创作者平台图文发布页面...", file=sys.stderr)
try:
await self.page.goto('https://creator.xiaohongshu.com/publish/publish?source=official',
wait_until='networkidle', timeout=60000)
# 等待页面核心元素加载完成,而不是固定时间
await asyncio.sleep(3) # 增加等待时间确保JavaScript完全执行
# 点击「上传图文」tab符合平台规范
try:
print("查找“上传图文”tab...", file=sys.stderr)
tab_selectors = [
'button:has-text("上传图文")',
'div:has-text("上传图文")',
'text="上传图文"',
]
tab_clicked = False
for selector in tab_selectors:
try:
tab = await self.page.wait_for_selector(selector, timeout=3000)
if tab:
await tab.click()
tab_clicked = True
print(f"✅ 已点击“上传图文”tab: {selector}", file=sys.stderr)
await asyncio.sleep(1)
break
except Exception:
continue
if not tab_clicked:
print("⚠️ 未找到“上传图文”tab将继续使用当前页面进行发布", file=sys.stderr)
except Exception as e:
print(f"点击“上传图文”tab时异常: {str(e)}", file=sys.stderr)
print("✅ 已进入图文发布页面", file=sys.stderr)
except Exception as e:
return {
"success": False,
"error": f"访问发布页面失败: {str(e)}"
}
# 上传图片(如果有)
if images and len(images) > 0:
try:
print(f"开始上传 {len(images)} 张图片...", file=sys.stderr)
await asyncio.sleep(1) # 等待图文上传页面完全加载
# 直接查找图片上传控件(已经在图文上传页面了)
print("查找图片上传控件...", file=sys.stderr)
upload_selectors = [
'input[type="file"][accept*="image"]',
'input[type="file"]',
'input[accept*="image"]',
'.upload-input',
'[class*="upload"] input[type="file"]',
]
file_input = None
for selector in upload_selectors:
try:
file_input = await self.page.wait_for_selector(selector, timeout=3000)
if file_input:
print(f"找到文件上传控件: {selector}", file=sys.stderr)
break
except Exception:
continue
if file_input:
# 批量上传图片
images_count = len(images)
print(f"正在上传 {images_count} 张图片: {images}", file=sys.stderr)
await file_input.set_input_files(images)
print(f"已设置文件路径,等待上传...", file=sys.stderr)
# 等待所有图片上传完成(检测多张图片)
upload_success = False
uploaded_count = 0
for i in range(20): # 最多等待20秒多图需要更长时间
await asyncio.sleep(1)
try:
# 查找所有已上传的图片缩略图
uploaded_images = await self.page.query_selector_all('img[src*="blob:"]')
if not uploaded_images:
# 尝试其他选择器
uploaded_images = await self.page.query_selector_all('[class*="image"][class*="item"] img')
uploaded_count = len(uploaded_images)
if uploaded_count > 0:
print(f"✅ 已上传 {uploaded_count}/{images_count} 张图片", file=sys.stderr)
# 检查是否所有图片都已上传
if uploaded_count >= images_count:
print(f"✅ 所有图片上传完成!共 {uploaded_count}", file=sys.stderr)
upload_success = True
break
print(f"等待图片上传... {uploaded_count}/{images_count} ({i+1}/20秒)", file=sys.stderr)
except Exception as e:
print(f"检测上传状态异常: {e}", file=sys.stderr)
pass
if upload_success:
print(f"✅ 图片上传成功!共 {uploaded_count}", file=sys.stderr)
await asyncio.sleep(2) # 额外等待2秒确保完全上传
else:
print(f"⚠️ 仅检测到 {uploaded_count}/{images_count} 张图片,但继续执行...", file=sys.stderr)
else:
print("未找到隐藏的file input尝试查找可点击的上传区域...", file=sys.stderr)
# 调试: 打印页面上所有包含upload的元素
try:
all_elements = await self.page.query_selector_all('[class*="upload"], [id*="upload"]')
print(f"\u627e{len(all_elements)} 个包含upload的元素", file=sys.stderr)
for i, el in enumerate(all_elements[:10]): # 只看前10个
try:
tag_name = await el.evaluate('el => el.tagName')
class_name = await el.evaluate('el => el.className')
print(f" [{i+1}] {tag_name} class='{class_name}'", file=sys.stderr)
except Exception:
pass
except Exception:
pass
# 尝试点击上传区域或按钮
upload_area_selectors = [
'[class*="upload"][class*="box"]',
'[class*="upload"][class*="area"]',
'[class*="upload"][class*="wrapper"]',
'.upload-zone',
'div:has-text("上传图片")',
'div:has-text("点击上传")',
'button:has-text("上传图片")',
]
clicked = False
for selector in upload_area_selectors:
try:
area = await self.page.wait_for_selector(selector, timeout=2000)
if area:
print(f"找到上传区域: {selector}", file=sys.stderr)
await area.click()
await asyncio.sleep(0.5)
# 点击后再次查找file input
file_input = await self.page.wait_for_selector('input[type="file"]', timeout=2000)
if file_input:
images_count = len(images)
print(f"正在上传 {images_count} 张图片: {images}", file=sys.stderr)
await file_input.set_input_files(images)
print(f"已设置文件路径,等待上传...", file=sys.stderr)
# 等待所有图片上传完成
upload_success = False
uploaded_count = 0
for i in range(20):
await asyncio.sleep(1)
try:
uploaded_images = await self.page.query_selector_all('img[src*="blob:"]')
if not uploaded_images:
uploaded_images = await self.page.query_selector_all('[class*="image"][class*="item"] img')
uploaded_count = len(uploaded_images)
if uploaded_count > 0:
print(f"✅ 已上传 {uploaded_count}/{images_count} 张图片", file=sys.stderr)
if uploaded_count >= images_count:
print(f"✅ 所有图片上传完成!共 {uploaded_count}", file=sys.stderr)
upload_success = True
break
print(f"等待图片上传... {uploaded_count}/{images_count} ({i+1}/20秒)", file=sys.stderr)
except Exception as e:
print(f"检测上传状态异常: {e}", file=sys.stderr)
pass
if upload_success:
print(f"✅ 图片上传成功!共 {uploaded_count}", file=sys.stderr)
await asyncio.sleep(2)
else:
print(f"⚠️ 仅检测到 {uploaded_count}/{images_count} 张图片,但继续执行...", file=sys.stderr)
clicked = True
break
except Exception:
continue
if not clicked:
print("⚠️ 未找到任何上传控件,跳过图片上传", file=sys.stderr)
except Exception as e:
print(f"上传图片失败: {str(e)}", file=sys.stderr)
# 不中断流程,继续发布文字
# 输入标题和内容
try:
print("开始输入文字内容...", file=sys.stderr)
# 查找标题输入框(使用显式等待确保元素可交互)
title_selectors = [
'input[placeholder*="标题"]',
'input[placeholder*="填写标题"]',
'input[placeholder*="曝光"]',
'.title-input',
'[class*="title"] input',
]
title_input = None
for selector in title_selectors:
try:
# 等待元素可见且可编辑
title_input = await self.page.wait_for_selector(
selector,
state='visible', # 确保元素可见
timeout=5000 # 增加超时时间
)
if title_input:
# 确保元素可交互(等待一小段时间让JS初始化完成)
await asyncio.sleep(0.5)
print(f"找到标题输入框: {selector}", file=sys.stderr)
break
except Exception as e:
print(f"选择器 {selector} 未找到: {str(e)}", file=sys.stderr)
continue
if title_input:
await title_input.click()
await asyncio.sleep(0.3)
await title_input.fill(title)
print(f"已输入标题: {title}", file=sys.stderr)
else:
print("未找到标题输入框,可能不需要单独标题", file=sys.stderr)
# 查找内容输入框(正文)(使用显式等待确保元素可交互)
content_selectors = [
'div[contenteditable="true"]',
'div[placeholder*="正文"]',
'div[placeholder*="输入正文"]',
'textarea[placeholder*="输入正文"]',
'textarea[placeholder*="填写笔记内容"]',
'textarea[placeholder*="笔记内容"]',
'[class*="content"] div[contenteditable="true"]',
'[class*="editor"] div[contenteditable="true"]',
'textarea',
]
content_input = None
for selector in content_selectors:
try:
# 等待元素可见且可编辑
content_input = await self.page.wait_for_selector(
selector,
state='visible', # 确保元素可见
timeout=5000 # 增加超时时间
)
if content_input:
# 确保元素可交互
await asyncio.sleep(0.5)
print(f"找到内容输入框: {selector}", file=sys.stderr)
break
except Exception as e:
print(f"选择器 {selector} 未找到: {str(e)}", file=sys.stderr)
continue
if content_input:
# 清空并输入内容
await content_input.click()
await asyncio.sleep(0.5)
# 检查是否是contenteditable元素
try:
is_contenteditable = await content_input.evaluate('el => el.getAttribute("contenteditable") === "true"')
if is_contenteditable:
# 使用innerText设置内容
await content_input.evaluate(f'el => el.innerText = {json.dumps(content)}')
else:
# 普通textarea
await content_input.fill(content)
except Exception:
# 如果判断失败尝试直接fill
await content_input.fill(content)
print("已输入笔记内容", file=sys.stderr)
await asyncio.sleep(0.5)
# 添加话题标签
if topics:
print(f"添加话题标签: {topics}", file=sys.stderr)
for topic in topics:
# 在内容末尾添加话题
topic_text = f" #{topic}"
try:
is_contenteditable = await content_input.evaluate('el => el.getAttribute("contenteditable") === "true"')
if is_contenteditable:
await content_input.evaluate(f'el => el.innerText += {json.dumps(topic_text)}')
else:
current_value = await content_input.evaluate('el => el.value')
await content_input.fill(current_value + topic_text)
except Exception:
# 如果添加失败,继续下一个
pass
print(f"已添加 {len(topics)} 个话题标签", file=sys.stderr)
await asyncio.sleep(1)
# 单独在话题输入框中模拟人类方式输入标签
if topics:
print("尝试在话题输入框中逐个输入标签...", file=sys.stderr)
tag_input_selectors = [
'input[placeholder*="话题"]',
'input[placeholder*="#"]',
'input[placeholder*="添加标签"]',
'[class*="tag"] input',
'[class*="topic"] input',
]
tag_input = None
for selector in tag_input_selectors:
try:
tag_input = await self.page.wait_for_selector(selector, timeout=3000)
if tag_input:
print(f"找到话题输入框: {selector}", file=sys.stderr)
break
except Exception:
continue
if tag_input:
for topic in topics:
try:
await tag_input.click()
await asyncio.sleep(0.3)
# 清空已有内容
try:
await tag_input.fill("")
except Exception:
pass
await tag_input.type("#" + topic, delay=50)
await asyncio.sleep(0.8)
# 等待联想列表并选择第一项
suggestion = None
suggestion_selectors = [
'[class*="suggest"] li',
'[role="listbox"] li',
'[class*="dropdown"] li',
]
for s_selector in suggestion_selectors:
try:
suggestion = await self.page.query_selector(s_selector)
if suggestion:
break
except Exception:
continue
if suggestion:
await suggestion.click()
print(f"✅ 已选择联想话题: {topic}", file=sys.stderr)
else:
# 没有联想列表时,通过回车确认
await tag_input.press("Enter")
print(f"✅ 未找到联想列表,使用回车确认话题: {topic}", file=sys.stderr)
await asyncio.sleep(0.5)
except Exception as e:
print(f"添加话题 {topic} 到输入框失败: {str(e)}", file=sys.stderr)
else:
print("⚠️ 未找到话题输入框,已退回到在正文中追加 #话题 的方式", file=sys.stderr)
else:
return {
"success": False,
"error": "未找到内容输入框"
}
except Exception as e:
return {
"success": False,
"error": f"输入内容失败: {str(e)}"
}
# 模拟简单的人类滚动行为
try:
for _ in range(3):
await self.page.mouse.wheel(0, random.randint(200, 500))
await asyncio.sleep(random.uniform(0.3, 0.8))
except Exception:
pass
# 点击发布按钮
try:
print("查找发布按钮...", file=sys.stderr)
submit_selectors = [
'button:has-text("发布笔记")',
'button:has-text("发布")',
'text="发布笔记"',
'text="发布"',
'.publish-btn',
'.submit-btn',
]
submit_btn = None
for selector in submit_selectors:
try:
submit_btn = await self.page.wait_for_selector(selector, timeout=3000)
if submit_btn:
# 检查按钮是否可点击
is_disabled = await submit_btn.evaluate('el => el.disabled')
if not is_disabled:
print(f"找到发布按钮: {selector}", file=sys.stderr)
break
else:
submit_btn = None
except Exception:
continue
if submit_btn:
# 设置网络监听,捕获发布接口响应
note_id = None
share_link = None
async def handle_response(response):
nonlocal note_id, share_link
try:
# 监听发布笔记的API响应
if '/web_api/sns/v2/note' in response.url:
print(f"✅ 捕获到发布API响应: {response.url}", file=sys.stderr)
if response.status == 200:
try:
data = await response.json()
print(f"API响应数据: {json.dumps(data, ensure_ascii=False)}", file=sys.stderr)
if data.get('success') and data.get('data'):
note_id = data['data'].get('id')
# 优先使用share_link,如果没有则使用note_id拼接
if 'share_link' in data:
share_link = data['share_link']
print(f"✅ 获取到笔记链接: {share_link}", file=sys.stderr)
elif note_id:
share_link = f"https://www.xiaohongshu.com/discovery/item/{note_id}"
print(f"✅ 根据ID生成笔记链接: {share_link}", file=sys.stderr)
except Exception as e:
print(f"解析API响应失败: {str(e)}", file=sys.stderr)
except Exception as e:
print(f"处理响应失败: {str(e)}", file=sys.stderr)
# 添加响应监听器
self.page.on('response', handle_response)
await submit_btn.click()
print("✅ 已点击发布按钮", file=sys.stderr)
await asyncio.sleep(3) # 等待更长时间以捕获API响应
# 检查是否出现社区规范限制提示
print("检查是否有社区规范限制...", file=sys.stderr)
try:
# 尝试查找各种可能的错误提示
error_selectors = [
'text="因违反社区规范禁止发笔记"',
'text*="违反社区规范"',
'text*="禁止发布"',
'text*="账号被限制"',
'text*="账号异常"',
'.error-tip',
'.warning-tip',
'[class*="error"]',
'[class*="warning"]',
]
for selector in error_selectors:
try:
error_el = await self.page.wait_for_selector(selector, timeout=2000)
if error_el:
error_text = await error_el.inner_text()
print(f"❌ 检测到错误提示: {error_text}", file=sys.stderr)
return {
"success": False,
"error": f"发布失败: {error_text}",
"error_type": "community_violation", # 标记错误类型
"message": error_text
}
except Exception:
continue
except Exception as e:
print(f"检查错误提示异常: {str(e)}", file=sys.stderr)
# 检查是否发布成功
print("检查发布结果...", file=sys.stderr)
try:
await asyncio.sleep(2) # 等待发布完成
# 如果捕获到了真实的笔记链接,直接返回
if share_link:
print(f"✅ 发布成功,获取到笔记链接: {share_link}", file=sys.stderr)
return {
"success": True,
"message": "笔记发布成功",
"data": {
"note_id": note_id,
"note_url": share_link
},
"url": share_link # 保持兼容性
}
# 如果没有捕获到,使用原来的逻辑
# 等待发布成功的提示或页面跳转
success_selectors = [
'text="发布成功"',
'text="发布完成"',
'text*="成功"',
'.success-tip',
'.success-message',
]
publish_success = False
for selector in success_selectors:
try:
success_el = await self.page.wait_for_selector(selector, timeout=3000)
if success_el:
success_text = await success_el.inner_text()
print(f"✅ 检测到发布成功提示: {success_text}", file=sys.stderr)
publish_success = True
break
except Exception:
continue
# 如果没有明确的成功提示检查URL是否变化
current_url = self.page.url
if not publish_success:
# 如果还在发布页面,可能是发布失败
if 'publish' in current_url.lower():
print("⚠️ 未检测到成功提示,但继续执行", file=sys.stderr)
else:
print("✅ URL已变化似乎发布成功", file=sys.stderr)
publish_success = True
print(f"发布后URL: {current_url}", file=sys.stderr)
return {
"success": True,
"message": "笔记发布成功",
"url": current_url
}
except Exception as e:
print(f"检查发布结果异常: {str(e)}", file=sys.stderr)
# 即使检查异常,也返回成功(因为按钮已点击)
return {
"success": True,
"message": "笔记已提交发布,但未能确认结果",
"url": self.page.url
}
else:
return {
"success": False,
"error": "未找到可用的发布按钮,可能内容不完整"
}
except Exception as e:
return {
"success": False,
"error": f"点击发布按钮失败: {str(e)}"
}
except Exception as e:
print(f"发布笔记异常: {str(e)}", file=sys.stderr)
return {
"success": False,
"error": str(e)
}