# ai_wht_wechat/backend/scheduler.py
"""
小红书定时发布调度器
管理自动发布任务的调度和执行
"""
import asyncio
import sys
import random
from datetime import datetime, time as dt_time
from typing import List, Dict, Any, Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import pymysql
import json
import aiohttp
from xhs_login import XHSLoginService
class XHSScheduler:
    """Scheduler for automatic Xiaohongshu (RED) note publishing.

    Wraps an APScheduler ``AsyncIOScheduler`` that periodically scans the
    database for approved articles and publishes them with bounded
    concurrency and per-user rate limits.
    """

    def __init__(self,
                 db_config: Dict[str, Any],
                 max_concurrent: int = 2,
                 publish_timeout: int = 300,
                 max_articles_per_user_per_run: int = 5,
                 max_failures_per_user_per_run: int = 3,
                 max_daily_articles_per_user: int = 20,
                 max_hourly_articles_per_user: int = 3,
                 proxy_pool_enabled: bool = False,
                 proxy_pool_api_url: Optional[str] = None,
                 enable_random_ua: bool = True,
                 min_publish_interval: int = 30,
                 max_publish_interval: int = 120,
                 headless: bool = True,
                 use_adspower: bool = True,
                 proxy_username: Optional[str] = None,
                 proxy_password: Optional[str] = None):
        """Initialize the scheduler.

        Args:
            db_config: MySQL connection settings (host/port/user/password/database).
            max_concurrent: Max number of simultaneous publish tasks.
            publish_timeout: Per-article publish timeout, in seconds.
            max_articles_per_user_per_run: Per-round article cap per user.
            max_failures_per_user_per_run: Failures before a user is paused for the round.
            max_daily_articles_per_user: Daily article cap per user (0 disables).
            max_hourly_articles_per_user: Hourly article cap per user (0 disables).
            proxy_pool_enabled: Fetch a proxy from the pool API for each publish.
            proxy_pool_api_url: Proxy-pool endpoint returning "ip:port" lines.
            enable_random_ua: Send a randomized User-Agent with each publish.
            min_publish_interval: Minimum publish spacing in seconds
                (stored; not referenced in this module's visible code).
            max_publish_interval: Maximum publish spacing in seconds
                (stored; not referenced in this module's visible code).
            headless: Run the browser headless; ``False`` eases debugging.
            use_adspower: Manage browsers through AdsPower.
            proxy_username: Optional proxy auth user (whitelist mode may omit).
            proxy_password: Optional proxy auth password (whitelist mode may omit).
        """
        self.db_config = db_config
        self.max_concurrent = max_concurrent
        self.publish_timeout = publish_timeout
        self.max_articles_per_user_per_run = max_articles_per_user_per_run
        self.max_failures_per_user_per_run = max_failures_per_user_per_run
        self.max_daily_articles_per_user = max_daily_articles_per_user
        self.max_hourly_articles_per_user = max_hourly_articles_per_user
        self.proxy_pool_enabled = proxy_pool_enabled
        self.proxy_pool_api_url = proxy_pool_api_url or ""
        self.proxy_username = proxy_username or ""
        self.proxy_password = proxy_password or ""
        self.enable_random_ua = enable_random_ua
        self.min_publish_interval = min_publish_interval
        self.max_publish_interval = max_publish_interval
        self.headless = headless
        self.use_adspower = use_adspower
        self.scheduler = AsyncIOScheduler()
        # With AdsPower the internal browser pool is disabled to avoid
        # resource conflicts (use_pool is always False here).
        self.login_service = XHSLoginService(
            use_pool=False,
            headless=headless,
            use_adspower=use_adspower
        )
        # Caps the number of publish coroutines running at the same time.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # BUGFIX: the original chained conditional
        # ("AdsPower" if x else "浏览器池" if not x else "传统") had an
        # unreachable final branch; since the pool is never used, the
        # non-AdsPower mode is the traditional single-browser mode.
        mode_text = "AdsPower" if use_adspower else "传统"
        print(f"[调度器] 已创建,最大并发: {max_concurrent},浏览器模式: {mode_text}", file=sys.stderr)
def start(self, cron_expr: str = "*/5 * * * * *"):
    """Register and start the periodic publish job.

    Args:
        cron_expr: Cron expression. Supports the 6-field form
            "second minute hour day month day_of_week" (default: every
            5 seconds) and, as a backward-compatible generalization, the
            standard 5-field crontab form "minute hour day month
            day_of_week". Any other shape falls back to every-5-seconds.
    """
    parts = cron_expr.split()
    if len(parts) == 6:
        # Extended layout with a leading seconds field.
        trigger = CronTrigger(
            second=parts[0],
            minute=parts[1],
            hour=parts[2],
            day=parts[3],
            month=parts[4],
            day_of_week=parts[5]
        )
    elif len(parts) == 5:
        # Standard crontab layout (no seconds field) — previously rejected.
        trigger = CronTrigger(
            minute=parts[0],
            hour=parts[1],
            day=parts[2],
            month=parts[3],
            day_of_week=parts[4]
        )
    else:
        print(f"[调度器] ⚠️ Cron表达式格式错误: {cron_expr},使用默认配置", file=sys.stderr)
        trigger = CronTrigger(second="*/5")
    self.scheduler.add_job(
        self.auto_publish_articles,
        trigger=trigger,
        id='xhs_publish',
        name='小红书自动发布',
        max_instances=1,        # never run two publish rounds concurrently
        replace_existing=True   # survive restarts without duplicating the job
    )
    self.scheduler.start()
    print(f"[调度器] 定时发布任务已启动Cron表达式: {cron_expr}", file=sys.stderr)
def stop(self):
    """Stop the scheduled publishing job by tearing the scheduler down."""
    # Delegate shutdown to APScheduler itself, then record it on stderr.
    self.scheduler.shutdown()
    print("[调度器] 定时发布任务已停止", file=sys.stderr)
def get_db_connection(self):
    """Open a fresh MySQL connection from the configured credentials.

    Returns a pymysql connection using utf8mb4 and DictCursor rows
    (so every fetch yields column-name -> value dicts).
    """
    cfg = self.db_config
    return pymysql.connect(
        host=cfg['host'],
        port=cfg['port'],
        user=cfg['user'],
        password=cfg['password'],
        database=cfg['database'],
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
async def _fetch_proxy_from_pool(self) -> Optional[dict]:
    """Fetch one proxy address from the proxy-pool API and attach credentials.

    (Also removes the git-blame timestamp lines that had been pasted into
    this method's body and made the file unparseable.)

    Returns:
        dict: proxy config ``{'server': 'http://ip:port'}`` plus
        ``'username'``/``'password'`` when both credentials are configured;
        ``None`` when the pool is disabled or the request fails.
    """
    if not self.proxy_pool_enabled or not self.proxy_pool_api_url:
        return None
    try:
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(self.proxy_pool_api_url) as resp:
                if resp.status != 200:
                    print(f"[调度器] 代理池接口返回非200状态码: {resp.status}", file=sys.stderr)
                    return None
                text = (await resp.text()).strip()
        if not text:
            print("[调度器] 代理池返回内容为空", file=sys.stderr)
            return None
        line = text.splitlines()[0].strip()
        if not line:
            print("[调度器] 代理池首行内容为空", file=sys.stderr)
            return None
        # The pool typically returns bare "ip:port" lines; normalize to a URL.
        proxy_server = line if line.startswith(("http://", "https://")) else "http://" + line
        proxy_config = {
            'server': proxy_server
        }
        # Credentials are optional: whitelist-mode pools need none.
        if self.proxy_username and self.proxy_password:
            proxy_config['username'] = self.proxy_username
            proxy_config['password'] = self.proxy_password
            print(f"[调度器] 获取代理成功: {proxy_server} (认证代理, 用户名: {self.proxy_username})", file=sys.stderr)
        else:
            print(f"[调度器] 获取代理成功: {proxy_server} (白名单模式)", file=sys.stderr)
        return proxy_config
    except Exception as e:
        print(f"[调度器] 请求代理池接口失败: {str(e)}", file=sys.stderr)
        return None
def _generate_random_user_agent(self) -> str:
"""生成随机User-Agent防止浏览器指纹识别"""
chrome_versions = ['120.0.0.0', '119.0.0.0', '118.0.0.0', '117.0.0.0', '116.0.0.0']
windows_versions = ['Windows NT 10.0; Win64; x64', 'Windows NT 11.0; Win64; x64']
chrome_ver = random.choice(chrome_versions)
win_ver = random.choice(windows_versions)
return f'Mozilla/5.0 ({win_ver}) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{chrome_ver} Safari/537.36'
async def auto_publish_articles(self):
    """Main entry of the scheduled job: pick pending articles and publish them.

    Flow: load candidates -> per-run per-user cap -> daily/hourly caps ->
    concurrent publish (bounded by the semaphore) -> log a summary.

    BUGFIX: the original closed the cursor/connection only on the happy
    paths, leaking the DB connection whenever an exception fired mid-run;
    cleanup now lives in a single ``finally`` block.
    """
    print("========== 开始执行定时发布任务 ==========", file=sys.stderr)
    start_time = datetime.now()
    conn = None
    cursor = None
    try:
        conn = self.get_db_connection()
        cursor = conn.cursor()
        # 1. All articles approved and waiting for publication.
        cursor.execute("""
            SELECT * FROM ai_articles
            WHERE status = 'published_review'
            ORDER BY id ASC
        """)
        articles = cursor.fetchall()
        if not articles:
            print("没有待发布的文案", file=sys.stderr)
            return
        original_total = len(articles)
        # 2. Cap how many articles one user may publish in this round.
        articles = self._limit_articles_per_user(articles, self.max_articles_per_user_per_run)
        print(f"找到 {original_total} 篇待发布文案,按照每个用户每轮最多 {self.max_articles_per_user_per_run} 篇,本次计划发布 {len(articles)} 篇", file=sys.stderr)
        # 3. Enforce the daily / hourly per-user rate limits.
        if self.max_daily_articles_per_user > 0 or self.max_hourly_articles_per_user > 0:
            before_count = len(articles)
            articles = await self._filter_by_daily_and_hourly_limit(
                cursor, articles,
                self.max_daily_articles_per_user,
                self.max_hourly_articles_per_user
            )
            print(f"应用每日/每小时上限过滤:过滤前 {before_count} 篇,过滤后 {len(articles)} 篇", file=sys.stderr)
        if not articles:
            print("所有文案均因频率限制被过滤,本轮无任务", file=sys.stderr)
            return
        # 4. Publish concurrently; self.semaphore bounds the parallelism.
        tasks = []
        user_fail_count = {}
        paused_users = set()
        for article in articles:
            user_id = article['publish_user_id'] or article['created_user_id']
            if user_id in paused_users:
                print(f"用户 {user_id} 在本轮已暂停,跳过文案 ID: {article['id']}", file=sys.stderr)
                continue
            # No inter-publish delay here; pacing happens elsewhere.
            task = asyncio.create_task(
                self._publish_article_with_semaphore(
                    article, user_id, cursor, user_fail_count, paused_users
                )
            )
            tasks.append(task)
        # Wait for every publish attempt; exceptions become result values.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        success_count = sum(1 for r in results if r is True)
        fail_count = len(results) - success_count
        duration = (datetime.now() - start_time).total_seconds()
        print("========== 定时发布任务完成 ==========", file=sys.stderr)
        print(f"总计: {len(articles)} 篇, 成功: {success_count} 篇, 失败: {fail_count} 篇, 耗时: {duration:.1f} 秒", file=sys.stderr)
    except Exception as e:
        print(f"[调度器] 定时任务异常: {str(e)}", file=sys.stderr)
        import traceback
        traceback.print_exc()
    finally:
        # Release DB resources exactly once, on every path.
        if cursor is not None:
            try:
                cursor.close()
            except Exception:
                pass
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
async def _publish_article_with_semaphore(self, article: Dict, user_id: int,
cursor, user_fail_count: Dict, paused_users: set):
"""带信号量控制的发布文章"""
async with self.semaphore:
try:
print(f"[调度器] 开始发布文案 {article['id']}: {article['title']}", file=sys.stderr)
success = await self._publish_single_article(article, cursor)
if not success:
user_fail_count[user_id] = user_fail_count.get(user_id, 0) + 1
if user_fail_count[user_id] >= self.max_failures_per_user_per_run:
paused_users.add(user_id)
print(f"用户 {user_id} 在本轮失败次数达到 {user_fail_count[user_id]} 次,暂停本轮后续发布", file=sys.stderr)
print(f"发布失败 [文案ID: {article['id']}, 标题: {article['title']}]", file=sys.stderr)
return False
else:
print(f"发布成功 [文案ID: {article['id']}, 标题: {article['title']}]", file=sys.stderr)
return True
except Exception as e:
print(f"发布异常 [文案ID: {article['id']}]: {str(e)}", file=sys.stderr)
return False
async def _publish_single_article(self, article: Dict, cursor) -> bool:
    """Publish one article end-to-end.

    Loads the owning user, the author record holding the Xiaohongshu login
    cookie, the article's images and tags; normalizes the stored cookie
    state; optionally attaches a pool proxy and a random User-Agent; then
    calls the login service's publish_note under a timeout. On every exit
    path the article row is updated to 'published' or 'failed' with a
    reason via _update_article_status.

    Args:
        article: Row dict from ai_articles (id, title, content, plus the
            publish_user_id / created_user_id owner columns).
        cursor: Open DictCursor used for all reads and status updates.

    Returns:
        True if the note was published, False on any failure. Never raises:
        the outer except converts unexpected errors into a 'failed' status.
    """
    try:
        # 1. Resolve the owning user (publisher wins, creator is fallback).
        user_id = article['publish_user_id'] or article['created_user_id']
        cursor.execute("SELECT * FROM ai_users WHERE id = %s", (user_id,))
        user = cursor.fetchone()
        if not user:
            self._update_article_status(cursor, article['id'], 'failed', '获取用户信息失败')
            return False
        # 2. The user must have bound a Xiaohongshu account.
        if user['is_bound_xhs'] != 1:
            self._update_article_status(cursor, article['id'], 'failed', '用户未绑定小红书账号')
            return False
        # 3. Fetch the active author record that stores the login cookie.
        #    (channel = 1 presumably identifies Xiaohongshu — TODO confirm.)
        cursor.execute("""
            SELECT * FROM ai_authors
            WHERE phone = %s AND enterprise_id = %s AND channel = 1 AND status = 'active'
            LIMIT 1
        """, (user['phone'], user['enterprise_id']))
        author = cursor.fetchone()
        if not author or not author['xhs_cookie']:
            self._update_article_status(cursor, article['id'], 'failed', '小红书Cookie已失效')
            return False
        # 4. Collect the article's images, preserving sort order.
        cursor.execute("""
            SELECT image_url FROM ai_article_images
            WHERE article_id = %s
            ORDER BY sort_order ASC
        """, (article['id'],))
        images = [img['image_url'] for img in cursor.fetchall() if img['image_url']]
        # 5. Collect topic tags (single row; parsed into a list).
        cursor.execute("SELECT coze_tag FROM ai_article_tags WHERE article_id = %s LIMIT 1", (article['id'],))
        tag_row = cursor.fetchone()
        topics = []
        if tag_row and tag_row['coze_tag']:
            topics = self._parse_tags(tag_row['coze_tag'])
        # 6. Parse and normalize the stored cookie state.
        try:
            # The DB column stores a full login_state JSON document.
            login_state = json.loads(author['xhs_cookie'])
            # Handle double-encoded JSON (a JSON string containing JSON).
            if isinstance(login_state, str):
                login_state = json.loads(login_state)
            # New format: a login_state object wrapping a 'cookies' field.
            if isinstance(login_state, dict) and 'cookies' in login_state:
                cookies = login_state['cookies']
                print(f" 从login_state提取cookies: {len(cookies) if isinstance(cookies, list) else 'unknown'} 个", file=sys.stderr)
            elif isinstance(login_state, (list, dict)):
                # Legacy format: the document itself is the cookies payload.
                cookies = login_state
                print(f" 使用旧格式cookies无login_state包装", file=sys.stderr)
            else:
                raise ValueError(f"无法识别的Cookie存储格式: {type(login_state).__name__}")
            # Sanity check before formatting.
            if not isinstance(cookies, (list, dict)):
                raise ValueError(f"Cookie必须是列表或字典格式当前类型: {type(cookies).__name__}")
            # Ensure each cookie carries domain/path so the browser accepts it.
            cookies = self._format_cookies(cookies)
        except Exception as e:
            self._update_article_status(cursor, article['id'], 'failed', f'Cookie格式错误: {str(e)}')
            return False
        # 7. Optionally grab a proxy from the pool (None when disabled).
        proxy = await self._fetch_proxy_from_pool()
        if proxy:
            print(f"[调度器] 使用代理: {proxy}", file=sys.stderr)
        # 8. Optionally randomize the User-Agent to vary the fingerprint.
        user_agent = self._generate_random_user_agent() if self.enable_random_ua else None
        if user_agent:
            print(f"[调度器] 使用随机UA: {user_agent[:50]}...", file=sys.stderr)
        # 9. Call the publish service under a hard timeout.
        try:
            print(f"[调度器] 开始调用发布服务,超时设置: {self.publish_timeout} 秒", file=sys.stderr)
            result = await asyncio.wait_for(
                self.login_service.publish_note(
                    title=article['title'],
                    content=article['content'],
                    images=images,
                    topics=topics,
                    cookies=cookies,
                    proxy=proxy,
                    user_agent=user_agent,
                ),
                timeout=self.publish_timeout
            )
        except asyncio.TimeoutError:
            error_msg = f'发布超时({self.publish_timeout}秒)'
            print(f"[调度器] {error_msg}", file=sys.stderr)
            self._update_article_status(cursor, article['id'], 'failed', error_msg)
            return False
        except Exception as e:
            error_msg = f'调用发布服务异常: {str(e)}'
            print(f"[调度器] {error_msg}", file=sys.stderr)
            import traceback
            traceback.print_exc()
            self._update_article_status(cursor, article['id'], 'failed', error_msg)
            return False
        # 10. Persist the outcome reported by the publish service.
        if result['success']:
            self._update_article_status(cursor, article['id'], 'published', '发布成功')
            return True
        else:
            error_msg = result.get('error', '未知错误')
            self._update_article_status(cursor, article['id'], 'failed', error_msg)
            return False
    except Exception as e:
        self._update_article_status(cursor, article['id'], 'failed', f'发布异常: {str(e)}')
        return False
def _update_article_status(self, cursor, article_id: int, status: str, message: str = ''):
"""更新文章状态"""
try:
if status == 'published':
cursor.execute("""
UPDATE ai_articles
SET status = %s, publish_time = NOW(), updated_at = NOW()
WHERE id = %s
""", (status, article_id))
else:
cursor.execute("""
UPDATE ai_articles
SET status = %s, review_comment = %s, updated_at = NOW()
WHERE id = %s
""", (status, message, article_id))
cursor.connection.commit()
except Exception as e:
print(f"更新文章 {article_id} 状态失败: {str(e)}", file=sys.stderr)
def _limit_articles_per_user(self, articles: List[Dict], per_user_limit: int) -> List[Dict]:
"""限制每用户发文数"""
if per_user_limit <= 0:
return articles
grouped = {}
for art in articles:
user_id = art['publish_user_id'] or art['created_user_id']
if user_id not in grouped:
grouped[user_id] = []
grouped[user_id].append(art)
limited = []
for user_id, user_articles in grouped.items():
limited.extend(user_articles[:per_user_limit])
return limited
async def _filter_by_daily_and_hourly_limit(self, cursor, articles: List[Dict],
max_daily: int, max_hourly: int) -> List[Dict]:
"""按每日和每小时上限过滤文章"""
if max_daily <= 0 and max_hourly <= 0:
return articles
# 提取所有用户ID
user_ids = set()
for art in articles:
user_id = art['publish_user_id'] or art['created_user_id']
user_ids.add(user_id)
# 查询每用户已发布数量
user_daily_published = {}
user_hourly_published = {}
now = datetime.now()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
current_hour_start = now.replace(minute=0, second=0, microsecond=0)
for user_id in user_ids:
# 查询当日已发布数量
if max_daily > 0:
cursor.execute("""
SELECT COUNT(*) as count FROM ai_articles
WHERE status = 'published' AND publish_time >= %s
AND (publish_user_id = %s OR (publish_user_id IS NULL AND created_user_id = %s))
""", (today_start, user_id, user_id))
user_daily_published[user_id] = cursor.fetchone()['count']
# 查询当前小时已发布数量
if max_hourly > 0:
cursor.execute("""
SELECT COUNT(*) as count FROM ai_articles
WHERE status = 'published' AND publish_time >= %s
AND (publish_user_id = %s OR (publish_user_id IS NULL AND created_user_id = %s))
""", (current_hour_start, user_id, user_id))
user_hourly_published[user_id] = cursor.fetchone()['count']
# 过滤超限文章
filtered = []
for art in articles:
user_id = art['publish_user_id'] or art['created_user_id']
# 检查每日上限
if max_daily > 0 and user_daily_published.get(user_id, 0) >= max_daily:
continue
# 检查每小时上限
if max_hourly > 0 and user_hourly_published.get(user_id, 0) >= max_hourly:
continue
filtered.append(art)
return filtered
def _parse_tags(self, tag_str: str) -> List[str]:
"""解析标签字符串"""
if not tag_str:
return []
# 替换分隔符
tag_str = tag_str.replace(';', ',').replace(' ', ',').replace('', ',')
# 分割并清理
tags = []
for tag in tag_str.split(','):
tag = tag.strip()
if tag:
tags.append(tag)
return tags
def _format_cookies(self, cookies) -> List[Dict]:
"""
格式化Cookie只处理非标准格式的Cookie
对于Playwright原生格式的Cookie直接返回不做任何修改
Args:
cookies: Cookie数据支持list[dict]或dict格式
Returns:
格式化后的Cookie列表
"""
# 如果是字典格式(键值对),转换为列表格式
if isinstance(cookies, dict):
cookies = [
{
"name": name,
"value": str(value) if not isinstance(value, str) else value,
"domain": ".xiaohongshu.com",
"path": "/"
}
for name, value in cookies.items()
]
# 验证是否为列表
if not isinstance(cookies, list):
raise ValueError(f"Cookie必须是列表或字典格式当前类型: {type(cookies).__name__}")
# 检查是否为空列表
if not cookies or len(cookies) == 0:
print(f" Cookie列表为空直接返回", file=sys.stderr)
return cookies
# 检查是否是Playwright原生格式包含name和value字段
if isinstance(cookies[0], dict) and 'name' in cookies[0] and 'value' in cookies[0]:
# 已经是Playwright格式直接返回不做任何修改
print(f" 检测到Playwright原生格式直接使用 ({len(cookies)} 个cookie)", file=sys.stderr)
return cookies
# 其他格式,进行基础验证
formatted_cookies = []
for cookie in cookies:
if not isinstance(cookie, dict):
raise ValueError(f"Cookie元素必须是字典格式当前类型: {type(cookie).__name__}")
# 确保有基本字段
if 'domain' not in cookie and 'url' not in cookie:
cookie = cookie.copy()
cookie['domain'] = '.xiaohongshu.com'
if 'path' not in cookie and 'url' not in cookie:
if 'domain' in cookie or 'url' not in cookie:
cookie = cookie.copy() if cookie is cookies[cookies.index(cookie)] else cookie
cookie['path'] = '/'
formatted_cookies.append(cookie)
return formatted_cookies