# Windows compatibility: the event-loop policy must be set before any asyncio
# work starts, which is why this runs ahead of the remaining imports.
import sys
import asyncio
import aiohttp
import json
import time
import traceback
import uuid

if sys.platform == 'win32':
    # Playwright launches subprocesses; on Windows that needs the Proactor loop.
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    print("[系统] Windows环境已设置ProactorEventLoopPolicy", file=sys.stderr)

# Load configuration
from config import init_config, get_config
from dotenv import load_dotenv
load_dotenv()  # Optional: values from .env override the config file

from fastapi import FastAPI, HTTPException, File, UploadFile, Form, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
from datetime import datetime
import os
import shutil
from pathlib import Path
from xhs_login import XHSLoginService
from browser_pool import get_browser_pool
from scheduler import XHSScheduler
from error_screenshot import cleanup_old_screenshots
from ali_sms_service import AliSmsService

app = FastAPI(title="小红书登录API")

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # NOTE: restrict to concrete domains in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global login service (created in startup_event, not at import time)
login_service = None

# Global browser pool (assigned in startup_event; currently always None
# because browsers are managed through AdsPower)
browser_pool = None

# Global scheduler instance
scheduler = None

# Global Aliyun SMS service instance
sms_service = None

# The browser pool is disabled, so sessions are kept in a plain dict
temp_sessions = {}


class ConnectionManager:
    """Per-session WebSocket registry with acknowledged message delivery.

    Messages sent before the socket exists are queued in ``pending_messages``
    (at most 10 per session). Messages sent over a live socket are tracked in
    ``unconfirmed_messages`` until the client acknowledges them through
    ``confirm_message``; a background task per session resends them.
    """

    def __init__(self):
        # session_id -> live WebSocket
        self.active_connections: Dict[str, WebSocket] = {}
        # session_id -> messages queued before the connection was established
        self.pending_messages: Dict[str, list] = {}
        # session_id -> {message_id: {'message', 'timestamp', 'retry_count'}}
        self.unconfirmed_messages: Dict[str, Dict[str, dict]] = {}
        # session_id -> background resend task
        self.retry_tasks: Dict[str, asyncio.Task] = {}

    async def connect(self, session_id: str, websocket: WebSocket):
        """Accept the socket, then replay unconfirmed and pending messages.

        Args:
            session_id: Key under which the connection is registered.
            websocket: The incoming WebSocket; it is accepted here.
        """
        await websocket.accept()
        self.active_connections[session_id] = websocket
        print(f"[WebSocket] ========== 新连接建立 ==========", file=sys.stderr)
        print(f"[WebSocket] Session ID: {session_id}", file=sys.stderr)
        print(f"[WebSocket] 当前活跃连接数: {len(self.active_connections)}", file=sys.stderr)
        print(f"[WebSocket] 连接时间: {datetime.now()}", file=sys.stderr)
        print(f"[WebSocket] ===================================", file=sys.stderr)

        # Messages that were sent but never acknowledged (e.g. during a drop).
        if session_id in self.unconfirmed_messages:
            # Iterate over a snapshot: the retry task and confirm_message() may
            # delete entries while this coroutine is suspended in an await,
            # which would otherwise raise "dictionary changed size during
            # iteration".
            snapshot = list(self.unconfirmed_messages[session_id].items())
            unconfirmed_count = len(snapshot)
            print(f"[WebSocket] 发现 {unconfirmed_count} 条未确认消息(断线期间)", file=sys.stderr)

            # Give the client 100 ms to attach its listeners.
            await asyncio.sleep(0.1)

            for idx, (message_id, msg_data) in enumerate(snapshot):
                # Skip anything acknowledged or expired in the meantime.
                if message_id not in self.unconfirmed_messages.get(session_id, {}):
                    continue
                try:
                    print(f"[WebSocket] 重发未确认消息 [{idx+1}/{unconfirmed_count}]: {msg_data['message'].get('type')}", file=sys.stderr)
                    await websocket.send_json(msg_data['message'])
                    # 100 ms gap between messages
                    await asyncio.sleep(0.1)
                except Exception as e:
                    print(f"[WebSocket] 重发未确认消息失败: {str(e)}", file=sys.stderr)
                    traceback.print_exc()

            print(f"[WebSocket] 已重发所有未确认消息", file=sys.stderr)

        # Messages queued before the connection existed.
        if session_id in self.pending_messages:
            # Take ownership of the queue up front so a concurrent disconnect()
            # cannot remove it and make a later delete fail with KeyError.
            pending = self.pending_messages.pop(session_id)
            pending_count = len(pending)
            print(f"[WebSocket] 发现 {pending_count} 条pending消息", file=sys.stderr)
            print(f"[WebSocket] pending消息内容: {pending}", file=sys.stderr)

            # Give the client 100 ms to attach its listeners.
            await asyncio.sleep(0.1)

            for idx, message in enumerate(pending):
                try:
                    print(f"[WebSocket] 准备发送第{idx+1}条消息...", file=sys.stderr)
                    await websocket.send_json(message)
                    print(f"[WebSocket] 已发送pending消息 [{idx+1}/{pending_count}]: {message.get('type')}", file=sys.stderr)
                    # 100 ms gap between messages
                    await asyncio.sleep(0.1)
                except Exception as e:
                    print(f"[WebSocket] 发送pending消息失败: {str(e)}", file=sys.stderr)
                    traceback.print_exc()

            print(f"[WebSocket] pending消息已清空: {session_id}", file=sys.stderr)
        else:
            print(f"[WebSocket] 没有pending消息: {session_id}", file=sys.stderr)

    def disconnect(self, session_id: str, reason: str = "未知原因"):
        """Drop the connection for ``session_id`` and log why.

        Unconfirmed messages and the retry task are kept on purpose so that
        delivery resumes after the client reconnects.
        """
        if session_id in self.active_connections:
            del self.active_connections[session_id]
            print(f"[WebSocket] ========== 连接断开 ==========", file=sys.stderr)
            print(f"[WebSocket] Session ID: {session_id}", file=sys.stderr)
            print(f"[WebSocket] 断开原因: {reason}", file=sys.stderr)
            print(f"[WebSocket] 剩余活跃连接数: {len(self.active_connections)}", file=sys.stderr)

            if session_id in self.unconfirmed_messages:
                unconfirmed_count = len(self.unconfirmed_messages[session_id])
                print(f"[WebSocket] 保留 {unconfirmed_count} 条未确认消息,等待重连后重发", file=sys.stderr)

            print(f"[WebSocket] ===================================", file=sys.stderr)

        # Pending messages predate the connection and are stale by now.
        stale = self.pending_messages.pop(session_id, None)
        if stale:
            print(f"[WebSocket] 清理 {len(stale)} 条过时的pending消息", file=sys.stderr)

    async def send_message(self, session_id: str, message: dict):
        """Send ``message`` to the session and track it until acknowledged.

        ``message`` is mutated: ``message_id`` and ``timestamp`` are added when
        absent. Without a live connection the message is queued (the queue
        keeps only the 10 most recent entries).
        """
        # Give every message a unique id and a timestamp.
        if 'message_id' not in message:
            message['message_id'] = str(uuid.uuid4())
        if 'timestamp' not in message:
            message['timestamp'] = time.time()

        message_id = message['message_id']

        print(f"[WebSocket] ========== 尝试发送消息 ==========", file=sys.stderr)
        print(f"[WebSocket] Session ID: {session_id}", file=sys.stderr)
        print(f"[WebSocket] 消息类型: {message.get('type')}", file=sys.stderr)
        print(f"[WebSocket] 消息ID: {message_id}", file=sys.stderr)
        print(f"[WebSocket] 当前活跃连接数: {len(self.active_connections)}", file=sys.stderr)
        print(f"[WebSocket] 活跃连接session_ids: {list(self.active_connections.keys())}", file=sys.stderr)
        print(f"[WebSocket] session_id在连接中: {session_id in self.active_connections}", file=sys.stderr)
        print(f"[WebSocket] ===================================", file=sys.stderr)

        websocket = self.active_connections.get(session_id)
        if websocket is None:
            # Not connected yet: queue the message.
            print(f"[WebSocket] 连接尚未建立,缓存消息: {session_id}", file=sys.stderr)
            queue = self.pending_messages.setdefault(session_id, [])
            queue.append(message)
            # Keep at most 10 queued messages.
            if len(queue) > 10:
                queue.pop(0)
            return

        # Track the message BEFORE sending so that a failed send is still
        # replayed after reconnect instead of being silently lost.
        self.unconfirmed_messages.setdefault(session_id, {})[message_id] = {
            'message': message,
            'timestamp': time.time(),
            'retry_count': 0
        }

        # Make sure a resend task exists for this session.
        if session_id not in self.retry_tasks:
            self.retry_tasks[session_id] = asyncio.create_task(
                self._retry_unconfirmed_messages(session_id)
            )
            print(f"[WebSocket] 已启动消息重发任务: {session_id}", file=sys.stderr)

        try:
            await websocket.send_json(message)
            print(f"[WebSocket] 发送消息到 {session_id}: {message.get('type')}, message_id={message_id}", file=sys.stderr)
        except Exception as e:
            print(f"[WebSocket] ========== 发送消息失败 ==========", file=sys.stderr)
            print(f"[WebSocket] Session ID: {session_id}", file=sys.stderr)
            print(f"[WebSocket] 失败原因: {str(e)}", file=sys.stderr)
            print(f"[WebSocket] 消息类型: {message.get('type')}", file=sys.stderr)
            print(f"[WebSocket] ===================================", file=sys.stderr)
            self.disconnect(session_id, reason=f"发送消息失败: {str(e)}")

    async def _retry_unconfirmed_messages(self, session_id: str):
        """Every 3 s, expire or resend the session's unconfirmed messages.

        A message is dropped 60 s after its last send and is resent at most
        5 times. The task ends when cancelled or when nothing is left to track.
        """
        while True:
            try:
                await asyncio.sleep(3)  # check every 3 seconds

                tracked = self.unconfirmed_messages.get(session_id, {})
                current_time = time.time()
                messages_to_retry = []
                messages_to_remove = []

                for message_id, msg_data in tracked.items():
                    elapsed = current_time - msg_data['timestamp']

                    # Unconfirmed for over 60 s: give up (leaves room for reconnects).
                    if elapsed > 60:
                        print(f"[WebSocket] 消息超时未确认: {message_id}, 已等待{elapsed:.1f}秒", file=sys.stderr)
                        messages_to_remove.append(message_id)
                        continue

                    # Unconfirmed for over 3 s with retries left: resend.
                    if elapsed > 3 and msg_data['retry_count'] < 5:
                        messages_to_retry.append((message_id, msg_data))

                for message_id in messages_to_remove:
                    del tracked[message_id]
                    print(f"[WebSocket] 已移除超时消息: {message_id}", file=sys.stderr)

                if not tracked:
                    # Nothing left to deliver: deregister and stop instead of
                    # polling forever (one leaked task per session otherwise).
                    self.unconfirmed_messages.pop(session_id, None)
                    if self.retry_tasks.get(session_id) is asyncio.current_task():
                        del self.retry_tasks[session_id]
                    return

                for message_id, msg_data in messages_to_retry:
                    if message_id not in tracked:
                        # Acknowledged while an earlier resend was in flight.
                        continue
                    websocket = self.active_connections.get(session_id)
                    if websocket is None:
                        # Offline: connect() replays everything on reconnect.
                        print(f"[WebSocket] 连接未建立,等待重连后重发: {message_id}", file=sys.stderr)
                        continue
                    try:
                        await websocket.send_json(msg_data['message'])
                        msg_data['retry_count'] += 1
                        msg_data['timestamp'] = current_time
                        print(f"[WebSocket] 重发消息: {message_id}, 第{msg_data['retry_count']}次重试", file=sys.stderr)
                    except Exception as e:
                        # Keep the message; the next cycle tries again.
                        print(f"[WebSocket] 重发消息失败: {message_id}, {str(e)}", file=sys.stderr)

            except asyncio.CancelledError:
                print(f"[WebSocket] 重发任务被取消: {session_id}", file=sys.stderr)
                break
            except Exception as e:
                print(f"[WebSocket] 重发任务异常: {session_id}, {str(e)}", file=sys.stderr)

    def confirm_message(self, session_id: str, message_id: str):
        """Mark ``message_id`` as received; cancel the retry task when none remain."""
        tracked = self.unconfirmed_messages.get(session_id)
        if tracked is None or message_id not in tracked:
            return

        del tracked[message_id]
        print(f"[WebSocket] 消息已确认: {session_id}, message_id={message_id}", file=sys.stderr)
        print(f"[WebSocket] 剩余未确认消息: {len(tracked)}", file=sys.stderr)

        if not tracked and session_id in self.retry_tasks:
            self.retry_tasks[session_id].cancel()
            del self.retry_tasks[session_id]
            print(f"[WebSocket] 所有消息已确认,取消重发任务: {session_id}", file=sys.stderr)

    def get_unconfirmed_messages(self, session_id: str) -> list:
        """Return the payloads of all messages still awaiting acknowledgement."""
        if session_id in self.unconfirmed_messages:
            messages = [msg_data['message'] for msg_data in self.unconfirmed_messages[session_id].values()]
            print(f"[WebSocket] 获取未确认消息: {session_id}, 共{len(messages)}条", file=sys.stderr)
            return messages
        return []


# Global WebSocket manager
ws_manager = ConnectionManager()


async def fetch_proxy_from_pool() -> Optional[str]:
    """Fetch one proxy address from the proxy-pool API.

    Returns:
        The first line of the response, prefixed with ``http://`` when it has
        no scheme, or ``None`` when the pool is disabled, unconfigured, or the
        request fails.
    """
    config = get_config()
    if not config.get_bool('proxy_pool.enabled', False):
        return None

    api_url = config.get_str('proxy_pool.api_url', '')
    if not api_url:
        return None

    try:
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(api_url) as resp:
                if resp.status != 200:
                    print(f"[代理池] 接口返回非200状态码: {resp.status}", file=sys.stderr)
                    return None

                text = (await resp.text()).strip()
                if not text:
                    print("[代理池] 返回内容为空", file=sys.stderr)
                    return None

                line = text.splitlines()[0].strip()
                if not line:
                    print("[代理池] 首行内容为空", file=sys.stderr)
                    return None

                if line.startswith(("http://", "https://")):
                    return line
                return "http://" + line
    except Exception as e:
        print(f"[代理池] 请求失败: {str(e)}", file=sys.stderr)
        return None


# Temporary upload directory
TEMP_DIR = Path("temp_uploads")
TEMP_DIR.mkdir(exist_ok=True)


# Request models
class SendCodeRequest(BaseModel):
    """Body of the send-code endpoints."""
    phone: str
    country_code: str = "+86"
    login_page: Optional[str] = None  # "creator" or "home"; None -> configured default
    session_id: Optional[str] = None  # optional client-generated id used for WebSocket notifications


class VerifyCodeRequest(BaseModel):
    """Body of the SMS code verification endpoint."""
    phone: str
    code: str
    country_code: str = "+86"


class LoginRequest(BaseModel):
    """Body of the login endpoint."""
    phone: str
    code: str
    country_code: str = "+86"
    login_page: Optional[str] = None  # "creator" or "home"; None -> configured default
    session_id: Optional[str] = None  # session_id returned by the send-code endpoint


class PublishNoteRequest(BaseModel):
    """Body for publishing a note."""
    title: str
    content: str
    images: Optional[list] = None
    topics: Optional[list] = None


class PublishWithCookiesRequest(BaseModel):
    """Body for publishing a note with explicitly supplied login data."""
    cookies: Optional[list] = None  # legacy: cookies only
    login_state: Optional[dict] = None  # full login_state
    storage_state_path: Optional[str] = None  # storage_state file path (highest priority)
    phone: Optional[str] = None  # phone number used to locate the storage_state file
    title: str
    content: str
    images: Optional[list] = None
    topics: Optional[list] = None


class InjectCookiesRequest(BaseModel):
    """Body of the inject-cookies endpoint."""
    cookies: Optional[list] = None  # legacy: cookies only
    login_state: Optional[dict] = None  # full login_state
    target_page: Optional[str] = "creator"  # "creator" or "home"


# Response model
class BaseResponse(BaseModel):
    """Uniform response envelope; ``code`` 0 means success."""
    code: int
    message: str
    data: Optional[Dict[str, Any]] = None


@app.on_event("startup")
async def startup_event():
    """Initialise config, login service, SMS service and the publish scheduler.

    Browser pool, pool cleanup and preheating are disabled because browsers
    are managed through AdsPower.
    """
    global browser_pool, login_service, sms_service, scheduler

    # Initialise configuration
    config = init_config()

    print("[服务启动] FastAPI服务启动,浏览器池已就绪")

    # Remove error screenshots older than 7 days
    try:
        cleanup_old_screenshots(days=7)
    except Exception as e:
        print(f"[启动] 清理旧截图失败: {str(e)}")

    headless = config.get_bool('scheduler.headless', True)  # scheduled publishing
    login_headless = config.get_bool('login.headless', False)  # login / binding; headed by default

    # AdsPower manages browsers, so no pool is created.
    browser_pool = None
    print(f"[服务启动] 浏览器管理: 使用AdsPower")

    # The login service has its own headless setting.
    login_service = XHSLoginService(use_pool=False, headless=login_headless)
    print(f"[服务启动] 登录服务模式: {'headless(无头模式)' if login_headless else 'headed(有头模式)'}")

    # Aliyun SMS service
    sms_dict = config.get_dict('ali_sms')
    sms_service = AliSmsService(
        access_key_id=sms_dict.get('access_key_id', ''),
        access_key_secret=sms_dict.get('access_key_secret', ''),
        sign_name=sms_dict.get('sign_name', ''),
        template_code=sms_dict.get('template_code', '')
    )
    print("[服务启动] 阿里云短信服务已初始化")

    # Preheating is disabled so it cannot interfere with normal flows.
    print("[服务启动] 浏览器预热功能已禁用")

    # Database settings for the scheduler
    db_dict = config.get_dict('database')
    db_config = {
        'host': db_dict.get('host', 'localhost'),
        'port': db_dict.get('port', 3306),
        'user': db_dict.get('username', 'root'),
        'password': db_dict.get('password', ''),
        'database': db_dict.get('dbname', 'ai_wht')
    }

    # Scheduler settings
    scheduler_enabled = config.get_bool('scheduler.enabled', False)
    proxy_pool_enabled = config.get_bool('proxy_pool.enabled', False)
    proxy_pool_api_url = config.get_str('proxy_pool.api_url', '')
    proxy_username = config.get_str('proxy_pool.username', '')
    proxy_password = config.get_str('proxy_pool.password', '')
    enable_random_ua = config.get_bool('scheduler.enable_random_ua', True)
    min_publish_interval = config.get_int('scheduler.min_publish_interval', 30)
    max_publish_interval = config.get_int('scheduler.max_publish_interval', 120)

    if scheduler_enabled:
        use_adspower_scheduler = config.get_bool('adspower.enabled', False)

        scheduler = XHSScheduler(
            db_config=db_config,
            max_concurrent=config.get_int('scheduler.max_concurrent', 2),
            publish_timeout=config.get_int('scheduler.publish_timeout', 300),
            max_articles_per_user_per_run=config.get_int('scheduler.max_articles_per_user_per_run', 2),
            max_failures_per_user_per_run=config.get_int('scheduler.max_failures_per_user_per_run', 3),
            max_daily_articles_per_user=config.get_int('scheduler.max_daily_articles_per_user', 6),
            max_hourly_articles_per_user=config.get_int('scheduler.max_hourly_articles_per_user', 2),
            proxy_pool_enabled=proxy_pool_enabled,
            proxy_pool_api_url=proxy_pool_api_url,
            proxy_username=proxy_username,
            proxy_password=proxy_password,
            enable_random_ua=enable_random_ua,
            min_publish_interval=min_publish_interval,
            max_publish_interval=max_publish_interval,
            headless=headless,
            use_adspower=use_adspower_scheduler
        )

        cron_expr = config.get_str('scheduler.cron', '*/5 * * * * *')
        scheduler.start(cron_expr)
        print(f"[服务启动] 定时发布任务已启动,Cron: {cron_expr}", file=sys.stderr)
    else:
        print("[服务启动] 定时发布任务未启用", file=sys.stderr)


async def browser_cleanup_task():
    """Background task: ask the browser pool to clean up every 5 minutes.

    Not started by startup_event at present.
    """
    while True:
        await asyncio.sleep(300)  # check every 5 minutes
        try:
            await browser_pool.cleanup_if_idle()
        except Exception as e:
            print(f"[清理任务] 浏览器清理异常: {str(e)}")


async def browser_preheat_task():
    """Background task: preheat a browser on the creator login page.

    Not started by startup_event at present.
    """
    try:
        # Wait 3 s so service startup is not slowed down.
        await asyncio.sleep(3)
        print("[预热任务] 开始预热浏览器...")
        await browser_pool.preheat("https://creator.xiaohongshu.com/login")
    except Exception as e:
        print(f"[预热任务] 预热失败: {str(e)}")


async def repreheat_browser_after_use():
    """Background task: re-preheat a browser after a login flow used one."""
    try:
        # Wait 5 s so the response has been returned, cookies have been
        # collected and saved, and the login flow has fully finished.
        await asyncio.sleep(5)
        print("[补充预热任务] 开始补充预热浏览器...")
        await browser_pool.repreheat("https://creator.xiaohongshu.com/login")
    except Exception as e:
        print(f"[补充预热任务] 补充预热失败: {str(e)}")


@app.on_event("shutdown")
async def shutdown_event():
    """Stop the scheduler on shutdown (there is no browser pool to close)."""
    print("[服务关闭] 正在关闭服务...")

    if scheduler:
        scheduler.stop()
        print("[服务关闭] 调度器已停止")

    print("[服务关闭] 服务关闭完成")


async def _close_service_quietly(service, done_message: str, fail_prefix: str) -> None:
    """Best-effort ``service.close()``; logs the outcome and never raises."""
    if service is None:
        return
    try:
        await service.close()
        print(done_message, file=sys.stderr)
    except Exception as e:
        print(f"{fail_prefix}: {str(e)}", file=sys.stderr)


async def _release_pool_browser_quietly(session_id, done_message: str, fail_prefix: str) -> None:
    """Best-effort release of a pooled temp browser; no-op without a pool."""
    if not browser_pool or not session_id:
        return
    try:
        await browser_pool.release_temp_browser(session_id)
        print(done_message, file=sys.stderr)
    except Exception as e:
        print(f"{fail_prefix}: {str(e)}", file=sys.stderr)


@app.post("/api/xhs/send-code", response_model=BaseResponse)
async def send_code(request: SendCodeRequest):
    """
    Send a verification code.

    Drives the Xiaohongshu site to enter the phone number and trigger the
    code. Either the creator centre or the home page can be used. Every
    request gets its own login service instance.
    """
    # Prefer the client's session_id; otherwise generate one.
    if request.session_id:
        session_id = request.session_id
        print(f"[发送验证码] 使用前端传递的session_id={session_id}, phone={request.phone}", file=sys.stderr)
    else:
        session_id = f"xhs_login_{uuid.uuid4().hex}"
        print(f"[发送验证码] 前端未传session_id,生成新的session_id={session_id}, phone={request.phone}", file=sys.stderr)

    # The API parameter wins over the configured default login page.
    config = get_config()
    default_login_page = config.get_str('login.page', 'creator')
    login_page = request.login_page if request.login_page else default_login_page

    print(f"[发送验证码] 使用登录页面: {login_page} (配置默认={default_login_page}, API参数={request.login_page})", file=sys.stderr)

    request_login_service = None
    try:
        use_adspower = config.get_bool('adspower.enabled', False)
        print(f"[发送验证码] AdsPower模式: {'[开启]' if use_adspower else '[关闭]'}", file=sys.stderr)

        # A fresh service per request (no pool) so each attempt starts clean.
        request_login_service = XHSLoginService(
            use_pool=False,
            headless=login_service.headless,
            session_id=session_id,
            use_adspower=use_adspower
        )

        result = await request_login_service.send_verification_code(
            phone=request.phone,
            country_code=request.country_code,
            login_page=login_page,
            session_id=session_id  # used for WebSocket notifications
        )

        # Risk control asked for a QR scan: keep the browser alive.
        if result.get("need_captcha"):
            print(f"[发送验证码] 检测到需要扫码验证,保持session {session_id} 的浏览器继续运行", file=sys.stderr)
            return BaseResponse(
                code=0,
                message=result.get("message", "需要扫码验证"),
                data={
                    "need_captcha": True,
                    "captcha_type": result.get("captcha_type"),
                    "qrcode_image": result.get("qrcode_image"),
                    "session_id": session_id
                }
            )

        if result["success"]:
            # Code sent: close the browser (a new one is created next time).
            await _close_service_quietly(
                request_login_service,
                f"[发送验证码] ✅ 已关闭浏览器实例: {session_id}",
                "[发送验证码] 关闭浏览器失败",
            )
            return BaseResponse(
                code=0,
                message="验证码已发送,请在小红书APP中查看",
                data={
                    "sent_at": datetime.now().isoformat(),
                    "session_id": session_id
                }
            )

        # Sending failed: close the browser.
        await _close_service_quietly(
            request_login_service,
            f"[发送验证码] 已关闭失败的浏览器: {session_id}",
            "[发送验证码] 关闭浏览器失败",
        )
        return BaseResponse(
            code=1,
            message=result.get("error", "发送验证码失败"),
            data=None
        )

    except Exception as e:
        print(f"发送验证码异常: {str(e)}", file=sys.stderr)
        await _close_service_quietly(
            request_login_service,
            f"[发送验证码] 已关闭异常的浏览器: {session_id}",
            "[发送验证码] 关闭浏览器失败",
        )
        return BaseResponse(
            code=1,
            message=f"发送验证码失败: {str(e)}",
            data=None
        )


@app.post("/api/xhs/phone/send-code", response_model=BaseResponse)
async def send_phone_code(request: SendCodeRequest):
    """
    Send an SMS verification code through the Aliyun SMS service.
    """
    try:
        result = await sms_service.send_verification_code(request.phone)

        if result["success"]:
            return BaseResponse(
                code=0,
                message=result.get("message", "验证码已发送"),
                data={
                    "sent_at": datetime.now().isoformat(),
                    # The code itself is only echoed when server.debug is on.
                    "code": result.get("code") if get_config().get_bool('server.debug', False) else None
                }
            )
        return BaseResponse(
            code=1,
            message=result.get("error", "发送验证码失败"),
            data=None
        )
    except Exception as e:
        print(f"发送短信验证码异常: {str(e)}")
        return BaseResponse(
            code=1,
            message=f"发送验证码失败: {str(e)}",
            data=None
        )


@app.post("/api/xhs/phone/verify-code", response_model=BaseResponse)
async def verify_phone_code(request: VerifyCodeRequest):
    """
    Verify an SMS verification code through the Aliyun SMS service.
    """
    try:
        result = sms_service.verify_code(request.phone, request.code)

        if result["success"]:
            return BaseResponse(
                code=0,
                message="验证码验证成功",
                data={"verified_at": datetime.now().isoformat()}
            )
        return BaseResponse(
            code=1,
            message=result.get("error", "验证码验证失败"),
            data=None
        )
    except Exception as e:
        print(f"验证验证码异常: {str(e)}")
        return BaseResponse(
            code=1,
            message=f"验证失败: {str(e)}",
            data=None
        )


@app.post("/api/xhs/qrcode/start", response_model=BaseResponse)
async def start_qrcode_login():
    """
    Start a QR-code login and return the QR image and status.

    Each user gets a dedicated browser instance under a random session_id.
    """
    session_id = None
    qrcode_service = None
    try:
        print("[扫码登录] 启动扫码登录流程", file=sys.stderr)

        session_id = f"qrcode_login_{uuid.uuid4().hex}"
        print(f"[扫码登录] 创建全新浏览器实例 session_id={session_id}", file=sys.stderr)

        qrcode_service = XHSLoginService(
            use_pool=False,
            headless=login_service.headless,
            session_id=session_id,
            use_page_isolation=False  # page isolation is not usable here; separate browser required
        )

        await qrcode_service.init_browser()
        result = await qrcode_service.start_qrcode_login()

        if result["success"]:
            return BaseResponse(
                code=0,
                message="二维码获取成功",
                data={
                    "session_id": session_id,
                    "qrcode_image": result["qrcode_image"],
                    "status_text": result.get("status_text", ""),
                    "status_desc": result.get("status_desc", ""),
                    "is_expired": result.get("is_expired", False),
                    # QR-code creation details
                    "qr_url": result.get("qr_url", ""),
                    "qr_id": result.get("qr_id", ""),
                    "qr_code": result.get("qr_code", ""),
                    "multi_flag": result.get("multi_flag", 0)
                }
            )

        # Failure: free the browser. Without a pool (the current setup) the
        # service must be closed directly, otherwise the browser leaks.
        if browser_pool:
            await _release_pool_browser_quietly(
                session_id,
                f"[扫码登录] 已释放失败的session: {session_id}",
                "[扫码登录] 释放浏览器失败",
            )
        else:
            await _close_service_quietly(
                qrcode_service,
                f"[扫码登录] 已关闭失败的浏览器: {session_id}",
                "[扫码登录] 关闭浏览器失败",
            )
        return BaseResponse(
            code=1,
            message=result.get("error", "获取二维码失败"),
            data=None
        )

    except Exception as e:
        print(f"[扫码登录] 异常: {str(e)}", file=sys.stderr)
        if browser_pool:
            await _release_pool_browser_quietly(
                session_id,
                f"[扫码登录] 已释放异常的session: {session_id}",
                "[扫码登录] 释放浏览器失败",
            )
        else:
            await _close_service_quietly(
                qrcode_service,
                f"[扫码登录] 已关闭异常的浏览器: {session_id}",
                "[扫码登录] 关闭浏览器失败",
            )
        return BaseResponse(
            code=1,
            message=f"启动扫码登录失败: {str(e)}",
            data=None
        )


@app.post("/api/xhs/qrcode/status")
async def get_qrcode_status(request: dict):
    """
    Poll the scan status and the latest QR-code image.

    Response codes: 0 ok, 1 error, 2 session expired.
    """
    try:
        session_id = request.get('session_id')
        if not session_id:
            return BaseResponse(
                code=1,
                message="session_id不能为空",
                data=None
            )

        # With a pool, an unknown session means it has expired.
        if browser_pool and session_id not in browser_pool.temp_browsers:
            print(f"[扫码状态] session_id={session_id} 已失效,要求重新创建二维码", file=sys.stderr)
            return BaseResponse(
                code=2,
                message="会话已失效,请刷新二维码重新开始",
                data={
                    "session_expired": True
                }
            )

        qrcode_service = XHSLoginService(
            use_pool=False,
            headless=login_service.headless,
            session_id=session_id
        )

        # NOTE(review): the original comment says this reuses the existing
        # browser for the session — confirm in XHSLoginService.
        await qrcode_service.init_browser()

        result = await qrcode_service.extract_qrcode_with_status()

        if not result["success"]:
            return BaseResponse(
                code=1,
                message=result.get("error", "获取状态失败"),
                data=None
            )

        if result.get("login_success"):
            return BaseResponse(
                code=0,
                message="扫码登录成功",
                data={
                    "login_success": True,
                    "user_info": result.get("user_info"),
                    "cookies": result.get("cookies"),
                    "cookies_full": result.get("cookies_full"),
                    "login_state": result.get("login_state")
                }
            )

        # Not logged in yet: return the current QR state.
        return BaseResponse(
            code=0,
            message="获取状态成功",
            data={
                "login_success": False,
                "qrcode_image": result["qrcode_image"],
                "status_text": result.get("status_text", ""),
                "status_desc": result.get("status_desc", ""),
                "is_expired": result.get("is_expired", False)
            }
        )

    except Exception as e:
        print(f"[扫码状态] 异常: {str(e)}", file=sys.stderr)
        traceback.print_exc()
        return BaseResponse(
            code=1,
            message=f"获取状态失败: {str(e)}",
            data=None
        )


@app.post("/api/xhs/qrcode/refresh")
async def refresh_qrcode(request: dict):
    """
    Refresh an expired QR code.

    Response codes: 0 ok, 1 error, 3 the login flow must be restarted.
    """
    try:
        session_id = request.get('session_id')
        if not session_id:
            return BaseResponse(
                code=1,
                message="session_id不能为空",
                data=None
            )

        qrcode_service = XHSLoginService(
            use_pool=False,
            headless=login_service.headless,
            session_id=session_id
        )

        await qrcode_service.init_browser()

        result = await qrcode_service.refresh_qrcode()

        if result["success"]:
            return BaseResponse(
                code=0,
                message="二维码刷新成功",
                data={
                    "qrcode_image": result["qrcode_image"],
                    "status_text": result.get("status_text", ""),
                    "status_desc": result.get("status_desc", ""),
                    "is_expired": result.get("is_expired", False),
                    # QR-code creation details
                    "qr_url": result.get("qr_url", ""),
                    "qr_id": result.get("qr_id", ""),
                    "qr_code": result.get("qr_code", ""),
                    "multi_flag": result.get("multi_flag", 0)
                }
            )

        if result.get("need_restart"):
            return BaseResponse(
                code=3,
                message="页面已失效,请重新启动扫码登录",
                data={
                    "need_restart": True
                }
            )

        return BaseResponse(
            code=1,
            message=result.get("error", "刷新失败"),
            data=None
        )

    except Exception as e:
        print(f"[刷新二维码] 异常: {str(e)}", file=sys.stderr)
        return BaseResponse(
            code=1,
            message=f"刷新二维码失败: {str(e)}",
            data=None
        )


async def _post_to_go_backend(path: str, payload: dict, auth_header: str):
    """POST ``payload`` as JSON to the Go backend.

    Returns:
        ``(ok, body)`` where ``ok`` is True only for HTTP 200 with a body
        ``code`` of 200, and ``body`` is the decoded JSON response.
    """
    go_backend_url = get_config().get_str('go_backend.url', 'http://localhost:8080')
    headers = {'Authorization': auth_header} if auth_header else {}
    async with aiohttp.ClientSession() as session:
        async with session.post(f"{go_backend_url}{path}", json=payload, headers=headers) as resp:
            result = await resp.json()
            return resp.status == 200 and result.get('code') == 200, result


@app.post("/api/xhs/save-bind-info")
async def save_bind_info(request: dict):
    """
    Forward QR-code login binding data to the Go backend for storage.
    """
    try:
        employee_id = request.get('employee_id')
        cookies_full = request.get('cookies_full', [])
        user_info = request.get('user_info', {})
        login_state = request.get('login_state', {})

        if not employee_id:
            return BaseResponse(
                code=1,
                message="employee_id不能为空",
                data=None
            )

        save_data = {
            "employee_id": employee_id,
            "cookies_full": cookies_full,
            "user_info": user_info,
            "login_state": login_state
        }

        # The client's token is read from the 'Authorization' key of the body.
        auth_header = request.get('Authorization', '')
        ok, result = await _post_to_go_backend('/api/xhs/save-qrcode-login', save_data, auth_header)

        if ok:
            return BaseResponse(
                code=0,
                message="保存成功",
                data=result.get('data')
            )
        return BaseResponse(
            code=1,
            message=result.get('message', '保存失败'),
            data=None
        )

    except Exception as e:
        print(f"[保存绑定信息] 异常: {str(e)}", file=sys.stderr)
        traceback.print_exc()
        return BaseResponse(
            code=1,
            message=f"保存失败: {str(e)}",
            data=None
        )


@app.post("/api/xhs/save-login")
async def save_login(request: dict):
    """
    Forward verification-code login data (storage_state) to the Go backend.
    """
    try:
        employee_id = request.get('employee_id')
        storage_state = request.get('storage_state', {})
        storage_state_path = request.get('storage_state_path', '')
        user_info = request.get('user_info', {})

        if not employee_id:
            return BaseResponse(
                code=1,
                message="employee_id不能为空",
                data=None
            )

        if not storage_state:
            return BaseResponse(
                code=1,
                message="storage_state不能为空",
                data=None
            )

        # Cookies are taken from the storage_state.
        cookies_full = storage_state.get('cookies', [])

        save_data = {
            "employee_id": employee_id,
            "cookies_full": cookies_full,
            "storage_state": storage_state,
            "storage_state_path": storage_state_path,
            "user_info": user_info
        }

        print(f"[保存验证码登录] employee_id={employee_id}, cookies数量={len(cookies_full)}, 用户={user_info.get('nickname', '未知')}", file=sys.stderr)

        # The client's token is read from the 'Authorization' key of the body.
        auth_header = request.get('Authorization', '')
        ok, result = await _post_to_go_backend('/api/xhs/save-login', save_data, auth_header)

        if ok:
            print(f"[保存验证码登录] 保存成功", file=sys.stderr)
            return BaseResponse(
                code=0,
                message="保存成功",
                data=result.get('data')
            )

        print(f"[保存验证码登录] 保存失败: {result.get('message')}", file=sys.stderr)
        return BaseResponse(
            code=1,
            message=result.get('message', '保存失败'),
            data=None
        )

    except Exception as e:
        print(f"[保存验证码登录] 异常: {str(e)}", file=sys.stderr)
        traceback.print_exc()
        return BaseResponse(
            code=1,
            message=f"保存失败: {str(e)}",
            data=None
        )


@app.post("/api/xhs/qrcode/cancel")
async def cancel_qrcode_login(request: dict):
    """
    Cancel a QR-code login and release its browser.

    Always answers with code 0 (except for a missing session_id) so a failed
    cleanup never affects the user.
    """
    try:
        session_id = request.get('session_id')
        if not session_id:
            return BaseResponse(
                code=1,
                message="session_id不能为空",
                data=None
            )

        if not browser_pool:
            return BaseResponse(
                code=0,
                message="浏览器池未初始化",
                data=None
            )

        await _release_pool_browser_quietly(
            session_id,
            f"[取消扫码] 已释放 session: {session_id}",
            "[取消扫码] 释放浏览器失败",
        )
        return BaseResponse(
            code=0,
            message="已取消扫码登录",
            data=None
        )

    except Exception as e:
        print(f"[取消扫码] 异常: {str(e)}", file=sys.stderr)
        return BaseResponse(
            code=0,
            message="已取消扫码登录",
            data=None
        )


@app.post("/api/xhs/login", response_model=BaseResponse)
async def login(request: LoginRequest):
    """
    Complete the login with the code the user entered.

    Requires the session_id returned by the send-code endpoint. Either the
    creator centre or the home page can be used.
    """
    # A session_id is mandatory; without it there is no browser to reuse.
    if not request.session_id:
        return BaseResponse(
            code=1,
            message="缺少session_id参数,无法复用浏览器实例,请重新发送验证码",
            data=None
        )

    session_id = request.session_id
    print(f"[登录验证] session_id={session_id}, phone={request.phone}", file=sys.stderr)

    # The API parameter wins over the configured default login page.
    config = get_config()
    default_login_page = config.get_str('login.page', 'creator')
    login_page = request.login_page if request.login_page else default_login_page

    print(f"[登录验证] 使用登录页面: {login_page} (配置默认={default_login_page}, API参数={request.login_page})", file=sys.stderr)

    try:
        print(f"[登录验证] 尝试复用send-code的浏览器: {session_id}", file=sys.stderr)

        if browser_pool and session_id in browser_pool.temp_browsers:
            print(f"[登录验证] ✅ 在浏览器池中找到session: {session_id}", file=sys.stderr)
        else:
            print(f"[登录验证] ⚠️ 浏览器池中未找到session: {session_id}", file=sys.stderr)
            print(f"[登录验证] 当前池中的session列表: {list(browser_pool.temp_browsers.keys()) if browser_pool else 'None'}", file=sys.stderr)

        # NOTE(review): use_pool=True although startup_event leaves the pool
        # disabled — confirm how XHSLoginService handles that combination.
        request_login_service = XHSLoginService(
            use_pool=True,
            headless=login_service.headless,
            session_id=session_id
        )
        await request_login_service.init_browser()

        if request_login_service.page:
            print(f"[登录验证] ✅ 浏览器初始化成功,当前URL: {request_login_service.page.url}", file=sys.stderr)
        else:
            print(f"[登录验证] ❌ 浏览器初始化失败,page为None", file=sys.stderr)

        result = await request_login_service.login(
            phone=request.phone,
            code=request.code,
            country_code=request.country_code,
            login_page=login_page
        )

        # A QR scan is required: keep the session's browser running.
        if result.get("need_captcha"):
            print(f"[登录验证] 检测到需要扫码验证,保持session {session_id} 的浏览器继续运行", file=sys.stderr)
            return BaseResponse(
                code=0,
                message=result.get("message", "需要扫码验证"),
                data={
                    "need_captcha": True,
                    "captcha_type": result.get("captcha_type"),
                    "qrcode_image": result.get("qrcode_image"),
                    "session_id": session_id
                }
            )

        # Login finished (success or failure): release the temp browser.
        await _release_pool_browser_quietly(
            session_id,
            f"[登录验证] 已释放临时浏览器: {session_id}",
            "[登录验证] 释放临时浏览器失败",
        )

        if result["success"]:
            return BaseResponse(
                code=0,
                message="登录成功",
                data={
                    "user_info": result.get("user_info"),
                    "cookies": result.get("cookies"),  # key/value form (for display)
                    "cookies_full": result.get("cookies_full"),  # full Playwright form (for storage/scripts)
                    "login_state": result.get("login_state"),  # cookies + localStorage + sessionStorage
                    "localStorage": result.get("localStorage"),
                    "sessionStorage": result.get("sessionStorage"),
                    "url": result.get("url"),
                    "storage_state_path": result.get("storage_state_path"),
                    "login_time": datetime.now().isoformat()
                }
            )

        return BaseResponse(
            code=1,
            message=result.get("error", "登录失败"),
            data=None
        )

    except Exception as e:
        print(f"登录异常: {str(e)}", file=sys.stderr)
        await _release_pool_browser_quietly(
            session_id,
            f"[登录验证] 已释放临时浏览器: {session_id}",
            "[登录验证] 释放临时浏览器失败",
        )
        return BaseResponse(
            code=1,
            message=f"登录失败: {str(e)}",
            data=None
        )


@app.get("/")
async def root():
    """Basic health check; includes pool statistics when a pool exists."""
    if browser_pool:
        stats = browser_pool.get_stats()
        return {
            "status": "ok",
            "message": "小红书登录服务运行中(浏览器池模式)",
            "browser_pool": stats
        }
    return {"status": "ok", "message": "服务初始化中..."}
@app.get("/api/health")
async def health_check():
    """Detailed health check; reports browser-pool stats once the pool exists."""
    now = datetime.now().isoformat()
    if not browser_pool:
        return {
            "status": "initializing",
            "service": "xhs-login-service",
            "timestamp": now
        }
    return {
        "status": "healthy",
        "service": "xhs-login-service",
        "mode": "browser-pool",
        "browser_pool_stats": browser_pool.get_stats(),
        "timestamp": now
    }


@app.post("/api/xhs/inject-cookies", response_model=BaseResponse)
async def inject_cookies(request: InjectCookiesRequest):
    """
    Inject cookies or a full login_state into a fresh browser and verify it.

    Two modes, in priority order:
      1. ``login_state`` (cookies + localStorage + sessionStorage);
      2. ``cookies`` only (legacy).
    ``target_page`` selects the page used for verification ("home" or the
    creator centre, which is the default).

    A brand-new, non-pooled, headful browser is created for every call. It is
    intentionally left open when verification succeeds and closed otherwise.
    """
    inject_service = None
    try:
        # Close the global service's browser first, if it still has one.
        if login_service.browser:
            await login_service.close_browser()

        print("✅ 为注入Cookie创建全新的浏览器实例,不使用浏览器池", file=sys.stderr)
        inject_service = XHSLoginService(use_pool=False, headless=False)

        if request.login_state:
            print("✅ 检测到login_state,将恢复完整登录状态", file=sys.stderr)
            # Written to the working directory; presumably read back by
            # init_browser(restore_state=True) - confirm in XHSLoginService.
            with open('login_state.json', 'w', encoding='utf-8') as f:
                json.dump(request.login_state, f, ensure_ascii=False, indent=2)
            await inject_service.init_browser(restore_state=True)
        elif request.cookies:
            print("⚠️ 检测到仅有Cookies,建议使用login_state获取更好的兼容性", file=sys.stderr)
            await inject_service.init_browser(cookies=request.cookies)
        else:
            return BaseResponse(
                code=1,
                message="请提供 cookies 或 login_state",
                data=None
            )

        # Pick the page used to verify the injected state.
        target_page = request.target_page or "creator"
        if target_page == "home":
            verify_url = "https://www.xiaohongshu.com"
            page_name = "小红书首页"
        else:
            verify_url = "https://creator.xiaohongshu.com"
            page_name = "创作者中心"

        result = await inject_service.verify_login_status(url=verify_url)

        state_label = 'login_state' if request.login_state else 'Cookie'
        if result.get("logged_in"):
            # The browser stays open on purpose so the user can see the result.
            return BaseResponse(
                code=0,
                message=f"{state_label}注入成功,已跳转到{page_name}",
                data={
                    "logged_in": True,
                    "target_page": page_name,
                    "user_info": result.get("user_info"),
                    "cookies": result.get("cookies"),  # key/value form
                    "cookies_full": result.get("cookies_full"),  # full Playwright form
                    "url": result.get("url")
                }
            )

        # Verification failed: close the dedicated browser.
        await inject_service.close_browser()
        return BaseResponse(
            code=1,
            # The default used to lack the f-prefix and leaked literal braces.
            message=result.get("message", f"{state_label}已失效,请重新登录"),
            data={
                "logged_in": False
            }
        )

    except Exception as e:
        print(f"注入失败: {str(e)}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        # Do not leave a headful browser behind after an error.
        if inject_service is not None:
            try:
                await inject_service.close_browser()
            except Exception as close_err:
                print(f"注入失败后关闭浏览器出错: {str(close_err)}", file=sys.stderr)
        return BaseResponse(
            code=1,
            message=f"注入失败: {str(e)}",
            data=None
        )


@app.post("/api/xhs/publish-with-cookies", response_model=BaseResponse)
async def publish_note_with_cookies(request: PublishWithCookiesRequest):
    """
    Publish a note using stored credentials (called by the Go scheduler).

    Credential sources, in priority order:
      1. a storage_state file (``storage_state_path``, or looked up by ``phone``);
      2. ``login_state`` (cookies + localStorage + sessionStorage);
      3. ``cookies`` only (legacy).
    If the storage_state file is missing, modes 2 and 3 are tried as fallback.

    A brand-new, non-pooled browser is created per call and is always closed
    once it has been started, whether publishing succeeds, fails or raises.
    """
    publish_service = None
    browser_started = False
    try:
        proxy = await fetch_proxy_from_pool()
        if proxy:
            print(f"[发布接口] 使用代理: {proxy}", file=sys.stderr)

        print("✅ 为发布任务创建全新的浏览器实例,不使用浏览器池", file=sys.stderr)
        config = get_config()
        headless = config.get_bool('scheduler.headless', True)
        publish_service = XHSLoginService(use_pool=False, headless=headless)

        if request.storage_state_path or request.phone:
            # Mode 1: an explicit path wins over the per-phone lookup.
            # NOTE(review): request.phone is interpolated into a file path
            # unsanitised, and handle_verify_code_ws writes
            # "storage_states/{phone}_state.json" while this reads
            # "storage_states/xhs_{phone}.json" - confirm both are intended.
            storage_state_file = request.storage_state_path or os.path.join(
                'storage_states', f"xhs_{request.phone}.json"
            )

            if os.path.exists(storage_state_file):
                print(f"✅ 检测到storage_state文件: {storage_state_file},将使用Playwright原生恢复", file=sys.stderr)
                browser_started = True
                await publish_service.init_browser_with_storage_state(
                    storage_state_path=storage_state_file,
                    proxy=proxy
                )
            else:
                print(f"⚠️ storage_state文件不存在: {storage_state_file},回退到login_state或cookies模式", file=sys.stderr)
                if request.login_state:
                    browser_started = True
                    await _init_with_login_state(publish_service, request.login_state, proxy)
                elif request.cookies:
                    browser_started = True
                    await publish_service.init_browser(cookies=request.cookies, proxy=proxy)
                else:
                    return BaseResponse(
                        code=1,
                        message="storage_state文件不存在,且未提供 login_state 或 cookies",
                        data=None
                    )
        elif request.login_state:
            # Mode 2: full login_state.
            print("✅ 检测到login_state,将恢复完整登录状态", file=sys.stderr)
            browser_started = True
            await _init_with_login_state(publish_service, request.login_state, proxy)
        elif request.cookies:
            # Mode 3: cookies only (legacy).
            print("⚠️ 检测到仅有Cookies,建议使用storage_state或login_state获取更好的兼容性", file=sys.stderr)
            browser_started = True
            await publish_service.init_browser(cookies=request.cookies, proxy=proxy)
        else:
            return BaseResponse(
                code=1,
                message="请提供 storage_state_path、phone、login_state 或 cookies",
                data=None
            )

        # The service is already initialised, so no cookies/proxy are passed.
        result = await publish_service.publish_note(
            title=request.title,
            content=request.content,
            images=request.images,
            topics=request.topics,
            cookies=None,
            proxy=None,
        )

        if result.get("success"):
            return BaseResponse(
                code=0,
                message="笔记发布成功",
                data={
                    "url": result.get("url"),
                    "publish_time": datetime.now().isoformat()
                }
            )
        return BaseResponse(
            code=1,
            message=result.get("error", "发布失败"),
            data=None
        )

    except Exception as e:
        print(f"发布笔记异常: {str(e)}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        return BaseResponse(
            code=1,
            message=f"发布失败: {str(e)}",
            data=None
        )
    finally:
        # Always close the dedicated browser; a close failure must not mask
        # the publish result that is already being returned.
        if publish_service is not None and browser_started:
            try:
                await publish_service.close_browser()
            except Exception as close_err:
                print(f"[发布接口] 关闭浏览器失败: {str(close_err)}", file=sys.stderr)


async def _init_with_login_state(publish_service, login_state, proxy):
    """
    Initialise ``publish_service`` from a login_state dict.

    The browser is started with the state's cookies and user agent. If the
    state carries localStorage/sessionStorage entries, the page navigates to
    the state's ``url`` (creator centre by default) and the entries are
    written back. Storage restoration is best effort: failures are logged and
    do not abort initialisation.
    """
    await publish_service.init_browser(
        cookies=login_state.get('cookies'),
        proxy=proxy,
        user_agent=login_state.get('user_agent')
    )

    try:
        storages = (
            ('localStorage', login_state.get('localStorage')),
            ('sessionStorage', login_state.get('sessionStorage')),
        )
        if any(items for _, items in storages):
            target_url = login_state.get('url', 'https://creator.xiaohongshu.com')
            await publish_service.page.goto(target_url, wait_until='domcontentloaded', timeout=15000)

            for storage_name, items in storages:
                if not items:
                    continue
                for key, value in items.items():
                    # json.dumps escapes both key and value, so quotes or
                    # backslashes in a key cannot break the script.
                    await publish_service.page.evaluate(
                        f'{storage_name}.setItem({json.dumps(key)}, {json.dumps(value)})'
                    )

            print("✅ 已恢夏localStorage和sessionStorage", file=sys.stderr)
    except Exception as e:
        print(f"⚠️ 恢夏storage失败: {str(e)}", file=sys.stderr)


@app.post("/api/xhs/upload-images")
async def upload_images(files: List[UploadFile] = File(...)):
    """
    Save uploaded images into the server's temporary directory.

    Every file is validated before any is written, so a rejected request
    leaves nothing on disk. Returns ``code=0`` with the absolute paths of the
    saved files, or ``code=1`` with an error message.
    """
    try:
        # A missing content type is treated as "not an image".
        for file in files:
            if not (file.content_type or "").startswith('image/'):
                return {
                    "code": 1,
                    "message": f"文件 {file.filename} 不是图片类型",
                    "data": None
                }

        uploaded_paths = []
        for file in files:
            # Timestamp-based name; only the extension of the client-supplied
            # name is kept.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            file_ext = os.path.splitext(file.filename or "")[1]
            file_path = TEMP_DIR / f"{timestamp}{file_ext}"

            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            abs_path = str(file_path.absolute())
            uploaded_paths.append(abs_path)
            print(f"已上传图片: {abs_path}")

        return {
            "code": 0,
            "message": f"成功上传 {len(uploaded_paths)} 张图片",
            "data": {
                "paths": uploaded_paths,
                "count": len(uploaded_paths)
            }
        }

    except Exception as e:
        print(f"上传图片异常: {str(e)}")
        return {
            "code": 1,
            "message": f"上传失败: {str(e)}",
            "data": None
        }


async def handle_send_code_ws(session_id: str, phone: str, country_code: str, login_page: str, websocket: WebSocket):
    """
    Handle a WebSocket "send_code" request.

    Creates a fresh, non-pooled XHSLoginService, asks it to send the
    verification code, records the service in ``temp_sessions`` for the later
    verify step, and pushes the outcome to the client ("need_captcha" or
    "code_sent").

    Returns:
        (result, service_instance): the result dict and the service used;
        ``service_instance`` is None when an exception occurred.
    """
    try:
        print(f"[WebSocket-SendCode] 开始处理: session={session_id}, phone={phone}", file=sys.stderr)

        config = get_config()
        use_adspower = config.get_bool('adspower.enabled', False)
        print(f"[WebSocket-SendCode] AdsPower模式: {'[开启]' if use_adspower else '[关闭]'}", file=sys.stderr)

        # A new service per request (no browser pool).
        request_login_service = XHSLoginService(
            use_pool=False,
            headless=login_service.headless,
            session_id=session_id,
            use_adspower=use_adspower
        )

        result = await request_login_service.send_verification_code(
            phone=phone,
            country_code=country_code,
            login_page=login_page,
            session_id=session_id
        )

        # Remember the service for the verify step (the pool is not used).
        handles = {
            'browser': request_login_service.browser,
            'context': request_login_service.context,
            'page': request_login_service.page,
            'service': request_login_service,
        }
        if session_id not in temp_sessions:
            temp_sessions[session_id] = {**handles, 'created_at': datetime.now()}
            print(f"[WebSocket-SendCode] 已手动创建session记录: {session_id}", file=sys.stderr)
        else:
            # Refresh every handle, not only 'service', so the record never
            # points at the previous browser. NOTE(review): the superseded
            # service is not closed here - confirm who owns its browser.
            temp_sessions[session_id].update(handles)
            print(f"[WebSocket-SendCode] 已更新service实例: {session_id}", file=sys.stderr)

        # Risk control triggered while sending the code: a QR scan is needed.
        if result.get("need_captcha"):
            print(f"[WebSocket-SendCode] 检测到风控,需要扫码", file=sys.stderr)
            await websocket.send_json({
                "type": "need_captcha",
                "captcha_type": result.get("captcha_type"),
                "qrcode_image": result.get("qrcode_image"),
                "message": result.get("message", "需要扫码验证")
            })
            print(f"[WebSocket-SendCode] 已推送风控信息", file=sys.stderr)
            # The caller starts the scan monitor with the returned service.
            return result, request_login_service

        if result.get("success"):
            print(f"[WebSocket-SendCode] 验证码发送成功", file=sys.stderr)
            await websocket.send_json({
                "type": "code_sent",
                "success": True,
                "message": "验证码已发送,请在小红书APP中查看"
            })
        else:
            print(f"[WebSocket-SendCode] 发送失败: {result.get('error')}", file=sys.stderr)
            await websocket.send_json({
                "type": "code_sent",
                "success": False,
                "message": result.get("error", "发送验证码失败")
            })

        return result, request_login_service

    except Exception as e:
        print(f"[WebSocket-SendCode] 异常: {str(e)}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        try:
            await websocket.send_json({
                "type": "code_sent",
                "success": False,
                "message": f"发送验证码失败: {str(e)}"
            })
        except Exception:
            # The socket may already be closed; nothing more can be reported.
            pass
        return {"success": False, "error": str(e)}, None


async def handle_verify_code_ws(session_id: str, phone: str, code: str, country_code: str, login_page: str, websocket: WebSocket):
    """
    Handle a WebSocket "verify_code" request.

    Uses the service stored in ``temp_sessions`` by the send-code step to log
    in. On success the storage_state is saved under ``storage_states/``, a
    "login_success" message is pushed, the browser is closed, and - when
    AdsPower cookies are available after closing - the stored state is
    updated and a second "login_success" with those cookies is pushed.
    Failures are reported as "login_result" or "need_captcha" messages.
    """
    try:
        print(f"[WebSocket-VerifyCode] 开始验证: session={session_id}, phone={phone}, code={code}", file=sys.stderr)

        if session_id not in temp_sessions:
            print(f"[WebSocket-VerifyCode] 未找到session: {session_id}", file=sys.stderr)
            await websocket.send_json({
                "type": "login_result",
                "success": False,
                "message": "会话已过期,请重新发送验证码"
            })
            return

        request_login_service = temp_sessions[session_id]['service']

        result = await request_login_service.login(
            phone=phone,
            code=code,
            country_code=country_code,
            login_page=login_page
        )

        # Risk control triggered during login: a QR scan is needed.
        if result.get("need_captcha"):
            print(f"[WebSocket-VerifyCode] 登录时检测到风控", file=sys.stderr)
            await websocket.send_json({
                "type": "need_captcha",
                "captcha_type": result.get("captcha_type"),
                "qrcode_image": result.get("qrcode_image"),
                "message": result.get("message", "需要扫码验证")
            })
            return

        if not result.get("success"):
            print(f"[WebSocket-VerifyCode] 登录失败: {result.get('error')}", file=sys.stderr)
            await websocket.send_json({
                "type": "login_result",
                "success": False,
                "message": result.get("error", "登录失败")
            })
            return

        print(f"[WebSocket-VerifyCode] 登录成功", file=sys.stderr)

        storage_state = result.get("storage_state")
        storage_state_path = None
        if storage_state:
            import re
            os.makedirs('storage_states', exist_ok=True)
            # phone comes from the client: neutralise path separators and
            # other unexpected characters. Ordinary phone numbers keep the
            # same file name as before.
            safe_phone = re.sub(r'[^0-9A-Za-z_+-]', '_', str(phone))
            storage_state_path = f"storage_states/{safe_phone}_state.json"
            with open(storage_state_path, 'w', encoding='utf-8') as f:
                json.dump(storage_state, f, ensure_ascii=False, indent=2)
            print(f"[WebSocket-VerifyCode] 已保存storage_state: {storage_state_path}", file=sys.stderr)

        await websocket.send_json({
            "type": "login_success",
            "success": True,
            "cookies": result.get("cookies"),  # key/value form
            "cookies_full": result.get("cookies_full"),  # full Playwright form
            "user_info": result.get("user_info"),
            "login_state": result.get("login_state"),
            "storage_state": storage_state,
            "storage_state_path": storage_state_path,
            "message": "登录成功"
        })

        # Release the browser held for this session.
        session_record = temp_sessions.get(session_id)
        if session_record is None:
            return
        try:
            service = session_record.get('service')
            if service:
                await service.close_browser()
                print(f"[WebSocket-VerifyCode] 浏览器已关闭: {session_id}", file=sys.stderr)

                # AdsPower cookies are queried only after the browser closed.
                adspower_cookies = await service.get_adspower_cookies_after_close()
                if adspower_cookies:
                    print(f"[WebSocket-VerifyCode] 获取到AdsPower Cookie: {len(adspower_cookies)}个", file=sys.stderr)
                    result['cookies_full'] = adspower_cookies
                    result['cookies'] = {cookie['name']: cookie['value'] for cookie in adspower_cookies}

                    if storage_state:
                        storage_state['cookies'] = adspower_cookies
                        result['storage_state'] = storage_state
                        try:
                            with open(storage_state_path, 'w', encoding='utf-8') as f:
                                json.dump(storage_state, f, ensure_ascii=False, indent=2)
                            print(f"[WebSocket-VerifyCode] 已更新storage_state文件中的Cookie", file=sys.stderr)
                        except Exception as save_err:
                            print(f"[WebSocket-VerifyCode] 更新storage_state失败: {str(save_err)}", file=sys.stderr)

                    try:
                        # Push the success message again with the new cookies.
                        await websocket.send_json({
                            "type": "login_success",
                            "success": True,
                            "cookies": result.get("cookies"),
                            "cookies_full": adspower_cookies,
                            "user_info": result.get("user_info"),
                            "login_state": result.get("login_state"),
                            "storage_state": storage_state,
                            "storage_state_path": storage_state_path,
                            "message": "登录成功(已同步AdsPower Cookie)"
                        })
                        print(f"[WebSocket-VerifyCode] 已重新推送包含AdsPower Cookie的登录成功消息", file=sys.stderr)
                    except Exception:
                        # The client may have disconnected already; not an error.
                        print(f"[WebSocket-VerifyCode] WebSocket已断开,跳过消息推送", file=sys.stderr)
                else:
                    print(f"[WebSocket-VerifyCode] 未获取到AdsPower Cookie,使用Playwright Cookie", file=sys.stderr)

            # pop: the disconnect cleanup may have removed the entry meanwhile.
            temp_sessions.pop(session_id, None)
            print(f"[WebSocket-VerifyCode] 已释放session: {session_id}", file=sys.stderr)
        except Exception as e:
            print(f"[WebSocket-VerifyCode] 释放浏览器失败: {str(e)}", file=sys.stderr)

    except Exception as e:
        print(f"[WebSocket-VerifyCode] 异常: {str(e)}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        try:
            await websocket.send_json({
                "type": "login_result",
                "success": False,
                "message": f"登录失败: {str(e)}"
            })
        except Exception:
            # The socket may already be closed; nothing more can be reported.
            pass


# Strong references to fire-and-forget tasks: the event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-flight.
_BACKGROUND_TASKS = set()


def _spawn_background(coro):
    """Schedule ``coro`` as a task and hold a reference until it finishes."""
    task = asyncio.create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return task


async def _continue_after_qrcode_scan(service_instance, session_id, phone, country_code, login_page, first_result, websocket):
    """Wait for the QR scan to finish, then re-send the code and report the outcome."""
    try:
        print(f"[WebSocket] 开始监听扫码...", file=sys.stderr)
        await service_instance._monitor_qrcode_scan(session_id)
        print(f"[WebSocket] 扫码监听已返回,说明扫码已完成", file=sys.stderr)

        print(f"[WebSocket] 扫码成功,自动重新发送验证码...", file=sys.stderr)
        # Values echoed back in the first result win over the request values.
        retry_result = await service_instance.send_verification_code(
            phone=first_result.get('phone', phone),
            country_code=first_result.get('country_code', country_code),
            login_page=first_result.get('login_page', login_page),
            session_id=session_id
        )

        if retry_result.get("success"):
            print(f"[WebSocket] 扫码后自动发送验证码成功", file=sys.stderr)
            await websocket.send_json({
                "type": "code_sent",
                "success": True,
                "message": "扫码验证完成,验证码已自动发送"
            })
        else:
            print(f"[WebSocket] 扫码后自动发送验证码失败: {retry_result.get('error')}", file=sys.stderr)
            await websocket.send_json({
                "type": "code_sent",
                "success": False,
                "message": retry_result.get("error", "自动发送验证码失败")
            })
    except Exception as e:
        print(f"[WebSocket] 扫码后自动继续流程异常: {str(e)}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        try:
            await websocket.send_json({
                "type": "code_sent",
                "success": False,
                "message": f"自动继续流程异常: {str(e)}"
            })
        except Exception:
            # The socket may already be closed; nothing more can be reported.
            pass


async def _handle_refresh_qrcode_ws(session_id, websocket):
    """Refresh the QR code of the session's service and push the result."""
    print(f"[WebSocket] 收到刷新二维码请求: session_id={session_id}", file=sys.stderr)

    if session_id not in temp_sessions:
        print(f"[WebSocket] 错误: session_id {session_id} 不在temp_sessions中", file=sys.stderr)
        await websocket.send_json({
            "type": "qrcode_refreshed",
            "success": False,
            "message": "会话已过期,请重新发送验证码"
        })
        return

    service_instance = temp_sessions[session_id].get('service')
    if not service_instance:
        print(f"[WebSocket] 错误: 未找到service实例", file=sys.stderr)
        await websocket.send_json({
            "type": "qrcode_refreshed",
            "success": False,
            "message": "内部错误: 服务实例不存在"
        })
        return

    print(f"[WebSocket] 开始刷新二维码...", file=sys.stderr)
    refresh_result = await service_instance.refresh_qrcode()

    if refresh_result.get('success'):
        print(f"[WebSocket] 二维码刷新成功", file=sys.stderr)
        await websocket.send_json({
            "type": "qrcode_refreshed",
            "success": True,
            "qrcode_image": refresh_result['qrcode_image'],
            "message": refresh_result.get('message', '二维码已刷新')
        })
    else:
        print(f"[WebSocket] 二维码刷新失败: {refresh_result.get('message', '未知错误')}", file=sys.stderr)
        await websocket.send_json({
            "type": "qrcode_refreshed",
            "success": False,
            "message": refresh_result.get('message', '刷新失败')
        })


@app.websocket("/ws/login/{session_id}")
async def websocket_login(websocket: WebSocket, session_id: str):
    """
    WebSocket endpoint driving the interactive login flow for one session.

    After registering the socket with ``ws_manager`` it subscribes to the
    Redis channel ``ws_message:{session_id}`` and forwards every published
    message to the client. Client messages handled: the text "ping", and JSON
    objects of type "test", "ack", "pull_unconfirmed", "send_code",
    "verify_code" and "refresh_qrcode"; other types are ignored.

    On disconnect or error the socket is unregistered, the Redis resources
    are released, and any browser still held for the session is closed.
    """
    await ws_manager.connect(session_id, websocket)

    channel = f"ws_message:{session_id}"
    redis_client = None
    pubsub = None
    redis_task = None

    try:
        # Redis setup lives inside the try so a failure still unregisters the
        # socket and releases whatever was acquired.
        import redis.asyncio as aioredis
        from urllib.parse import quote

        config = get_config()
        redis_host = config.get_str('redis.host', 'localhost')
        redis_port = config.get_int('redis.port', 6379)
        redis_password = config.get_str('redis.password', '')

        if redis_password:
            # Percent-encode so characters such as '@' or '/' cannot corrupt the URL.
            redis_url = f"redis://:{quote(redis_password, safe='')}@{redis_host}:{redis_port}"
        else:
            redis_url = f"redis://{redis_host}:{redis_port}"
        redis_client = await aioredis.from_url(redis_url, decode_responses=True)
        pubsub = redis_client.pubsub()
        await pubsub.subscribe(channel)
        print(f"[WebSocket] 已订阅Redis频道: {channel}", file=sys.stderr)

        async def redis_subscriber():
            """Forward every message published on the channel to the client."""
            try:
                async for message in pubsub.listen():
                    if message['type'] != 'message':
                        continue
                    try:
                        payload = json.loads(message['data'])
                        print(f"[WebSocket] 从Redis收到消息: {payload}", file=sys.stderr)
                        await websocket.send_json(payload)
                        print(f"[WebSocket] 已转发消息到前端: {session_id}", file=sys.stderr)
                    except Exception as e:
                        print(f"[WebSocket] 处理Redis消息失败: {str(e)}", file=sys.stderr)
            except Exception as e:
                print(f"[WebSocket] Redis订阅异常: {str(e)}", file=sys.stderr)

        redis_task = asyncio.create_task(redis_subscriber())

        # Main receive loop: runs until the client disconnects.
        while True:
            data = await websocket.receive_text()
            print(f"[WebSocket] 收到客户端消息 {session_id}: {data}", file=sys.stderr)

            if data == "ping":
                await websocket.send_text("pong")
                continue

            try:
                msg = json.loads(data)
            except json.JSONDecodeError:
                print(f"[WebSocket] 无法解析为JSON: {data}", file=sys.stderr)
                continue
            if not isinstance(msg, dict):
                # Valid JSON that is not an object carries no message type.
                print(f"[WebSocket] 忽略非对象JSON消息: {data}", file=sys.stderr)
                continue

            msg_type = msg.get('type', 'unknown')
            print(f"[WebSocket] 解析消息类型: {msg_type}", file=sys.stderr)

            if msg_type == 'test':
                print(f"[WebSocket] 收到测试消息: {msg.get('message')}", file=sys.stderr)
                await websocket.send_json({
                    "type": "test_response",
                    "message": "Test message received by backend successfully!",
                    # NOTE(review): echoes the raw payload rather than a time;
                    # kept as-is because clients may rely on it.
                    "timestamp": data
                })
                print(f"[WebSocket] 已回复测试消息", file=sys.stderr)

            elif msg_type == 'ack':
                message_id = msg.get('message_id')
                if message_id:
                    ws_manager.confirm_message(session_id, message_id)
                    print(f"[WebSocket] 收到ACK确认: {message_id}", file=sys.stderr)

            elif msg_type == 'pull_unconfirmed':
                unconfirmed = ws_manager.get_unconfirmed_messages(session_id)
                if unconfirmed:
                    print(f"[WebSocket] 前端请求拉取未确认消息: {len(unconfirmed)}条", file=sys.stderr)
                    for pending in unconfirmed:
                        await websocket.send_json(pending)
                        await asyncio.sleep(0.1)  # 100 ms gap between resends
                    print(f"[WebSocket] 已重发所有未确认消息", file=sys.stderr)
                else:
                    print(f"[WebSocket] 没有未确认消息", file=sys.stderr)

            elif msg_type == 'send_code':
                phone = msg.get('phone')
                country_code = msg.get('country_code', '+86')
                login_page = msg.get('login_page', 'creator')
                print(f"[WebSocket] 收到发送验证码请求: phone={phone}", file=sys.stderr)

                # Handled inline (awaited), not as a background task.
                result, service_instance = await handle_send_code_ws(
                    session_id, phone, country_code, login_page, websocket
                )

                if result.get("need_captcha") and service_instance:
                    print(f"[WebSocket] 在主协程中启动扫码监听: {session_id}", file=sys.stderr)
                    # Arguments are passed explicitly so later messages that
                    # rebind these loop variables cannot affect the monitor.
                    _spawn_background(_continue_after_qrcode_scan(
                        service_instance, session_id, phone, country_code,
                        login_page, result, websocket
                    ))
                    print(f"[WebSocket] 已启动扫码监听任务", file=sys.stderr)

            elif msg_type == 'verify_code':
                phone = msg.get('phone')
                code = msg.get('code')
                country_code = msg.get('country_code', '+86')
                login_page = msg.get('login_page', 'creator')
                print(f"[WebSocket] 收到验证码验证请求: phone={phone}, code={code}", file=sys.stderr)
                # Runs in the background so the receive loop stays responsive.
                _spawn_background(handle_verify_code_ws(
                    session_id, phone, code, country_code, login_page, websocket
                ))

            elif msg_type == 'refresh_qrcode':
                await _handle_refresh_qrcode_ws(session_id, websocket)

    except WebSocketDisconnect as e:
        reason = f"客户端主动断开连接 (code: {e.code if hasattr(e, 'code') else 'unknown'})"
        ws_manager.disconnect(session_id, reason=reason)
    except Exception as e:
        reason = f"连接异常: {type(e).__name__} - {str(e)}"
        ws_manager.disconnect(session_id, reason=reason)
    finally:
        # Release Redis resources; each step is independent so one failure
        # does not skip the remaining ones.
        if redis_task is not None:
            redis_task.cancel()
        cleanup_steps = []
        if pubsub is not None:
            cleanup_steps.append(lambda: pubsub.unsubscribe(channel))
            cleanup_steps.append(pubsub.aclose)
        if redis_client is not None:
            cleanup_steps.append(redis_client.aclose)
        cleanup_ok = True
        for step in cleanup_steps:
            try:
                await step()
            except Exception as cleanup_err:
                cleanup_ok = False
                print(f"[WebSocket] 清理Redis资源失败: {str(cleanup_err)}", file=sys.stderr)
        if cleanup_steps and cleanup_ok:
            print(f"[WebSocket] 已取消Redis订阅: {channel}", file=sys.stderr)

        # Close any browser still held for this session. The record is removed
        # first so a failing close cannot leave a stale entry behind.
        session_record = temp_sessions.pop(session_id, None)
        if session_record is not None:
            print(f"[WebSocket] 检测到未释放的临时浏览器,开始清理: {session_id}", file=sys.stderr)
            try:
                service = session_record.get('service')
                if service:
                    await service.close_browser()
                print(f"[WebSocket] 已释放临时浏览器: {session_id}", file=sys.stderr)
            except Exception as e:
                print(f"[WebSocket] 释放临时浏览器失败: {str(e)}", file=sys.stderr)


if __name__ == "__main__":
    import uvicorn

    # Server settings come from the configuration file.
    config = get_config()
    host = config.get_str('server.host', '0.0.0.0')
    port = config.get_int('server.port', 8000)
    debug = config.get_bool('server.debug', False)
    reload = config.get_bool('server.reload', False)

    print(f"[启动服务] 主机: {host}, 端口: {port}, 调试: {debug}, 热重载: {reload}")
    print(f"[WebSocket] WebSocket服务地址: ws://{host}:{port}/ws/login/{{session_id}}")
    print(f"[WebSocket] 示例: ws://{host}:{port}/ws/login/xhs_login_xxxxx")

    # uvicorn refuses to reload when handed an app object, so pass an import
    # string ("<this module>:app") whenever reload is enabled.
    app_target = f"{Path(__file__).stem}:app" if reload else app

    uvicorn.run(
        app_target,
        host=host,
        port=port,
        reload=reload,
        log_level="debug" if debug else "info"
    )