#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 文章自动分配作者脚本 监控数据库中status为pending_review且author_id=0的文章,自动分配作者 """ import os import sys import time import json import logging import requests from typing import Dict, List import traceback # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) from database_config import get_db_manager from log_config import setup_logger # 配置日志记录器,支持按日期切割和控制台输出 logger = setup_logger( name='assign_authors_to_articles', log_file='logs/assign_authors_to_articles.log', error_log_file='logs/assign_authors_to_articles_error.log', level=logging.INFO, console_output=True ) # 配置常量 BASE_URL = "http://127.0.0.1:8216" SLEEP_INTERVAL = 5 # 监控间隔(秒),可配置 class AssignAuthorsToArticles: def __init__(self): # API配置 self.base_url = BASE_URL # 认证信息 self.jwt_token = None # 使用统一的数据库管理器 self.db_manager = get_db_manager() # 登录配置 - 使用雇员账号 self.login_credentials = { 'username': '13621242430', 'password': 'admin123' } # 禁用代理 self.proxies = { 'http': None, 'https': None } logger.info("AssignAuthorsToArticles 初始化完成") def log_to_database(self, level: str, message: str, details: str = None): """记录日志到数据库ai_logs表""" try: with self.db_manager.get_cursor() as cursor: # 映射日志级别到数据库状态 status_map = { 'INFO': 'success', 'WARNING': 'warning', 'ERROR': 'error' } status = status_map.get(level, 'success') sql = """ INSERT INTO ai_logs (user_id, action, description, status, error_message, created_at) VALUES (%s, %s, %s, %s, %s, NOW()) """ cursor.execute(sql, (None, 'assign_authors', message, status, details)) logger.info(f"日志已记录到数据库: {level} - {message}") except Exception as e: logger.error(f"记录日志到数据库失败: {e}") def login_and_get_jwt_token(self) -> bool: """登录获取JWT token""" try: login_url = f"{self.base_url}/api/auth/login" login_data = { "username": self.login_credentials['username'], "password": self.login_credentials['password'] } logger.info(f"尝试登录: {login_data['username']}") self.log_to_database('INFO', f"尝试登录用户: {login_data['username']}") response = requests.post( login_url, json=login_data, headers={'Content-Type': 'application/json'}, proxies=self.proxies ) if response.status_code == 200: result = response.json() if result.get('code') == 200: self.jwt_token = result['data']['token'] logger.info("JWT token获取成功") self.log_to_database('INFO', "JWT token获取成功") return True else: error_msg = f"登录失败: {result.get('message', '未知错误')}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, json.dumps(result)) return False else: error_msg = f"登录请求失败: {response.status_code}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, response.text) return False except Exception as e: error_msg = f"登录异常: {e}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, traceback.format_exc()) return False def get_pending_articles(self) -> List[Dict]: """获取待分配作者的文章:enterprise_id=1, status=pending_review, author_id=0""" try: with self.db_manager.get_cursor() as cursor: sql = """ SELECT id, title, batch_id, enterprise_id, product_id, status, author_id, created_at, updated_at FROM ai_articles WHERE enterprise_id = 1 AND status = 'pending_review' AND author_id = 0 AND image_count > 0 ORDER BY created_at ASC LIMIT 1000 """ cursor.execute(sql) results = cursor.fetchall() if results: logger.info(f"查询到 {len(results)} 篇待分配作者的文章") for result in results: logger.info(f"待分配文章 - ID: {result['id']}, 标题: {result['title']}, 状态: {result['status']}") else: logger.info("未查询到待分配作者的文章") return results except Exception as e: error_msg = f"查询待分配文章异常: {e}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, traceback.format_exc()) return [] def get_active_authors(self) -> List[Dict]: """获取活跃作者:enterprise_id=1, status=active, phone!=空, created_user_id>0""" try: with self.db_manager.get_cursor() as cursor: sql = """ SELECT id, author_name, phone, app_id, app_token, department_id, department_name, status, created_at FROM ai_authors WHERE enterprise_id = 1 AND status = 'active' AND phone IS NOT NULL AND phone != '' AND created_user_id > 0 ORDER BY created_at ASC LIMIT 1000 """ cursor.execute(sql) results = cursor.fetchall() if results: logger.info(f"查询到 {len(results)} 个活跃作者") for result in results: logger.info(f"活跃作者 - ID: {result['id']}, 姓名: {result['author_name']}, 手机: {result['phone']}") else: logger.info("未查询到活跃作者") return results except Exception as e: error_msg = f"查询活跃作者异常: {e}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, traceback.format_exc()) return [] def assign_authors_to_articles_batch(self, article_ids: List[int], author_ids: List[int]) -> bool: """调用批量分配接口:/api/articles/batch-published-account-cycle""" try: if not article_ids or not author_ids: logger.warning("文章ID或作者ID列表为空,跳过分配") return False # 确保有JWT token if not self.jwt_token: logger.warning("JWT token缺失,尝试重新登录") if not self.login_and_get_jwt_token(): logger.error("重新登录失败") return False assign_url = f"{self.base_url}/api/articles/batch-published-account-cycle" headers = { 'Authorization': f'Bearer {self.jwt_token}', 'Content-Type': 'application/json' } data = { 'article_ids': article_ids, 'author_ids': author_ids } logger.info(f"开始批量分配作者 - 文章数: {len(article_ids)}, 作者数: {len(author_ids)}") logger.info(f"文章ID: {article_ids}") logger.info(f"作者ID: {author_ids}") self.log_to_database('INFO', f"开始批量分配作者", f"文章数: {len(article_ids)}, 作者数: {len(author_ids)}") response = requests.post( assign_url, json=data, headers=headers, timeout=60, proxies=self.proxies ) logger.info(f"批量分配响应状态码: {response.status_code}") if response.status_code == 200: result = response.json() logger.info(f"批量分配响应: {result}") if result.get('code') == 200: data = result.get('data', {}) success_msg = f"批量分配成功 - 成功: {data.get('published_count')}, 失败: {data.get('failed_count')}" logger.info(success_msg) self.log_to_database('INFO', success_msg, json.dumps(result, ensure_ascii=False)) return True else: error_msg = f"批量分配失败: {result.get('message', '未知错误')}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, json.dumps(result, ensure_ascii=False)) return False elif response.status_code == 401: # Token过期,重新登录后重试 logger.warning("JWT token过期(401),尝试重新登录") if self.login_and_get_jwt_token(): logger.info("重新登录成功,重试批量分配") headers['Authorization'] = f'Bearer {self.jwt_token}' retry_response = requests.post( assign_url, json=data, headers=headers, timeout=60, proxies=self.proxies ) if retry_response.status_code == 200: retry_result = retry_response.json() if retry_result.get('code') == 200: data = retry_result.get('data', {}) success_msg = f"重试批量分配成功 - 成功: {data.get('published_count')}, 失败: {data.get('failed_count')}" logger.info(success_msg) self.log_to_database('INFO', success_msg, json.dumps(retry_result, ensure_ascii=False)) return True error_msg = f"重试批量分配失败: {retry_response.status_code}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, retry_response.text) return False else: logger.error("重新登录失败") return False else: error_msg = f"批量分配请求失败: {response.status_code}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, response.text) return False except Exception as e: error_msg = f"批量分配异常: {e}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, traceback.format_exc()) return False def run(self): """主运行逻辑""" try: logger.info("="*60) logger.info("开始执行文章自动分配作者任务") logger.info("="*60) self.log_to_database('INFO', "开始执行文章自动分配作者任务") # 1. 获取待分配作者的文章 logger.info("步骤1: 获取待分配作者的文章") articles = self.get_pending_articles() if not articles: logger.info("没有待分配作者的文章,任务结束") self.log_to_database('INFO', "没有待分配作者的文章,任务结束") return True # 2. 获取活跃作者 logger.info("步骤2: 获取活跃作者") authors = self.get_active_authors() if not authors: logger.error("没有活跃作者,无法分配") self.log_to_database('ERROR', "没有活跃作者,无法分配") return False # 3. 批量分配作者 logger.info("步骤3: 批量分配作者") article_ids = [article['id'] for article in articles] author_ids = [author['id'] for author in authors] logger.info(f"准备分配 {len(article_ids)} 篇文章给 {len(author_ids)} 个作者") self.log_to_database('INFO', f"准备分配文章", f"文章数: {len(article_ids)}, 作者数: {len(author_ids)}") success = self.assign_authors_to_articles_batch(article_ids, author_ids) if success: logger.info("="*60) logger.info("文章自动分配作者任务完成") logger.info("="*60) self.log_to_database('INFO', "文章自动分配作者任务完成") return True else: logger.error("文章自动分配作者任务失败") self.log_to_database('ERROR', "文章自动分配作者任务失败") return False except Exception as e: error_msg = f"运行任务异常: {e}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, traceback.format_exc()) return False def main(): """主函数 - 持续监控模式""" generator = AssignAuthorsToArticles() logger.info("="*60) logger.info(f"启动文章自动分配作者监控服务") logger.info(f"监控间隔: {SLEEP_INTERVAL}秒") logger.info("="*60) # 首次登录获取JWT token logger.info("首次登录获取JWT token") if not generator.login_and_get_jwt_token(): logger.error("首次登录失败,程序退出") generator.log_to_database('ERROR', "首次登录失败,程序退出") return while True: try: # 执行文章自动分配作者任务 generator.run() # 等待下一次检查 logger.info(f"等待 {SLEEP_INTERVAL} 秒后进行下一次检查...") time.sleep(SLEEP_INTERVAL) except KeyboardInterrupt: logger.info("="*60) logger.info("收到中断信号,停止监控服务") logger.info("="*60) generator.log_to_database('INFO', '监控服务手动停止', 'KeyboardInterrupt') break except Exception as e: logger.error(f"程序运行异常: {e}") generator.log_to_database('ERROR', f'程序运行异常: {e}', traceback.format_exc()) logger.info(f"等待 {SLEEP_INTERVAL} 秒后重试...") time.sleep(SLEEP_INTERVAL) if __name__ == "__main__": main()