Files
ai_wht_wechat/backend/scheduler.py
2026-01-23 16:27:47 +08:00

599 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
小红书定时发布调度器
管理自动发布任务的调度和执行
"""
import asyncio
import sys
import random
from datetime import datetime, time as dt_time
from typing import List, Dict, Any, Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import pymysql
import json
import aiohttp
from xhs_login import XHSLoginService
class XHSScheduler:
    """Xiaohongshu (小红书) scheduled-publishing scheduler.

    Periodically scans the database for articles in 'published_review'
    status and publishes them through XHSLoginService, with per-user
    rate limiting and bounded concurrency.
    """

    def __init__(self,
                 db_config: Dict[str, Any],
                 max_concurrent: int = 2,
                 publish_timeout: int = 300,
                 max_articles_per_user_per_run: int = 5,
                 max_failures_per_user_per_run: int = 3,
                 max_daily_articles_per_user: int = 20,
                 max_hourly_articles_per_user: int = 3,
                 proxy_pool_enabled: bool = False,
                 proxy_pool_api_url: Optional[str] = None,
                 enable_random_ua: bool = True,
                 min_publish_interval: int = 30,
                 max_publish_interval: int = 120,
                 headless: bool = True,
                 use_adspower: bool = True,
                 proxy_username: Optional[str] = None,
                 proxy_password: Optional[str] = None):
        """
        Initialize the scheduler.

        Args:
            db_config: MySQL connection settings (host/port/user/password/database).
            max_concurrent: Maximum number of concurrent publish tasks.
            publish_timeout: Per-article publish timeout, in seconds.
            max_articles_per_user_per_run: Max articles per user per run.
            max_failures_per_user_per_run: Failures per user per run before
                that user is paused for the remainder of the run.
            max_daily_articles_per_user: Per-user daily publish cap.
            max_hourly_articles_per_user: Per-user hourly publish cap.
            proxy_pool_enabled: Whether to pull proxies from a proxy pool.
            proxy_pool_api_url: Proxy-pool endpoint returning one proxy per line.
            enable_random_ua: Whether to randomize the User-Agent per publish.
            min_publish_interval: Minimum publish interval in seconds
                (stored for use by callers; not applied in this class).
            max_publish_interval: Maximum publish interval in seconds.
            headless: Run the browser headless; False shows a window for debugging.
            use_adspower: Whether to use AdsPower browser management.
            proxy_username: Proxy auth username (optional; blank in whitelist mode).
            proxy_password: Proxy auth password (optional; blank in whitelist mode).
        """
        self.db_config = db_config
        self.max_concurrent = max_concurrent
        self.publish_timeout = publish_timeout
        self.max_articles_per_user_per_run = max_articles_per_user_per_run
        self.max_failures_per_user_per_run = max_failures_per_user_per_run
        self.max_daily_articles_per_user = max_daily_articles_per_user
        self.max_hourly_articles_per_user = max_hourly_articles_per_user
        self.proxy_pool_enabled = proxy_pool_enabled
        self.proxy_pool_api_url = proxy_pool_api_url or ""
        self.proxy_username = proxy_username or ""
        self.proxy_password = proxy_password or ""
        self.enable_random_ua = enable_random_ua
        self.min_publish_interval = min_publish_interval
        self.max_publish_interval = max_publish_interval
        self.headless = headless
        self.use_adspower = use_adspower
        self.scheduler = AsyncIOScheduler()
        # AdsPower manages its own browser instances, so the internal
        # browser pool is disabled to avoid resource conflicts.
        self.login_service = XHSLoginService(
            use_pool=False,
            headless=headless,
            use_adspower=use_adspower
        )
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # BUG FIX: the original expression was
        #   "AdsPower" if use_adspower else "浏览器池" if not use_adspower else "传统"
        # whose third branch ("传统") was unreachable — use_adspower is either
        # truthy or falsy, so only two outcomes are possible.
        mode_text = "AdsPower" if use_adspower else "浏览器池"
        print(f"[调度器] 已创建,最大并发: {max_concurrent},浏览器模式: {mode_text}", file=sys.stderr)
def start(self, cron_expr: str = "*/5 * * * * *"):
"""
启动定时任务
Args:
cron_expr: Cron表达式默认每5秒执行一次
格式: 秒 分 时 日 月 周
"""
# 解析cron表达式
parts = cron_expr.split()
if len(parts) == 6:
# 6位格式: 秒 分 时 日 月 周
trigger = CronTrigger(
second=parts[0],
minute=parts[1],
hour=parts[2],
day=parts[3],
month=parts[4],
day_of_week=parts[5]
)
else:
print(f"[调度器] ⚠️ Cron表达式格式错误: {cron_expr},使用默认配置", file=sys.stderr)
trigger = CronTrigger(second="*/5")
self.scheduler.add_job(
self.auto_publish_articles,
trigger=trigger,
id='xhs_publish',
name='小红书自动发布',
max_instances=1, # 最多只允许1个实例同时运行防止重复执行
replace_existing=True # 如果任务已存在则替换,避免重启时重复添加
)
self.scheduler.start()
print(f"[调度器] 定时发布任务已启动Cron表达式: {cron_expr}", file=sys.stderr)
def stop(self):
"""停止定时任务"""
self.scheduler.shutdown()
print("[调度器] 定时发布任务已停止", file=sys.stderr)
def get_db_connection(self):
"""获取数据库连接"""
return pymysql.connect(
host=self.db_config['host'],
port=self.db_config['port'],
user=self.db_config['user'],
password=self.db_config['password'],
database=self.db_config['database'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
async def _fetch_proxy_from_pool(self) -> Optional[dict]:
"""从代理池接口获取一个代理地址,并附加认证信息
Returns:
dict: 代理配置字典 {'server': 'http://ip:port', 'username': '...', 'password': '...'}
或 None 如果未启用或获取失败
"""
if not self.proxy_pool_enabled or not self.proxy_pool_api_url:
return None
try:
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(self.proxy_pool_api_url) as resp:
if resp.status != 200:
print(f"[调度器] 代理池接口返回非200状态码: {resp.status}", file=sys.stderr)
return None
text = (await resp.text()).strip()
if not text:
print("[调度器] 代理池返回内容为空", file=sys.stderr)
return None
line = text.splitlines()[0].strip()
if not line:
print("[调度器] 代理池首行内容为空", file=sys.stderr)
return None
# 构建代理URL
proxy_server = line if line.startswith(("http://", "https://")) else "http://" + line
# 构建完整的代理配置字典
proxy_config = {
'server': proxy_server
}
# 如果配置了认证信息,添加到配置中
if self.proxy_username and self.proxy_password:
proxy_config['username'] = self.proxy_username
proxy_config['password'] = self.proxy_password
print(f"[调度器] 获取代理成功: {proxy_server} (认证代理, 用户名: {self.proxy_username})", file=sys.stderr)
else:
print(f"[调度器] 获取代理成功: {proxy_server} (白名单模式)", file=sys.stderr)
return proxy_config
except Exception as e:
print(f"[调度器] 请求代理池接口失败: {str(e)}", file=sys.stderr)
return None
def _generate_random_user_agent(self) -> str:
"""生成随机User-Agent防止浏览器指纹识别"""
chrome_versions = ['120.0.0.0', '119.0.0.0', '118.0.0.0', '117.0.0.0', '116.0.0.0']
windows_versions = ['Windows NT 10.0; Win64; x64', 'Windows NT 11.0; Win64; x64']
chrome_ver = random.choice(chrome_versions)
win_ver = random.choice(windows_versions)
return f'Mozilla/5.0 ({win_ver}) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{chrome_ver} Safari/537.36'
async def auto_publish_articles(self):
"""自动发布文案(定时任务主函数)"""
print("========== 开始执行定时发布任务 ==========", file=sys.stderr)
start_time = datetime.now()
try:
conn = self.get_db_connection()
cursor = conn.cursor()
# 1. 查询所有待发布的文案
cursor.execute("""
SELECT * FROM ai_articles
WHERE status = 'published_review'
ORDER BY id ASC
""")
articles = cursor.fetchall()
if not articles:
print("没有待发布的文案", file=sys.stderr)
cursor.close()
conn.close()
return
original_total = len(articles)
# 2. 限制每用户每轮发文数
articles = self._limit_articles_per_user(articles, self.max_articles_per_user_per_run)
print(f"找到 {original_total} 篇待发布文案,按照每个用户每轮最多 {self.max_articles_per_user_per_run} 篇,本次计划发布 {len(articles)}", file=sys.stderr)
# 3. 应用每日/每小时上限过滤
if self.max_daily_articles_per_user > 0 or self.max_hourly_articles_per_user > 0:
before_count = len(articles)
articles = await self._filter_by_daily_and_hourly_limit(
cursor, articles,
self.max_daily_articles_per_user,
self.max_hourly_articles_per_user
)
print(f"应用每日/每小时上限过滤:过滤前 {before_count} 篇,过滤后 {len(articles)}", file=sys.stderr)
if not articles:
print("所有文案均因频率限制被过滤,本轮无任务", file=sys.stderr)
cursor.close()
conn.close()
return
# 4. 并发发布
tasks = []
user_fail_count = {}
paused_users = set()
for article in articles:
user_id = article['publish_user_id'] or article['created_user_id']
if user_id in paused_users:
print(f"用户 {user_id} 在本轮已暂停,跳过文案 ID: {article['id']}", file=sys.stderr)
continue
# 直接发布,不在这里延迟
task = asyncio.create_task(
self._publish_article_with_semaphore(
article, user_id, cursor, user_fail_count, paused_users
)
)
tasks.append(task)
# 等待所有发布完成
results = await asyncio.gather(*tasks, return_exceptions=True)
# 统计结果
success_count = sum(1 for r in results if r is True)
fail_count = len(results) - success_count
cursor.close()
conn.close()
duration = (datetime.now() - start_time).total_seconds()
print("========== 定时发布任务完成 ==========", file=sys.stderr)
print(f"总计: {len(articles)} 篇, 成功: {success_count} 篇, 失败: {fail_count} 篇, 耗时: {duration:.1f}", file=sys.stderr)
except Exception as e:
print(f"[调度器] 定时任务异常: {str(e)}", file=sys.stderr)
import traceback
traceback.print_exc()
async def _publish_article_with_semaphore(self, article: Dict, user_id: int,
cursor, user_fail_count: Dict, paused_users: set):
"""带信号量控制的发布文章"""
async with self.semaphore:
try:
print(f"[调度器] 开始发布文案 {article['id']}: {article['title']}", file=sys.stderr)
success = await self._publish_single_article(article, cursor)
if not success:
user_fail_count[user_id] = user_fail_count.get(user_id, 0) + 1
if user_fail_count[user_id] >= self.max_failures_per_user_per_run:
paused_users.add(user_id)
print(f"用户 {user_id} 在本轮失败次数达到 {user_fail_count[user_id]} 次,暂停本轮后续发布", file=sys.stderr)
print(f"发布失败 [文案ID: {article['id']}, 标题: {article['title']}]", file=sys.stderr)
return False
else:
print(f"发布成功 [文案ID: {article['id']}, 标题: {article['title']}]", file=sys.stderr)
return True
except Exception as e:
print(f"发布异常 [文案ID: {article['id']}]: {str(e)}", file=sys.stderr)
return False
    async def _publish_single_article(self, article: Dict, cursor) -> bool:
        """Publish a single article to Xiaohongshu.

        Resolves the owning user, validates the account binding and stored
        cookie, gathers images and topic tags, then calls the login service
        to publish with a hard timeout. Every exit path writes a final
        status ('published' or 'failed' plus a reason) back to the article.

        Args:
            article: Row dict from ai_articles.
            cursor: Open DictCursor on the application database.

        Returns:
            True if the note was published successfully, False otherwise.
        """
        try:
            # 1. Resolve the publishing user (publish_user_id takes
            # precedence over the article's creator).
            user_id = article['publish_user_id'] or article['created_user_id']
            cursor.execute("SELECT * FROM ai_users WHERE id = %s", (user_id,))
            user = cursor.fetchone()
            if not user:
                self._update_article_status(cursor, article['id'], 'failed', '获取用户信息失败')
                return False
            # 2. The user must have a bound Xiaohongshu account.
            if user['is_bound_xhs'] != 1:
                self._update_article_status(cursor, article['id'], 'failed', '用户未绑定小红书账号')
                return False
            # 3. Fetch the active author record for this phone/enterprise
            # (channel = 1 presumably identifies Xiaohongshu — confirm
            # against the ai_authors schema) along with its stored cookie.
            cursor.execute("""
                SELECT * FROM ai_authors
                WHERE phone = %s AND enterprise_id = %s AND channel = 1 AND status = 'active'
                LIMIT 1
            """, (user['phone'], user['enterprise_id']))
            author = cursor.fetchone()
            if not author or not author['xhs_cookie']:
                self._update_article_status(cursor, article['id'], 'failed', '小红书Cookie已失效')
                return False
            # 4. Collect the article's image URLs in display order,
            # skipping empty values.
            cursor.execute("""
                SELECT image_url FROM ai_article_images
                WHERE article_id = %s
                ORDER BY sort_order ASC
            """, (article['id'],))
            images = [img['image_url'] for img in cursor.fetchall() if img['image_url']]
            # 5. Collect topic tags from the single coze_tag string (if any).
            cursor.execute("SELECT coze_tag FROM ai_article_tags WHERE article_id = %s LIMIT 1", (article['id'],))
            tag_row = cursor.fetchone()
            topics = []
            if tag_row and tag_row['coze_tag']:
                topics = self._parse_tags(tag_row['coze_tag'])
            # 6. Parse and normalize the stored cookie blob.
            try:
                # The database stores the full login_state JSON blob.
                login_state = json.loads(author['xhs_cookie'])
                # Handle double-encoded JSON (a JSON string containing JSON).
                if isinstance(login_state, str):
                    login_state = json.loads(login_state)
                # Extract the cookies field; also accept the legacy layout
                # where the blob is the cookie list itself.
                if isinstance(login_state, dict) and 'cookies' in login_state:
                    # New format: a login_state object wrapping a cookies field.
                    cookies = login_state['cookies']
                    print(f" 从login_state提取cookies: {len(cookies) if isinstance(cookies, list) else 'unknown'}", file=sys.stderr)
                elif isinstance(login_state, (list, dict)):
                    # Legacy format: the blob is the cookies themselves.
                    cookies = login_state
                    print(f" 使用旧格式cookies无login_state包装", file=sys.stderr)
                else:
                    raise ValueError(f"无法识别的Cookie存储格式: {type(login_state).__name__}")
                # Validate the cookie container type before formatting.
                if not isinstance(cookies, (list, dict)):
                    raise ValueError(f"Cookie必须是列表或字典格式当前类型: {type(cookies).__name__}")
                # Normalize cookies (ensure domain/path fields are present).
                cookies = self._format_cookies(cookies)
            except Exception as e:
                self._update_article_status(cursor, article['id'], 'failed', f'Cookie格式错误: {str(e)}')
                return False
            # 7. Optionally obtain a proxy from the pool (None when disabled).
            proxy = await self._fetch_proxy_from_pool()
            if proxy:
                print(f"[调度器] 使用代理: {proxy}", file=sys.stderr)
            # 8. Optionally randomize the User-Agent to reduce fingerprinting.
            user_agent = self._generate_random_user_agent() if self.enable_random_ua else None
            if user_agent:
                print(f"[调度器] 使用随机UA: {user_agent[:50]}...", file=sys.stderr)
            # 9. Call the publish service under a hard timeout.
            try:
                print(f"[调度器] 开始调用发布服务,超时设置: {self.publish_timeout}", file=sys.stderr)
                result = await asyncio.wait_for(
                    self.login_service.publish_note(
                        title=article['title'],
                        content=article['content'],
                        images=images,
                        topics=topics,
                        cookies=cookies,
                        proxy=proxy,
                        user_agent=user_agent,
                    ),
                    timeout=self.publish_timeout
                )
            except asyncio.TimeoutError:
                error_msg = f'发布超时({self.publish_timeout}秒)'
                print(f"[调度器] {error_msg}", file=sys.stderr)
                self._update_article_status(cursor, article['id'], 'failed', error_msg)
                return False
            except Exception as e:
                error_msg = f'调用发布服务异常: {str(e)}'
                print(f"[调度器] {error_msg}", file=sys.stderr)
                import traceback
                traceback.print_exc()
                self._update_article_status(cursor, article['id'], 'failed', error_msg)
                return False
            # 10. Persist the final status based on the service result
            # (result is expected to be a dict with 'success'/'error' keys).
            if result['success']:
                self._update_article_status(cursor, article['id'], 'published', '发布成功')
                return True
            else:
                error_msg = result.get('error', '未知错误')
                self._update_article_status(cursor, article['id'], 'failed', error_msg)
                return False
        except Exception as e:
            self._update_article_status(cursor, article['id'], 'failed', f'发布异常: {str(e)}')
            return False
def _update_article_status(self, cursor, article_id: int, status: str, message: str = ''):
"""更新文章状态"""
try:
if status == 'published':
cursor.execute("""
UPDATE ai_articles
SET status = %s, publish_time = NOW(), updated_at = NOW()
WHERE id = %s
""", (status, article_id))
else:
cursor.execute("""
UPDATE ai_articles
SET status = %s, review_comment = %s, updated_at = NOW()
WHERE id = %s
""", (status, message, article_id))
cursor.connection.commit()
except Exception as e:
print(f"更新文章 {article_id} 状态失败: {str(e)}", file=sys.stderr)
def _limit_articles_per_user(self, articles: List[Dict], per_user_limit: int) -> List[Dict]:
"""限制每用户发文数"""
if per_user_limit <= 0:
return articles
grouped = {}
for art in articles:
user_id = art['publish_user_id'] or art['created_user_id']
if user_id not in grouped:
grouped[user_id] = []
grouped[user_id].append(art)
limited = []
for user_id, user_articles in grouped.items():
limited.extend(user_articles[:per_user_limit])
return limited
async def _filter_by_daily_and_hourly_limit(self, cursor, articles: List[Dict],
max_daily: int, max_hourly: int) -> List[Dict]:
"""按每日和每小时上限过滤文章"""
if max_daily <= 0 and max_hourly <= 0:
return articles
# 提取所有用户ID
user_ids = set()
for art in articles:
user_id = art['publish_user_id'] or art['created_user_id']
user_ids.add(user_id)
# 查询每用户已发布数量
user_daily_published = {}
user_hourly_published = {}
now = datetime.now()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
current_hour_start = now.replace(minute=0, second=0, microsecond=0)
for user_id in user_ids:
# 查询当日已发布数量
if max_daily > 0:
cursor.execute("""
SELECT COUNT(*) as count FROM ai_articles
WHERE status = 'published' AND publish_time >= %s
AND (publish_user_id = %s OR (publish_user_id IS NULL AND created_user_id = %s))
""", (today_start, user_id, user_id))
user_daily_published[user_id] = cursor.fetchone()['count']
# 查询当前小时已发布数量
if max_hourly > 0:
cursor.execute("""
SELECT COUNT(*) as count FROM ai_articles
WHERE status = 'published' AND publish_time >= %s
AND (publish_user_id = %s OR (publish_user_id IS NULL AND created_user_id = %s))
""", (current_hour_start, user_id, user_id))
user_hourly_published[user_id] = cursor.fetchone()['count']
# 过滤超限文章
filtered = []
for art in articles:
user_id = art['publish_user_id'] or art['created_user_id']
# 检查每日上限
if max_daily > 0 and user_daily_published.get(user_id, 0) >= max_daily:
continue
# 检查每小时上限
if max_hourly > 0 and user_hourly_published.get(user_id, 0) >= max_hourly:
continue
filtered.append(art)
return filtered
def _parse_tags(self, tag_str: str) -> List[str]:
"""解析标签字符串"""
if not tag_str:
return []
# 替换分隔符
tag_str = tag_str.replace(';', ',').replace(' ', ',').replace('', ',')
# 分割并清理
tags = []
for tag in tag_str.split(','):
tag = tag.strip()
if tag:
tags.append(tag)
return tags
def _format_cookies(self, cookies) -> List[Dict]:
"""
格式化Cookie只处理非标准格式的Cookie
对于Playwright原生格式的Cookie直接返回不做任何修改
Args:
cookies: Cookie数据支持list[dict]或dict格式
Returns:
格式化后的Cookie列表
"""
# 如果是字典格式(键值对),转换为列表格式
if isinstance(cookies, dict):
cookies = [
{
"name": name,
"value": str(value) if not isinstance(value, str) else value,
"domain": ".xiaohongshu.com",
"path": "/"
}
for name, value in cookies.items()
]
# 验证是否为列表
if not isinstance(cookies, list):
raise ValueError(f"Cookie必须是列表或字典格式当前类型: {type(cookies).__name__}")
# 检查是否为空列表
if not cookies or len(cookies) == 0:
print(f" Cookie列表为空直接返回", file=sys.stderr)
return cookies
# 检查是否是Playwright原生格式包含name和value字段
if isinstance(cookies[0], dict) and 'name' in cookies[0] and 'value' in cookies[0]:
# 已经是Playwright格式直接返回不做任何修改
print(f" 检测到Playwright原生格式直接使用 ({len(cookies)} 个cookie)", file=sys.stderr)
return cookies
# 其他格式,进行基础验证
formatted_cookies = []
for cookie in cookies:
if not isinstance(cookie, dict):
raise ValueError(f"Cookie元素必须是字典格式当前类型: {type(cookie).__name__}")
# 确保有基本字段
if 'domain' not in cookie and 'url' not in cookie:
cookie = cookie.copy()
cookie['domain'] = '.xiaohongshu.com'
if 'path' not in cookie and 'url' not in cookie:
if 'domain' in cookie or 'url' not in cookie:
cookie = cookie.copy() if cookie is cookies[cookies.index(cookie)] else cookie
cookie['path'] = '/'
formatted_cookies.append(cookie)
return formatted_cookies