Files
ai_wht_B/ver_25122115/assign_authors_to_articles.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

378 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文章自动分配作者脚本
监控数据库中status为pending_review且author_id=0的文章自动分配作者
"""
import os
import sys
import time
import json
import logging
import requests
from typing import Dict, List
import traceback
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from database_config import get_db_manager
from log_config import setup_logger
# Configure the logger: rotates by date and also prints to the console
# (behavior provided by the project's log_config.setup_logger helper).
logger = setup_logger(
name='assign_authors_to_articles',
log_file='logs/assign_authors_to_articles.log',
error_log_file='logs/assign_authors_to_articles_error.log',
level=logging.INFO,
console_output=True
)
# Configuration constants
BASE_URL = "http://127.0.0.1:8216"
SLEEP_INTERVAL = 5 # monitoring interval in seconds; adjust as needed
class AssignAuthorsToArticles:
    """Automatically assigns authors to articles awaiting review.

    Workflow per run():
      1. Query ai_articles for enterprise 1 rows with status
         'pending_review' and author_id = 0.
      2. Query ai_authors for active authors of enterprise 1.
      3. POST both ID lists to the batch assignment endpoint
         /api/articles/batch-published-account-cycle, authenticating
         with a JWT obtained from /api/auth/login.

    Significant events are mirrored into the ai_logs table via
    log_to_database().
    """

    def __init__(self):
        # Base URL of the backend API (module-level constant).
        self.base_url = BASE_URL
        # JWT token cached after a successful login; None until then.
        self.jwt_token = None
        # Shared database manager from the project helper module.
        self.db_manager = get_db_manager()
        # Login credentials - employee account.
        # NOTE(review): credentials are hard-coded; consider moving them
        # to environment variables or a config file.
        self.login_credentials = {
            'username': '13621242430',
            'password': 'admin123'
        }
        # Explicitly disable HTTP(S) proxies for all requests calls.
        self.proxies = {
            'http': None,
            'https': None
        }
        logger.info("AssignAuthorsToArticles 初始化完成")

    def log_to_database(self, level: str, message: str, details: str = None):
        """Persist a log entry into the ai_logs table.

        Args:
            level: 'INFO' / 'WARNING' / 'ERROR'; mapped to the table's
                status column (unknown levels fall back to 'success').
            message: short human-readable description.
            details: optional long-form payload (JSON dump / traceback).
        """
        try:
            with self.db_manager.get_cursor() as cursor:
                # Map the logging level onto the ai_logs.status values.
                status_map = {
                    'INFO': 'success',
                    'WARNING': 'warning',
                    'ERROR': 'error'
                }
                status = status_map.get(level, 'success')
                sql = """
                    INSERT INTO ai_logs (user_id, action, description, status, error_message, created_at)
                    VALUES (%s, %s, %s, %s, %s, NOW())
                """
                cursor.execute(sql, (None, 'assign_authors', message, status, details))
                logger.info(f"日志已记录到数据库: {level} - {message}")
        except Exception as e:
            # DB logging must never break the main flow; report and continue.
            logger.error(f"记录日志到数据库失败: {e}")

    def login_and_get_jwt_token(self) -> bool:
        """Log in with the employee account and cache the JWT token.

        Returns:
            True when self.jwt_token was set; False on HTTP failure,
            rejected credentials, or any exception.
        """
        try:
            login_url = f"{self.base_url}/api/auth/login"
            login_data = {
                "username": self.login_credentials['username'],
                "password": self.login_credentials['password']
            }
            logger.info(f"尝试登录: {login_data['username']}")
            self.log_to_database('INFO', f"尝试登录用户: {login_data['username']}")
            # FIX: added timeout - the original call had none and could
            # block forever; 60s matches the batch-assignment call.
            response = requests.post(
                login_url,
                json=login_data,
                headers={'Content-Type': 'application/json'},
                proxies=self.proxies,
                timeout=60
            )
            if response.status_code != 200:
                error_msg = f"登录请求失败: {response.status_code}"
                logger.error(error_msg)
                self.log_to_database('ERROR', error_msg, response.text)
                return False
            result = response.json()
            if result.get('code') == 200:
                self.jwt_token = result['data']['token']
                logger.info("JWT token获取成功")
                self.log_to_database('INFO', "JWT token获取成功")
                return True
            error_msg = f"登录失败: {result.get('message', '未知错误')}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, json.dumps(result))
            return False
        except Exception as e:
            error_msg = f"登录异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return False

    def get_pending_articles(self) -> List[Dict]:
        """Fetch articles that still need an author.

        Criteria: enterprise_id = 1, status = 'pending_review',
        author_id = 0; oldest first, capped at 1000 rows.

        Returns:
            List of article row dicts; empty list when nothing matches
            or the query fails.
        """
        try:
            with self.db_manager.get_cursor() as cursor:
                sql = """
                    SELECT id, title, batch_id, enterprise_id, product_id, status,
                           author_id, created_at, updated_at
                    FROM ai_articles
                    WHERE enterprise_id = 1
                      AND status = 'pending_review'
                      AND author_id = 0
                    ORDER BY created_at ASC
                    LIMIT 1000
                """
                cursor.execute(sql)
                results = cursor.fetchall()
                if results:
                    logger.info(f"查询到 {len(results)} 篇待分配作者的文章")
                    for result in results:
                        logger.info(f"待分配文章 - ID: {result['id']}, 标题: {result['title']}, 状态: {result['status']}")
                else:
                    logger.info("未查询到待分配作者的文章")
                return results
        except Exception as e:
            error_msg = f"查询待分配文章异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return []

    def get_active_authors(self) -> List[Dict]:
        """Fetch the pool of assignable authors.

        Criteria: enterprise_id = 1, status = 'active', non-empty phone,
        created_user_id > 0; oldest first, capped at 1000 rows.

        Returns:
            List of author row dicts; empty list when nothing matches
            or the query fails.
        """
        try:
            with self.db_manager.get_cursor() as cursor:
                sql = """
                    SELECT id, author_name, phone, app_id, app_token,
                           department_id, department_name, status, created_at
                    FROM ai_authors
                    WHERE enterprise_id = 1
                      AND status = 'active'
                      AND phone IS NOT NULL AND phone != ''
                      AND created_user_id > 0
                    ORDER BY created_at ASC
                    LIMIT 1000
                """
                cursor.execute(sql)
                results = cursor.fetchall()
                if results:
                    logger.info(f"查询到 {len(results)} 个活跃作者")
                    for result in results:
                        logger.info(f"活跃作者 - ID: {result['id']}, 姓名: {result['author_name']}, 手机: {result['phone']}")
                else:
                    logger.info("未查询到活跃作者")
                return results
        except Exception as e:
            error_msg = f"查询活跃作者异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return []

    def assign_authors_to_articles_batch(self, article_ids: List[int], author_ids: List[int]) -> bool:
        """POST both ID lists to /api/articles/batch-published-account-cycle.

        Re-logs-in and retries exactly once when the server answers 401
        (expired JWT).

        Args:
            article_ids: IDs of the articles to assign.
            author_ids: IDs of the authors to cycle through.

        Returns:
            True when the API reports code == 200, otherwise False.
        """
        try:
            if not article_ids or not author_ids:
                logger.warning("文章ID或作者ID列表为空跳过分配")
                return False
            # Make sure we hold a JWT token before calling the API.
            if not self.jwt_token:
                logger.warning("JWT token缺失尝试重新登录")
                if not self.login_and_get_jwt_token():
                    logger.error("重新登录失败")
                    return False
            assign_url = f"{self.base_url}/api/articles/batch-published-account-cycle"
            headers = {
                'Authorization': f'Bearer {self.jwt_token}',
                'Content-Type': 'application/json'
            }
            # FIX: the request body was previously named `data` and got
            # shadowed by the response payload further down, which made the
            # 401 retry path fragile; use distinct names.
            payload = {
                'article_ids': article_ids,
                'author_ids': author_ids
            }
            logger.info(f"开始批量分配作者 - 文章数: {len(article_ids)}, 作者数: {len(author_ids)}")
            logger.info(f"文章ID: {article_ids}")
            logger.info(f"作者ID: {author_ids}")
            self.log_to_database('INFO', "开始批量分配作者",
                                 f"文章数: {len(article_ids)}, 作者数: {len(author_ids)}")
            response = requests.post(
                assign_url,
                json=payload,
                headers=headers,
                timeout=60,
                proxies=self.proxies
            )
            logger.info(f"批量分配响应状态码: {response.status_code}")
            if response.status_code == 200:
                result = response.json()
                logger.info(f"批量分配响应: {result}")
                if result.get('code') == 200:
                    resp_data = result.get('data', {})
                    success_msg = f"批量分配成功 - 成功: {resp_data.get('published_count')}, 失败: {resp_data.get('failed_count')}"
                    logger.info(success_msg)
                    self.log_to_database('INFO', success_msg, json.dumps(result, ensure_ascii=False))
                    return True
                error_msg = f"批量分配失败: {result.get('message', '未知错误')}"
                logger.error(error_msg)
                self.log_to_database('ERROR', error_msg, json.dumps(result, ensure_ascii=False))
                return False
            if response.status_code == 401:
                # Token expired: re-login once and replay the same request.
                logger.warning("JWT token过期(401),尝试重新登录")
                if not self.login_and_get_jwt_token():
                    logger.error("重新登录失败")
                    return False
                logger.info("重新登录成功,重试批量分配")
                headers['Authorization'] = f'Bearer {self.jwt_token}'
                retry_response = requests.post(
                    assign_url,
                    json=payload,
                    headers=headers,
                    timeout=60,
                    proxies=self.proxies
                )
                if retry_response.status_code == 200:
                    retry_result = retry_response.json()
                    if retry_result.get('code') == 200:
                        resp_data = retry_result.get('data', {})
                        success_msg = f"重试批量分配成功 - 成功: {resp_data.get('published_count')}, 失败: {resp_data.get('failed_count')}"
                        logger.info(success_msg)
                        self.log_to_database('INFO', success_msg, json.dumps(retry_result, ensure_ascii=False))
                        return True
                error_msg = f"重试批量分配失败: {retry_response.status_code}"
                logger.error(error_msg)
                self.log_to_database('ERROR', error_msg, retry_response.text)
                return False
            error_msg = f"批量分配请求失败: {response.status_code}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, response.text)
            return False
        except Exception as e:
            error_msg = f"批量分配异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return False

    def run(self):
        """Execute one full assignment pass.

        Returns:
            True when there was nothing to do or the assignment
            succeeded; False on any failure.
        """
        try:
            logger.info("="*60)
            logger.info("开始执行文章自动分配作者任务")
            logger.info("="*60)
            self.log_to_database('INFO', "开始执行文章自动分配作者任务")
            # Step 1: fetch unassigned pending articles.
            logger.info("步骤1: 获取待分配作者的文章")
            articles = self.get_pending_articles()
            if not articles:
                logger.info("没有待分配作者的文章,任务结束")
                self.log_to_database('INFO', "没有待分配作者的文章,任务结束")
                return True
            # Step 2: fetch the pool of active authors.
            logger.info("步骤2: 获取活跃作者")
            authors = self.get_active_authors()
            if not authors:
                logger.error("没有活跃作者,无法分配")
                self.log_to_database('ERROR', "没有活跃作者,无法分配")
                return False
            # Step 3: hand both ID lists to the batch API.
            logger.info("步骤3: 批量分配作者")
            article_ids = [article['id'] for article in articles]
            author_ids = [author['id'] for author in authors]
            logger.info(f"准备分配 {len(article_ids)} 篇文章给 {len(author_ids)} 个作者")
            self.log_to_database('INFO', "准备分配文章",
                                 f"文章数: {len(article_ids)}, 作者数: {len(author_ids)}")
            if self.assign_authors_to_articles_batch(article_ids, author_ids):
                logger.info("="*60)
                logger.info("文章自动分配作者任务完成")
                logger.info("="*60)
                self.log_to_database('INFO', "文章自动分配作者任务完成")
                return True
            logger.error("文章自动分配作者任务失败")
            self.log_to_database('ERROR', "文章自动分配作者任务失败")
            return False
        except Exception as e:
            error_msg = f"运行任务异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return False
def main():
    """Entry point: run the author-assignment task in an endless poll loop."""
    worker = AssignAuthorsToArticles()

    logger.info("="*60)
    logger.info(f"启动文章自动分配作者监控服务")
    logger.info(f"监控间隔: {SLEEP_INTERVAL}")
    logger.info("="*60)

    # Obtain the initial JWT token; abort if the very first login fails.
    logger.info("首次登录获取JWT token")
    if not worker.login_and_get_jwt_token():
        logger.error("首次登录失败,程序退出")
        worker.log_to_database('ERROR', "首次登录失败,程序退出")
        return

    while True:
        try:
            # One assignment pass, then sleep until the next poll.
            worker.run()
            logger.info(f"等待 {SLEEP_INTERVAL} 秒后进行下一次检查...")
            time.sleep(SLEEP_INTERVAL)
        except KeyboardInterrupt:
            # Manual stop (Ctrl+C): record it and leave the loop.
            logger.info("="*60)
            logger.info("收到中断信号,停止监控服务")
            logger.info("="*60)
            worker.log_to_database('INFO', '监控服务手动停止', 'KeyboardInterrupt')
            break
        except Exception as e:
            # Any other error: log, wait one interval, and keep monitoring.
            logger.error(f"程序运行异常: {e}")
            worker.log_to_database('ERROR', f'程序运行异常: {e}', traceback.format_exc())
            logger.info(f"等待 {SLEEP_INTERVAL} 秒后重试...")
            time.sleep(SLEEP_INTERVAL)
# Script entry point.
if __name__ == "__main__":
    main()