""" 小红书登录CLI工具 供Go服务调用的命令行脚本 """ import sys import json import asyncio import io import os from xhs_login import XHSLoginService # 设置标准输出为UTF-8编码 if sys.platform == 'win32': sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') # 将xhs_login.py中的print输出重定向到stderr original_print = print def silent_print(*args, **kwargs): """ 将print输出重定向到stderr """ kwargs['file'] = sys.stderr original_print(*args, **kwargs) # 替换全局print函数 import builtins builtins.print = silent_print async def send_code(phone: str, country_code: str = "+86"): """发送验证码""" service = XHSLoginService() try: result = await service.send_verification_code(phone, country_code) return result finally: await service.close_browser() async def login(phone: str, code: str, country_code: str = "+86"): """登录""" service = XHSLoginService() try: # 先发送验证码(页面已打开) await service.send_verification_code(phone, country_code) # 等待一下确保验证码发送完成 await asyncio.sleep(1) # 执行登录 result = await service.login(phone, code, country_code) return result finally: await service.close_browser() async def inject_cookies(cookies_json: str): """注入Cookie并验证登录状态""" service = XHSLoginService() try: cookies = json.loads(cookies_json) await service.init_browser(cookies=cookies) result = await service.verify_login_status() return result finally: await service.close_browser() def main(): """主函数""" if len(sys.argv) < 2: print(json.dumps({ "success": False, "error": "缺少命令参数" })) sys.exit(1) command = sys.argv[1] try: if command == "send_code": # python xhs_cli.py send_code [country_code] if len(sys.argv) < 3: print(json.dumps({ "success": False, "error": "缺少手机号参数" })) sys.exit(1) phone = sys.argv[2] country_code = sys.argv[3] if len(sys.argv) > 3 else "+86" result = asyncio.run(send_code(phone, country_code)) sys.stdout.write(json.dumps(result, ensure_ascii=False)) elif command == "login": # python xhs_cli.py login [country_code] if len(sys.argv) < 4: print(json.dumps({ "success": False, "error": "缺少手机号或验证码参数" })) sys.exit(1) phone = sys.argv[2] code = sys.argv[3] country_code = sys.argv[4] if len(sys.argv) > 4 else "+86" result = asyncio.run(login(phone, code, country_code)) sys.stdout.write(json.dumps(result, ensure_ascii=False)) elif command == "inject_cookies": # python xhs_cli.py inject_cookies if len(sys.argv) < 3: print(json.dumps({ "success": False, "error": "缺少Cookie参数" })) sys.exit(1) cookies_json = sys.argv[2] result = asyncio.run(inject_cookies(cookies_json)) sys.stdout.write(json.dumps(result, ensure_ascii=False)) else: print(json.dumps({ "success": False, "error": f"未知命令: {command}" })) sys.exit(1) except Exception as e: print(json.dumps({ "success": False, "error": str(e) }, ensure_ascii=False)) sys.exit(1) if __name__ == "__main__": main()