379 lines
15 KiB
Python
379 lines
15 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
文章自动分配作者脚本
|
|||
|
|
监控数据库中status为pending_review且author_id=0的文章,自动分配作者
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
import time
|
|||
|
|
import json
|
|||
|
|
import logging
|
|||
|
|
import requests
|
|||
|
|
from typing import Dict, List
|
|||
|
|
import traceback
|
|||
|
|
|
|||
|
|
# 添加项目根目录到Python路径
|
|||
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|||
|
|
|
|||
|
|
from database_config import get_db_manager
|
|||
|
|
from log_config import setup_logger
|
|||
|
|
|
|||
|
|
# 配置日志记录器,支持按日期切割和控制台输出
|
|||
|
|
logger = setup_logger(
|
|||
|
|
name='assign_authors_to_articles',
|
|||
|
|
log_file='logs/assign_authors_to_articles.log',
|
|||
|
|
error_log_file='logs/assign_authors_to_articles_error.log',
|
|||
|
|
level=logging.INFO,
|
|||
|
|
console_output=True
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 配置常量
|
|||
|
|
BASE_URL = "http://127.0.0.1:8216"
|
|||
|
|
SLEEP_INTERVAL = 5 # 监控间隔(秒),可配置
|
|||
|
|
|
|||
|
|
class AssignAuthorsToArticles:
|
|||
|
|
def __init__(self):
|
|||
|
|
# API配置
|
|||
|
|
self.base_url = BASE_URL
|
|||
|
|
|
|||
|
|
# 认证信息
|
|||
|
|
self.jwt_token = None
|
|||
|
|
|
|||
|
|
# 使用统一的数据库管理器
|
|||
|
|
self.db_manager = get_db_manager()
|
|||
|
|
|
|||
|
|
# 登录配置 - 使用雇员账号
|
|||
|
|
self.login_credentials = {
|
|||
|
|
'username': '13621242430',
|
|||
|
|
'password': 'admin123'
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 禁用代理
|
|||
|
|
self.proxies = {
|
|||
|
|
'http': None,
|
|||
|
|
'https': None
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info("AssignAuthorsToArticles 初始化完成")
|
|||
|
|
|
|||
|
|
def log_to_database(self, level: str, message: str, details: str = None):
|
|||
|
|
"""记录日志到数据库ai_logs表"""
|
|||
|
|
try:
|
|||
|
|
with self.db_manager.get_cursor() as cursor:
|
|||
|
|
# 映射日志级别到数据库状态
|
|||
|
|
status_map = {
|
|||
|
|
'INFO': 'success',
|
|||
|
|
'WARNING': 'warning',
|
|||
|
|
'ERROR': 'error'
|
|||
|
|
}
|
|||
|
|
status = status_map.get(level, 'success')
|
|||
|
|
|
|||
|
|
sql = """
|
|||
|
|
INSERT INTO ai_logs (user_id, action, description, status, error_message, created_at)
|
|||
|
|
VALUES (%s, %s, %s, %s, %s, NOW())
|
|||
|
|
"""
|
|||
|
|
cursor.execute(sql, (None, 'assign_authors', message, status, details))
|
|||
|
|
logger.info(f"日志已记录到数据库: {level} - {message}")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"记录日志到数据库失败: {e}")
|
|||
|
|
|
|||
|
|
def login_and_get_jwt_token(self) -> bool:
|
|||
|
|
"""登录获取JWT token"""
|
|||
|
|
try:
|
|||
|
|
login_url = f"{self.base_url}/api/auth/login"
|
|||
|
|
login_data = {
|
|||
|
|
"username": self.login_credentials['username'],
|
|||
|
|
"password": self.login_credentials['password']
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(f"尝试登录: {login_data['username']}")
|
|||
|
|
self.log_to_database('INFO', f"尝试登录用户: {login_data['username']}")
|
|||
|
|
|
|||
|
|
response = requests.post(
|
|||
|
|
login_url,
|
|||
|
|
json=login_data,
|
|||
|
|
headers={'Content-Type': 'application/json'},
|
|||
|
|
proxies=self.proxies
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if response.status_code == 200:
|
|||
|
|
result = response.json()
|
|||
|
|
if result.get('code') == 200:
|
|||
|
|
self.jwt_token = result['data']['token']
|
|||
|
|
logger.info("JWT token获取成功")
|
|||
|
|
self.log_to_database('INFO', "JWT token获取成功")
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
error_msg = f"登录失败: {result.get('message', '未知错误')}"
|
|||
|
|
logger.error(error_msg)
|
|||
|
|
self.log_to_database('ERROR', error_msg, json.dumps(result))
|
|||
|
|
return False
|
|||
|
|
else:
|
|||
|
|
error_msg = f"登录请求失败: {response.status_code}"
|
|||
|
|
logger.error(error_msg)
|
|||
|
|
self.log_to_database('ERROR', error_msg, response.text)
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
error_msg = f"登录异常: {e}"
|
|||
|
|
logger.error(error_msg)
|
|||
|
|
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def get_pending_articles(self) -> List[Dict]:
|
|||
|
|
"""获取待分配作者的文章:enterprise_id=1, status=pending_review, author_id=0"""
|
|||
|
|
try:
|
|||
|
|
with self.db_manager.get_cursor() as cursor:
|
|||
|
|
sql = """
|
|||
|
|
SELECT id, title, batch_id, enterprise_id, product_id, status,
|
|||
|
|
author_id, created_at, updated_at
|
|||
|
|
FROM ai_articles
|
|||
|
|
WHERE enterprise_id = 1
|
|||
|
|
AND status = 'pending_review'
|
|||
|
|
AND author_id = 0
|
|||
|
|
AND image_count > 0
|
|||
|
|
ORDER BY created_at ASC
|
|||
|
|
LIMIT 1000
|
|||
|
|
"""
|
|||
|
|
cursor.execute(sql)
|
|||
|
|
results = cursor.fetchall()
|
|||
|
|
|
|||
|
|
if results:
|
|||
|
|
logger.info(f"查询到 {len(results)} 篇待分配作者的文章")
|
|||
|
|
for result in results:
|
|||
|
|
logger.info(f"待分配文章 - ID: {result['id']}, 标题: {result['title']}, 状态: {result['status']}")
|
|||
|
|
else:
|
|||
|
|
logger.info("未查询到待分配作者的文章")
|
|||
|
|
|
|||
|
|
return results
|
|||
|
|
except Exception as e:
|
|||
|
|
error_msg = f"查询待分配文章异常: {e}"
|
|||
|
|
logger.error(error_msg)
|
|||
|
|
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def get_active_authors(self) -> List[Dict]:
|
|||
|
|
"""获取活跃作者:enterprise_id=1, status=active, phone!=空, created_user_id>0"""
|
|||
|
|
try:
|
|||
|
|
with self.db_manager.get_cursor() as cursor:
|
|||
|
|
sql = """
|
|||
|
|
SELECT id, author_name, phone, app_id, app_token,
|
|||
|
|
department_id, department_name, status, created_at
|
|||
|
|
FROM ai_authors
|
|||
|
|
WHERE enterprise_id = 1
|
|||
|
|
AND status = 'active'
|
|||
|
|
AND phone IS NOT NULL AND phone != ''
|
|||
|
|
AND created_user_id > 0
|
|||
|
|
ORDER BY created_at ASC
|
|||
|
|
LIMIT 1000
|
|||
|
|
"""
|
|||
|
|
cursor.execute(sql)
|
|||
|
|
results = cursor.fetchall()
|
|||
|
|
|
|||
|
|
if results:
|
|||
|
|
logger.info(f"查询到 {len(results)} 个活跃作者")
|
|||
|
|
for result in results:
|
|||
|
|
logger.info(f"活跃作者 - ID: {result['id']}, 姓名: {result['author_name']}, 手机: {result['phone']}")
|
|||
|
|
else:
|
|||
|
|
logger.info("未查询到活跃作者")
|
|||
|
|
|
|||
|
|
return results
|
|||
|
|
except Exception as e:
|
|||
|
|
error_msg = f"查询活跃作者异常: {e}"
|
|||
|
|
logger.error(error_msg)
|
|||
|
|
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def assign_authors_to_articles_batch(self, article_ids: List[int], author_ids: List[int]) -> bool:
|
|||
|
|
"""调用批量分配接口:/api/articles/batch-published-account-cycle"""
|
|||
|
|
try:
|
|||
|
|
if not article_ids or not author_ids:
|
|||
|
|
logger.warning("文章ID或作者ID列表为空,跳过分配")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
# 确保有JWT token
|
|||
|
|
if not self.jwt_token:
|
|||
|
|
logger.warning("JWT token缺失,尝试重新登录")
|
|||
|
|
if not self.login_and_get_jwt_token():
|
|||
|
|
logger.error("重新登录失败")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
assign_url = f"{self.base_url}/api/articles/batch-published-account-cycle"
|
|||
|
|
headers = {
|
|||
|
|
'Authorization': f'Bearer {self.jwt_token}',
|
|||
|
|
'Content-Type': 'application/json'
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
data = {
|
|||
|
|
'article_ids': article_ids,
|
|||
|
|
'author_ids': author_ids
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(f"开始批量分配作者 - 文章数: {len(article_ids)}, 作者数: {len(author_ids)}")
|
|||
|
|
logger.info(f"文章ID: {article_ids}")
|
|||
|
|
logger.info(f"作者ID: {author_ids}")
|
|||
|
|
self.log_to_database('INFO', f"开始批量分配作者",
|
|||
|
|
f"文章数: {len(article_ids)}, 作者数: {len(author_ids)}")
|
|||
|
|
|
|||
|
|
response = requests.post(
|
|||
|
|
assign_url,
|
|||
|
|
json=data,
|
|||
|
|
headers=headers,
|
|||
|
|
timeout=60,
|
|||
|
|
proxies=self.proxies
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
logger.info(f"批量分配响应状态码: {response.status_code}")
|
|||
|
|
|
|||
|
|
if response.status_code == 200:
|
|||
|
|
result = response.json()
|
|||
|
|
logger.info(f"批量分配响应: {result}")
|
|||
|
|
|
|||
|
|
if result.get('code') == 200:
|
|||
|
|
data = result.get('data', {})
|
|||
|
|
success_msg = f"批量分配成功 - 成功: {data.get('published_count')}, 失败: {data.get('failed_count')}"
|
|||
|
|
logger.info(success_msg)
|
|||
|
|
self.log_to_database('INFO', success_msg, json.dumps(result, ensure_ascii=False))
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
error_msg = f"批量分配失败: {result.get('message', '未知错误')}"
|
|||
|
|
logger.error(error_msg)
|
|||
|
|
self.log_to_database('ERROR', error_msg, json.dumps(result, ensure_ascii=False))
|
|||
|
|
return False
|
|||
|
|
elif response.status_code == 401:
|
|||
|
|
# Token过期,重新登录后重试
|
|||
|
|
logger.warning("JWT token过期(401),尝试重新登录")
|
|||
|
|
if self.login_and_get_jwt_token():
|
|||
|
|
logger.info("重新登录成功,重试批量分配")
|
|||
|
|
headers['Authorization'] = f'Bearer {self.jwt_token}'
|
|||
|
|
|
|||
|
|
retry_response = requests.post(
|
|||
|
|
assign_url,
|
|||
|
|
json=data,
|
|||
|
|
headers=headers,
|
|||
|
|
timeout=60,
|
|||
|
|
proxies=self.proxies
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if retry_response.status_code == 200:
|
|||
|
|
retry_result = retry_response.json()
|
|||
|
|
if retry_result.get('code') == 200:
|
|||
|
|
data = retry_result.get('data', {})
|
|||
|
|
success_msg = f"重试批量分配成功 - 成功: {data.get('published_count')}, 失败: {data.get('failed_count')}"
|
|||
|
|
logger.info(success_msg)
|
|||
|
|
self.log_to_database('INFO', success_msg, json.dumps(retry_result, ensure_ascii=False))
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
error_msg = f"重试批量分配失败: {retry_response.status_code}"
|
|||
|
|
logger.error(error_msg)
|
|||
|
|
self.log_to_database('ERROR', error_msg, retry_response.text)
|
|||
|
|
return False
|
|||
|
|
else:
|
|||
|
|
logger.error("重新登录失败")
|
|||
|
|
return False
|
|||
|
|
else:
|
|||
|
|
error_msg = f"批量分配请求失败: {response.status_code}"
|
|||
|
|
logger.error(error_msg)
|
|||
|
|
self.log_to_database('ERROR', error_msg, response.text)
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
error_msg = f"批量分配异常: {e}"
|
|||
|
|
logger.error(error_msg)
|
|||
|
|
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def run(self):
|
|||
|
|
"""主运行逻辑"""
|
|||
|
|
try:
|
|||
|
|
logger.info("="*60)
|
|||
|
|
logger.info("开始执行文章自动分配作者任务")
|
|||
|
|
logger.info("="*60)
|
|||
|
|
self.log_to_database('INFO', "开始执行文章自动分配作者任务")
|
|||
|
|
|
|||
|
|
# 1. 获取待分配作者的文章
|
|||
|
|
logger.info("步骤1: 获取待分配作者的文章")
|
|||
|
|
articles = self.get_pending_articles()
|
|||
|
|
if not articles:
|
|||
|
|
logger.info("没有待分配作者的文章,任务结束")
|
|||
|
|
self.log_to_database('INFO', "没有待分配作者的文章,任务结束")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
# 2. 获取活跃作者
|
|||
|
|
logger.info("步骤2: 获取活跃作者")
|
|||
|
|
authors = self.get_active_authors()
|
|||
|
|
if not authors:
|
|||
|
|
logger.error("没有活跃作者,无法分配")
|
|||
|
|
self.log_to_database('ERROR', "没有活跃作者,无法分配")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
# 3. 批量分配作者
|
|||
|
|
logger.info("步骤3: 批量分配作者")
|
|||
|
|
article_ids = [article['id'] for article in articles]
|
|||
|
|
author_ids = [author['id'] for author in authors]
|
|||
|
|
|
|||
|
|
logger.info(f"准备分配 {len(article_ids)} 篇文章给 {len(author_ids)} 个作者")
|
|||
|
|
self.log_to_database('INFO', f"准备分配文章",
|
|||
|
|
f"文章数: {len(article_ids)}, 作者数: {len(author_ids)}")
|
|||
|
|
|
|||
|
|
success = self.assign_authors_to_articles_batch(article_ids, author_ids)
|
|||
|
|
|
|||
|
|
if success:
|
|||
|
|
logger.info("="*60)
|
|||
|
|
logger.info("文章自动分配作者任务完成")
|
|||
|
|
logger.info("="*60)
|
|||
|
|
self.log_to_database('INFO', "文章自动分配作者任务完成")
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
logger.error("文章自动分配作者任务失败")
|
|||
|
|
self.log_to_database('ERROR', "文章自动分配作者任务失败")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
error_msg = f"运行任务异常: {e}"
|
|||
|
|
logger.error(error_msg)
|
|||
|
|
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
"""主函数 - 持续监控模式"""
|
|||
|
|
generator = AssignAuthorsToArticles()
|
|||
|
|
|
|||
|
|
logger.info("="*60)
|
|||
|
|
logger.info(f"启动文章自动分配作者监控服务")
|
|||
|
|
logger.info(f"监控间隔: {SLEEP_INTERVAL}秒")
|
|||
|
|
logger.info("="*60)
|
|||
|
|
|
|||
|
|
# 首次登录获取JWT token
|
|||
|
|
logger.info("首次登录获取JWT token")
|
|||
|
|
if not generator.login_and_get_jwt_token():
|
|||
|
|
logger.error("首次登录失败,程序退出")
|
|||
|
|
generator.log_to_database('ERROR', "首次登录失败,程序退出")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
while True:
|
|||
|
|
try:
|
|||
|
|
# 执行文章自动分配作者任务
|
|||
|
|
generator.run()
|
|||
|
|
|
|||
|
|
# 等待下一次检查
|
|||
|
|
logger.info(f"等待 {SLEEP_INTERVAL} 秒后进行下一次检查...")
|
|||
|
|
time.sleep(SLEEP_INTERVAL)
|
|||
|
|
|
|||
|
|
except KeyboardInterrupt:
|
|||
|
|
logger.info("="*60)
|
|||
|
|
logger.info("收到中断信号,停止监控服务")
|
|||
|
|
logger.info("="*60)
|
|||
|
|
generator.log_to_database('INFO', '监控服务手动停止', 'KeyboardInterrupt')
|
|||
|
|
break
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"程序运行异常: {e}")
|
|||
|
|
generator.log_to_database('ERROR', f'程序运行异常: {e}', traceback.format_exc())
|
|||
|
|
logger.info(f"等待 {SLEEP_INTERVAL} 秒后重试...")
|
|||
|
|
time.sleep(SLEEP_INTERVAL)
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|