""" 小红书定时发布调度器 管理自动发布任务的调度和执行 """ import asyncio import sys import random from datetime import datetime, time as dt_time from typing import List, Dict, Any, Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger import pymysql import json import aiohttp from xhs_login import XHSLoginService class XHSScheduler: """小红书定时发布调度器""" def __init__(self, db_config: Dict[str, Any], max_concurrent: int = 2, publish_timeout: int = 300, max_articles_per_user_per_run: int = 5, max_failures_per_user_per_run: int = 3, max_daily_articles_per_user: int = 20, max_hourly_articles_per_user: int = 3, proxy_pool_enabled: bool = False, proxy_pool_api_url: Optional[str] = None, enable_random_ua: bool = True, min_publish_interval: int = 30, max_publish_interval: int = 120, headless: bool = True, use_adspower: bool = True, proxy_username: Optional[str] = None, # 新增:代理用户名 proxy_password: Optional[str] = None): # 新增:代理密码 """ 初始化调度器 Args: db_config: 数据库配置 max_concurrent: 最大并发发布数 publish_timeout: 发布超时时间(秒) max_articles_per_user_per_run: 每轮每用户最大发文数 max_failures_per_user_per_run: 每轮每用户最大失败次数 max_daily_articles_per_user: 每用户每日最大发文数 max_hourly_articles_per_user: 每用户每小时最大发文数 enable_random_ua: 是否启用随机User-Agent min_publish_interval: 最小发布间隔(秒) max_publish_interval: 最大发布间隔(秒) headless: 是否使用无头模式,False为有头模式(方便调试) use_adspower: 是否使用AdsPower浏览器管理 proxy_username: 代理用户名(可选,白名单模式可留空) proxy_password: 代理密码(可选,白名单模式可留空) """ self.db_config = db_config self.max_concurrent = max_concurrent self.publish_timeout = publish_timeout self.max_articles_per_user_per_run = max_articles_per_user_per_run self.max_failures_per_user_per_run = max_failures_per_user_per_run self.max_daily_articles_per_user = max_daily_articles_per_user self.max_hourly_articles_per_user = max_hourly_articles_per_user self.proxy_pool_enabled = proxy_pool_enabled self.proxy_pool_api_url = proxy_pool_api_url or "" self.proxy_username = proxy_username or "" # 保存代理用户名 self.proxy_password = proxy_password or "" # 保存代理密码 self.enable_random_ua = enable_random_ua self.min_publish_interval = min_publish_interval self.max_publish_interval = max_publish_interval self.headless = headless self.use_adspower = use_adspower self.scheduler = AsyncIOScheduler() # 使用AdsPower时禁用浏览器池,避免资源冲突 self.login_service = XHSLoginService( use_pool=False, # 使用AdsPower不需要浏览器池 headless=headless, use_adspower=use_adspower ) self.semaphore = asyncio.Semaphore(max_concurrent) mode_text = "AdsPower" if use_adspower else "浏览器池" if not use_adspower else "传统" print(f"[调度器] 已创建,最大并发: {max_concurrent},浏览器模式: {mode_text}", file=sys.stderr) def start(self, cron_expr: str = "*/5 * * * * *"): """ 启动定时任务 Args: cron_expr: Cron表达式,默认每5秒执行一次 格式: 秒 分 时 日 月 周 """ # 解析cron表达式 parts = cron_expr.split() if len(parts) == 6: # 6位格式: 秒 分 时 日 月 周 trigger = CronTrigger( second=parts[0], minute=parts[1], hour=parts[2], day=parts[3], month=parts[4], day_of_week=parts[5] ) else: print(f"[调度器] ⚠️ Cron表达式格式错误: {cron_expr},使用默认配置", file=sys.stderr) trigger = CronTrigger(second="*/5") self.scheduler.add_job( self.auto_publish_articles, trigger=trigger, id='xhs_publish', name='小红书自动发布', max_instances=1, # 最多只允许1个实例同时运行,防止重复执行 replace_existing=True # 如果任务已存在则替换,避免重启时重复添加 ) self.scheduler.start() print(f"[调度器] 定时发布任务已启动,Cron表达式: {cron_expr}", file=sys.stderr) def stop(self): """停止定时任务""" self.scheduler.shutdown() print("[调度器] 定时发布任务已停止", file=sys.stderr) def get_db_connection(self): """获取数据库连接""" return pymysql.connect( host=self.db_config['host'], port=self.db_config['port'], user=self.db_config['user'], password=self.db_config['password'], database=self.db_config['database'], charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor ) async def _fetch_proxy_from_pool(self) -> Optional[dict]: """从代理池接口获取一个代理地址,并附加认证信息 Returns: dict: 代理配置字典 {'server': 'http://ip:port', 'username': '...', 'password': '...'} 或 None 如果未启用或获取失败 """ if not self.proxy_pool_enabled or not self.proxy_pool_api_url: return None try: timeout = aiohttp.ClientTimeout(total=10) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(self.proxy_pool_api_url) as resp: if resp.status != 200: print(f"[调度器] 代理池接口返回非200状态码: {resp.status}", file=sys.stderr) return None text = (await resp.text()).strip() if not text: print("[调度器] 代理池返回内容为空", file=sys.stderr) return None line = text.splitlines()[0].strip() if not line: print("[调度器] 代理池首行内容为空", file=sys.stderr) return None # 构建代理URL proxy_server = line if line.startswith(("http://", "https://")) else "http://" + line # 构建完整的代理配置字典 proxy_config = { 'server': proxy_server } # 如果配置了认证信息,添加到配置中 if self.proxy_username and self.proxy_password: proxy_config['username'] = self.proxy_username proxy_config['password'] = self.proxy_password print(f"[调度器] 获取代理成功: {proxy_server} (认证代理, 用户名: {self.proxy_username})", file=sys.stderr) else: print(f"[调度器] 获取代理成功: {proxy_server} (白名单模式)", file=sys.stderr) return proxy_config except Exception as e: print(f"[调度器] 请求代理池接口失败: {str(e)}", file=sys.stderr) return None def _generate_random_user_agent(self) -> str: """生成随机User-Agent,防止浏览器指纹识别""" chrome_versions = ['120.0.0.0', '119.0.0.0', '118.0.0.0', '117.0.0.0', '116.0.0.0'] windows_versions = ['Windows NT 10.0; Win64; x64', 'Windows NT 11.0; Win64; x64'] chrome_ver = random.choice(chrome_versions) win_ver = random.choice(windows_versions) return f'Mozilla/5.0 ({win_ver}) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{chrome_ver} Safari/537.36' async def auto_publish_articles(self): """自动发布文案(定时任务主函数)""" print("========== 开始执行定时发布任务 ==========", file=sys.stderr) start_time = datetime.now() try: conn = self.get_db_connection() cursor = conn.cursor() # 1. 查询所有待发布的文案 cursor.execute(""" SELECT * FROM ai_articles WHERE status = 'published_review' ORDER BY id ASC """) articles = cursor.fetchall() if not articles: print("没有待发布的文案", file=sys.stderr) cursor.close() conn.close() return original_total = len(articles) # 2. 限制每用户每轮发文数 articles = self._limit_articles_per_user(articles, self.max_articles_per_user_per_run) print(f"找到 {original_total} 篇待发布文案,按照每个用户每轮最多 {self.max_articles_per_user_per_run} 篇,本次计划发布 {len(articles)} 篇", file=sys.stderr) # 3. 应用每日/每小时上限过滤 if self.max_daily_articles_per_user > 0 or self.max_hourly_articles_per_user > 0: before_count = len(articles) articles = await self._filter_by_daily_and_hourly_limit( cursor, articles, self.max_daily_articles_per_user, self.max_hourly_articles_per_user ) print(f"应用每日/每小时上限过滤:过滤前 {before_count} 篇,过滤后 {len(articles)} 篇", file=sys.stderr) if not articles: print("所有文案均因频率限制被过滤,本轮无任务", file=sys.stderr) cursor.close() conn.close() return # 4. 并发发布 tasks = [] user_fail_count = {} paused_users = set() for article in articles: user_id = article['publish_user_id'] or article['created_user_id'] if user_id in paused_users: print(f"用户 {user_id} 在本轮已暂停,跳过文案 ID: {article['id']}", file=sys.stderr) continue # 直接发布,不在这里延迟 task = asyncio.create_task( self._publish_article_with_semaphore( article, user_id, cursor, user_fail_count, paused_users ) ) tasks.append(task) # 等待所有发布完成 results = await asyncio.gather(*tasks, return_exceptions=True) # 统计结果 success_count = sum(1 for r in results if r is True) fail_count = len(results) - success_count cursor.close() conn.close() duration = (datetime.now() - start_time).total_seconds() print("========== 定时发布任务完成 ==========", file=sys.stderr) print(f"总计: {len(articles)} 篇, 成功: {success_count} 篇, 失败: {fail_count} 篇, 耗时: {duration:.1f}秒", file=sys.stderr) except Exception as e: print(f"[调度器] 定时任务异常: {str(e)}", file=sys.stderr) import traceback traceback.print_exc() async def _publish_article_with_semaphore(self, article: Dict, user_id: int, cursor, user_fail_count: Dict, paused_users: set): """带信号量控制的发布文章""" async with self.semaphore: try: print(f"[调度器] 开始发布文案 {article['id']}: {article['title']}", file=sys.stderr) success = await self._publish_single_article(article, cursor) if not success: user_fail_count[user_id] = user_fail_count.get(user_id, 0) + 1 if user_fail_count[user_id] >= self.max_failures_per_user_per_run: paused_users.add(user_id) print(f"用户 {user_id} 在本轮失败次数达到 {user_fail_count[user_id]} 次,暂停本轮后续发布", file=sys.stderr) print(f"发布失败 [文案ID: {article['id']}, 标题: {article['title']}]", file=sys.stderr) return False else: print(f"发布成功 [文案ID: {article['id']}, 标题: {article['title']}]", file=sys.stderr) return True except Exception as e: print(f"发布异常 [文案ID: {article['id']}]: {str(e)}", file=sys.stderr) return False async def _publish_single_article(self, article: Dict, cursor) -> bool: """发布单篇文章""" try: # 1. 获取用户信息 user_id = article['publish_user_id'] or article['created_user_id'] cursor.execute("SELECT * FROM ai_users WHERE id = %s", (user_id,)) user = cursor.fetchone() if not user: self._update_article_status(cursor, article['id'], 'failed', '获取用户信息失败') return False # 2. 检查用户是否绑定小红书 if user['is_bound_xhs'] != 1: self._update_article_status(cursor, article['id'], 'failed', '用户未绑定小红书账号') return False # 3. 获取author记录和Cookie cursor.execute(""" SELECT * FROM ai_authors WHERE phone = %s AND enterprise_id = %s AND channel = 1 AND status = 'active' LIMIT 1 """, (user['phone'], user['enterprise_id'])) author = cursor.fetchone() if not author or not author['xhs_cookie']: self._update_article_status(cursor, article['id'], 'failed', '小红书Cookie已失效') return False # 4. 获取文章图片 cursor.execute(""" SELECT image_url FROM ai_article_images WHERE article_id = %s ORDER BY sort_order ASC """, (article['id'],)) images = [img['image_url'] for img in cursor.fetchall() if img['image_url']] # 5. 获取标签 cursor.execute("SELECT coze_tag FROM ai_article_tags WHERE article_id = %s LIMIT 1", (article['id'],)) tag_row = cursor.fetchone() topics = [] if tag_row and tag_row['coze_tag']: topics = self._parse_tags(tag_row['coze_tag']) # 6. 解析Cookie并格式化 try: # 数据库中存储的是完整的login_state JSON login_state = json.loads(author['xhs_cookie']) # 处理双重JSON编码的情况 if isinstance(login_state, str): login_state = json.loads(login_state) # 提取cookies字段(兼容旧格式:如果login_state本身就是cookies列表) if isinstance(login_state, dict) and 'cookies' in login_state: # 新格式:login_state对象包含cookies字段 cookies = login_state['cookies'] print(f" 从login_state提取cookies: {len(cookies) if isinstance(cookies, list) else 'unknown'} 个", file=sys.stderr) elif isinstance(login_state, (list, dict)): # 旧格式:直接是cookies cookies = login_state print(f" 使用旧格式cookies(无login_state包装)", file=sys.stderr) else: raise ValueError(f"无法识别的Cookie存储格式: {type(login_state).__name__}") # 验证cookies格式 if not isinstance(cookies, (list, dict)): raise ValueError(f"Cookie必须是列表或字典格式,当前类型: {type(cookies).__name__}") # 格式化Cookie,确保包含domain字段 cookies = self._format_cookies(cookies) except Exception as e: self._update_article_status(cursor, article['id'], 'failed', f'Cookie格式错误: {str(e)}') return False # 7. 从代理池获取代理(如果启用) proxy = await self._fetch_proxy_from_pool() if proxy: print(f"[调度器] 使用代理: {proxy}", file=sys.stderr) # 8. 生成随机User-Agent(防指纹识别) user_agent = self._generate_random_user_agent() if self.enable_random_ua else None if user_agent: print(f"[调度器] 使用随机UA: {user_agent[:50]}...", file=sys.stderr) # 9. 调用发布服务(增加超时控制) try: print(f"[调度器] 开始调用发布服务,超时设置: {self.publish_timeout}秒", file=sys.stderr) result = await asyncio.wait_for( self.login_service.publish_note( title=article['title'], content=article['content'], images=images, topics=topics, cookies=cookies, proxy=proxy, user_agent=user_agent, ), timeout=self.publish_timeout ) except asyncio.TimeoutError: error_msg = f'发布超时({self.publish_timeout}秒)' print(f"[调度器] {error_msg}", file=sys.stderr) self._update_article_status(cursor, article['id'], 'failed', error_msg) return False except Exception as e: error_msg = f'调用发布服务异常: {str(e)}' print(f"[调度器] {error_msg}", file=sys.stderr) import traceback traceback.print_exc() self._update_article_status(cursor, article['id'], 'failed', error_msg) return False # 10. 更新状态 if result['success']: self._update_article_status(cursor, article['id'], 'published', '发布成功') return True else: error_msg = result.get('error', '未知错误') self._update_article_status(cursor, article['id'], 'failed', error_msg) return False except Exception as e: self._update_article_status(cursor, article['id'], 'failed', f'发布异常: {str(e)}') return False def _update_article_status(self, cursor, article_id: int, status: str, message: str = ''): """更新文章状态""" try: if status == 'published': cursor.execute(""" UPDATE ai_articles SET status = %s, publish_time = NOW(), updated_at = NOW() WHERE id = %s """, (status, article_id)) else: cursor.execute(""" UPDATE ai_articles SET status = %s, review_comment = %s, updated_at = NOW() WHERE id = %s """, (status, message, article_id)) cursor.connection.commit() except Exception as e: print(f"更新文章 {article_id} 状态失败: {str(e)}", file=sys.stderr) def _limit_articles_per_user(self, articles: List[Dict], per_user_limit: int) -> List[Dict]: """限制每用户发文数""" if per_user_limit <= 0: return articles grouped = {} for art in articles: user_id = art['publish_user_id'] or art['created_user_id'] if user_id not in grouped: grouped[user_id] = [] grouped[user_id].append(art) limited = [] for user_id, user_articles in grouped.items(): limited.extend(user_articles[:per_user_limit]) return limited async def _filter_by_daily_and_hourly_limit(self, cursor, articles: List[Dict], max_daily: int, max_hourly: int) -> List[Dict]: """按每日和每小时上限过滤文章""" if max_daily <= 0 and max_hourly <= 0: return articles # 提取所有用户ID user_ids = set() for art in articles: user_id = art['publish_user_id'] or art['created_user_id'] user_ids.add(user_id) # 查询每用户已发布数量 user_daily_published = {} user_hourly_published = {} now = datetime.now() today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) current_hour_start = now.replace(minute=0, second=0, microsecond=0) for user_id in user_ids: # 查询当日已发布数量 if max_daily > 0: cursor.execute(""" SELECT COUNT(*) as count FROM ai_articles WHERE status = 'published' AND publish_time >= %s AND (publish_user_id = %s OR (publish_user_id IS NULL AND created_user_id = %s)) """, (today_start, user_id, user_id)) user_daily_published[user_id] = cursor.fetchone()['count'] # 查询当前小时已发布数量 if max_hourly > 0: cursor.execute(""" SELECT COUNT(*) as count FROM ai_articles WHERE status = 'published' AND publish_time >= %s AND (publish_user_id = %s OR (publish_user_id IS NULL AND created_user_id = %s)) """, (current_hour_start, user_id, user_id)) user_hourly_published[user_id] = cursor.fetchone()['count'] # 过滤超限文章 filtered = [] for art in articles: user_id = art['publish_user_id'] or art['created_user_id'] # 检查每日上限 if max_daily > 0 and user_daily_published.get(user_id, 0) >= max_daily: continue # 检查每小时上限 if max_hourly > 0 and user_hourly_published.get(user_id, 0) >= max_hourly: continue filtered.append(art) return filtered def _parse_tags(self, tag_str: str) -> List[str]: """解析标签字符串""" if not tag_str: return [] # 替换分隔符 tag_str = tag_str.replace(';', ',').replace(' ', ',').replace('、', ',') # 分割并清理 tags = [] for tag in tag_str.split(','): tag = tag.strip() if tag: tags.append(tag) return tags def _format_cookies(self, cookies) -> List[Dict]: """ 格式化Cookie,只处理非标准格式的Cookie 对于Playwright原生格式的Cookie,直接返回,不做任何修改 Args: cookies: Cookie数据,支持list[dict]或dict格式 Returns: 格式化后的Cookie列表 """ # 如果是字典格式(键值对),转换为列表格式 if isinstance(cookies, dict): cookies = [ { "name": name, "value": str(value) if not isinstance(value, str) else value, "domain": ".xiaohongshu.com", "path": "/" } for name, value in cookies.items() ] # 验证是否为列表 if not isinstance(cookies, list): raise ValueError(f"Cookie必须是列表或字典格式,当前类型: {type(cookies).__name__}") # 检查是否为空列表 if not cookies or len(cookies) == 0: print(f" Cookie列表为空,直接返回", file=sys.stderr) return cookies # 检查是否是Playwright原生格式(包含name和value字段) if isinstance(cookies[0], dict) and 'name' in cookies[0] and 'value' in cookies[0]: # 已经是Playwright格式,直接返回,不做任何修改 print(f" 检测到Playwright原生格式,直接使用 ({len(cookies)} 个cookie)", file=sys.stderr) return cookies # 其他格式,进行基础验证 formatted_cookies = [] for cookie in cookies: if not isinstance(cookie, dict): raise ValueError(f"Cookie元素必须是字典格式,当前类型: {type(cookie).__name__}") # 确保有基本字段 if 'domain' not in cookie and 'url' not in cookie: cookie = cookie.copy() cookie['domain'] = '.xiaohongshu.com' if 'path' not in cookie and 'url' not in cookie: if 'domain' in cookie or 'url' not in cookie: cookie = cookie.copy() if cookie is cookies[cookies.index(cookie)] else cookie cookie['path'] = '/' formatted_cookies.append(cookie) return formatted_cookies