#!/usr/bin/env python # -*- coding: utf-8 -*- """ 文章管理接口 """ from flask import Blueprint, request, jsonify import logging import json import time import random from datetime import datetime from auth_utils import require_auth, require_role, AuthUtils from database_config import get_db_manager, format_datetime_fields from log_utils import log_create, log_update, log_delete, log_error, log_operation logger = logging.getLogger('article_server') # 创建蓝图 article_bp = Blueprint('article', __name__, url_prefix='/api/articles') # 图片基础URL #IMAGE_BASE_URL_CDN1 = "http://images11.bxmkb.cn/Images/" IMAGE_BASE_URL_CDN1 = "https://bxmkb-beijing.oss-cn-beijing.aliyuncs.com/Images/" @article_bp.route('/list', methods=['GET']) @require_auth def get_articles_list(): """获取文章列表(聚合图片和标签)""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) logger.info(f"[获取文章列表] 开始处理请求, IP: {client_ip}") try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') logger.info(f"[获取文章列表] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}") if not enterprise_id: logger.warning(f"[获取文章列表] 无法获取企业ID, IP: {client_ip}") return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 # 获取查询参数 page = int(request.args.get('page', 1)) page_size = int(request.args.get('pageSize', 20)) keyword = request.args.get('keyword', '').strip() product_id = request.args.get('product_id', '').strip() status = request.args.get('status', '').strip() logger.info(f"[获取文章列表] 查询参数: page={page}, pageSize={page_size}, keyword={keyword}, product_id={product_id}, status={status}, 企业ID: {enterprise_id}, IP: {client_ip}") # 构建查询条件 where_conditions = ["enterprise_id = %s"] params = [enterprise_id] if keyword: where_conditions.append("(title LIKE %s OR content LIKE %s OR topic LIKE %s)") keyword_pattern = f"%{keyword}%" params.extend([keyword_pattern, keyword_pattern, keyword_pattern]) if product_id: where_conditions.append("product_id = %s") params.append(product_id) if status: where_conditions.append("status = %s") params.append(status) where_clause = " AND ".join(where_conditions) # 计算偏移量 offset = (page - 1) * page_size db_manager = get_db_manager() # 查询总数 count_sql = f""" SELECT COUNT(*) as total FROM ai_articles WHERE {where_clause} """ count_result = db_manager.execute_query(count_sql, params) total = count_result[0]['total'] # ✅ 查询文章列表(直接从ai_articles表获取所有字段,避免跨表查询) sql = f""" SELECT id, batch_id, enterprise_id, product_id, product_name, topic_type_id, prompt_workflow_id, prompt_workflow_name, topic, title, context_summary, department, departmentids, author_id, author_name, department_id, department_name, created_user_id, review_user_id, publish_user_id, status, channel, review_comment, publish_time, publish_link, baijiahao_id, baijiahao_status, word_count, image_count, coze_tag, created_at, updated_at, product_name as product_name, prompt_workflow_name as prompt_name FROM ai_articles WHERE {where_clause} ORDER BY created_at DESC LIMIT %s OFFSET %s """ params.extend([page_size, offset]) articles = db_manager.execute_query(sql, params) # ✅ 批量查询所有文章的图片和标签(避免N+1查询) if articles: article_ids = [article['id'] for article in articles] placeholders = ','.join(['%s'] * len(article_ids)) # 批量查询图片 images_sql = f""" SELECT id, article_id, image_id, image_url, image_thumb_url, image_tag_id, sort_order, keywords_id, keywords_name, department_id, department_name, image_source, created_at FROM ai_article_images WHERE article_id IN ({placeholders}) ORDER BY article_id, sort_order ASC, created_at ASC """ all_images = db_manager.execute_query(images_sql, article_ids) # 批量查询标签 tags_sql = f""" SELECT id, article_id, coze_tag, created_at FROM ai_article_tags WHERE article_id IN ({placeholders}) """ all_tags = db_manager.execute_query(tags_sql, article_ids) # 构建图片和标签的字典映射 images_map = {} for img in all_images: aid = img['article_id'] if aid not in images_map: images_map[aid] = [] images_map[aid].append(img) tags_map = {} for tag in all_tags: tags_map[tag['article_id']] = tag # 将图片和标签分配给对应的文章 for article in articles: article['images'] = images_map.get(article['id'], []) article['tags'] = tags_map.get(article['id']) # 格式化日期时间字段 articles = format_datetime_fields(articles) logger.info(f"[获取文章列表] 查询成功, 总数: {total}, 当前页: {page}, 每页: {page_size}, 返回数量: {len(articles)}, 企业ID: {enterprise_id}, IP: {client_ip}") return jsonify({ 'code': 200, 'message': 'success', 'data': { 'total': total, 'list': articles }, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[获取文章列表] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @article_bp.route('/generate', methods=['POST']) @require_auth def generate_article(): """生成文案""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) logger.info(f"[生成文案] 开始处理生成文案请求, IP: {client_ip}") try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') logger.info(f"[生成文案] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}") if not enterprise_id: logger.warning(f"[生成文案] 无法获取企业ID, IP: {client_ip}") return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 data = request.get_json() if not data: logger.warning(f"[生成文案] 请求参数为空, 企业ID: {enterprise_id}, IP: {client_ip}") return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 logger.info(f"[生成文案] 收到生成请求, 产品ID: {data.get('product_id')}, 提示词ID: {data.get('prompt_workflow_id')}, 主题数: {len(data.get('topics', []))}, 企业ID: {enterprise_id}, IP: {client_ip}") # 验证必需字段 required_fields = ['product_id', 'prompt_workflow_id', 'topics'] for field in required_fields: if not data.get(field): logger.warning(f"[生成文案] 缺少必需字段: {field}, 企业ID: {enterprise_id}, IP: {client_ip}") return jsonify({ 'code': 400, 'message': f'缺少必需字段: {field}', 'data': None }), 400 db_manager = get_db_manager() # 验证产品是否存在 logger.info(f"[生成文案] 验证产品是否存在, 产品ID: {data['product_id']}, 企业ID: {enterprise_id}") check_product_sql = "SELECT id, name FROM ai_products WHERE id = %s AND enterprise_id = %s" product = db_manager.execute_query(check_product_sql, (data['product_id'], enterprise_id)) if not product: logger.warning(f"[生成文案] 产品不存在, 产品ID: {data['product_id']}, 企业ID: {enterprise_id}, IP: {client_ip}") return jsonify({ 'code': 404, 'message': '产品不存在', 'data': None }), 404 logger.info(f"[生成文案] 产品验证成功, 产品名称: {product[0]['name']}, ID: {data['product_id']}") # 验证提示词是否存在 logger.info(f"[生成文案] 验证提示词是否存在, 提示词ID: {data['prompt_workflow_id']}, 企业ID: {enterprise_id}") check_prompt_sql = "SELECT id, prompt_workflow_name FROM ai_prompt_workflow WHERE id = %s AND enterprise_id = %s" prompt = db_manager.execute_query(check_prompt_sql, (data['prompt_workflow_id'], enterprise_id)) if not prompt: logger.warning(f"[生成文案] 提示词不存在, 提示词ID: {data['prompt_workflow_id']}, 企业ID: {enterprise_id}, IP: {client_ip}") return jsonify({ 'code': 404, 'message': '提示词不存在', 'data': None }), 404 logger.info(f"[生成文案] 提示词验证成功, 名称: {prompt[0]['prompt_workflow_name']}, ID: {data['prompt_workflow_id']}") count = data.get('count', 1) topics = data['topics'][:count] if len(data['topics']) >= count else data['topics'] logger.info(f"[生成文案] 开始生成文案, 主题数量: {len(topics)}, 产品: {product[0]['name']}, 企业ID: {enterprise_id}") generated_articles = [] # 生成batch_id(时间戳 + 6位随机数) timestamp = int(time.time()) random_num = random.randint(100000, 999999) batch_id = f"{timestamp}{random_num}" logger.info(f"[批量生成文章] 生成batch_id: {batch_id}, 待处理数据行数: {len(topics)}") for topic in topics: logger.info(f"[生成文案] 开始生成主题文案: {topic}, 产品: {product[0]['name']}") # TODO: 这里应该调用AI接口生成文案内容 # 目前使用模拟数据 title = f"{topic}" #content = f"这是一篇关于{topic}的精彩内容..." # 插入文案记录(content字段先为空,等待后续脚本填充) sql = """ INSERT INTO ai_articles (enterprise_id, product_id, product_name, prompt_workflow_id, prompt_workflow_name, title, topic, content, status, batch_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ article_id = db_manager.execute_insert(sql, ( enterprise_id, data['product_id'], product[0]['name'], data['prompt_workflow_id'], prompt[0]['prompt_workflow_name'], title, topic, '', # ✅ content字段先设为空,等待后续脚本填充 'generate', batch_id )) logger.info(f"[生成文案] 文案生成成功, 文案ID: {article_id}, 主题: {topic}, 标题: {title}") generated_articles.append({ 'id': article_id, 'title': title, 'topic': topic }) # 更新产品和企业文案总数 update_product_sql = "UPDATE ai_products SET articles_total = articles_total + %s WHERE id = %s" db_manager.execute_update(update_product_sql, (len(generated_articles), data['product_id'])) update_enterprise_sql = "UPDATE ai_enterprises SET articles_total = articles_total + %s WHERE id = %s" db_manager.execute_update(update_enterprise_sql, (len(generated_articles), enterprise_id)) # 更新提示词使用次数 update_prompt_sql = "UPDATE ai_prompt_workflow SET usage_count = usage_count + %s WHERE id = %s" db_manager.execute_update(update_prompt_sql, (len(generated_articles), data['prompt_workflow_id'])) logger.info(f"生成文案成功: {len(generated_articles)}篇") return jsonify({ 'code': 200, 'message': '生成成功', 'data': { 'generated': len(generated_articles), 'articles': generated_articles }, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[生成文案] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @article_bp.route('/create_Discard', methods=['POST']) @require_auth def create_article(): """创建文章""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) logger.info(f"[创建文章] 开始处理请求, IP: {client_ip}") try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') user_id = current_user.get('user_id', 0) if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 data = request.get_json() if not data: return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 # 验证必需字段 required_fields = ['title', 'content'] for field in required_fields: if not data.get(field): return jsonify({ 'code': 400, 'message': f'缺少必需字段: {field}', 'data': None }), 400 db_manager = get_db_manager() # ✅ 插入文章主表 article_sql = """ INSERT INTO ai_articles (enterprise_id, product_id, topic_type_id, prompt_workflow_id, topic, title, content, department, departmentids, author_id, author_name, department_id, department_name, created_user_id, status, channel, word_count, image_count, batch_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ article_id = db_manager.execute_insert(article_sql, ( enterprise_id, data.get('product_id', 0), data.get('topic_type_id', 0), data.get('prompt_workflow_id', 0), data.get('topic', ''), data['title'], data['content'], data.get('department', ''), data.get('departmentids', ''), data.get('author_id'), data.get('author_name'), data.get('department_id'), data.get('department_name'), user_id, data.get('status', 'draft'), data.get('channel', 1), data.get('word_count', len(data['content'])), data.get('image_count', 0), data.get('batch_id', 0) )) # ✅ 插入文章图片 if data.get('images'): for img in data['images']: image_sql = """ INSERT INTO ai_article_images (enterprise_id, article_id, image_id, image_url, image_thumb_url, image_tag_id, sort_order, keywords_id, keywords_name, department_id, department_name, image_source) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ db_manager.execute_insert(image_sql, ( enterprise_id, article_id, img.get('image_id', 0), img.get('image_url', ''), img.get('image_thumb_url', ''), img.get('image_tag_id', 0), img.get('sort_order', 0), img.get('keywords_id', 0), img.get('keywords_name', ''), img.get('department_id', 0), img.get('department_name', ''), img.get('image_source', 0) )) # ✅ 插入文章标签 if data.get('coze_tag'): tag_sql = """ INSERT INTO ai_article_tags (enterprise_id, article_id, coze_tag) VALUES (%s, %s, %s) """ db_manager.execute_insert(tag_sql, (enterprise_id, article_id, data['coze_tag'])) logger.info(f"[创建文章] 创建成功, 文章ID: {article_id}, 企业ID: {enterprise_id}") return jsonify({ 'code': 200, 'message': '创建成功', 'data': { 'id': article_id, 'title': data['title'] }, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[创建文章] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @article_bp.route('/', methods=['PUT']) @require_auth def update_article(article_id): """更新文章""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) logger.info(f"[更新文章] 开始处理请求, 文章ID: {article_id}, IP: {client_ip}") try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 data = request.get_json() if not data: return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 db_manager = get_db_manager() # 检查文章是否存在且属于当前企业 check_sql = "SELECT id, status FROM ai_articles WHERE id = %s AND enterprise_id = %s" existing = db_manager.execute_query(check_sql, (article_id, enterprise_id)) if not existing: return jsonify({ 'code': 404, 'message': '文章不存在', 'data': None }), 404 old_status = existing[0]['status'] # ✅ 构建更新字段 update_fields = [] params = [] field_mapping = { 'product_id': 'product_id', 'topic_type_id': 'topic_type_id', 'prompt_workflow_id': 'prompt_workflow_id', 'topic': 'topic', 'title': 'title', 'content': 'content', 'department': 'department', 'departmentids': 'departmentids', 'author_id': 'author_id', 'author_name': 'author_name', 'department_id': 'department_id', 'department_name': 'department_name', 'status': 'status', 'channel': 'channel', 'review_comment': 'review_comment', 'publish_time': 'publish_time', 'publish_link': 'publish_link', 'baijiahao_id': 'baijiahao_id', 'baijiahao_status': 'baijiahao_status', 'word_count': 'word_count', 'image_count': 'image_count', 'batch_id': 'batch_id' } for field, db_field in field_mapping.items(): if field in data: update_fields.append(f"{db_field} = %s") params.append(data[field]) if update_fields: params.append(article_id) sql = f"UPDATE ai_articles SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s" db_manager.execute_update(sql, params) # ✅ 更新文章图片(先删除后插入) if 'images' in data: # 删除旧图片 db_manager.execute_update("DELETE FROM ai_article_images WHERE article_id = %s", (article_id,)) # 插入新图片 if data['images']: for img in data['images']: image_sql = """ INSERT INTO ai_article_images (enterprise_id, article_id, image_id, image_url, image_thumb_url, image_tag_id, sort_order, keywords_id, keywords_name, department_id, department_name, image_source) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ db_manager.execute_insert(image_sql, ( enterprise_id, article_id, img.get('image_id', 0), img.get('image_url', ''), img.get('image_thumb_url', ''), img.get('image_tag_id', 0), img.get('sort_order', 0), img.get('keywords_id', 0), img.get('keywords_name', ''), img.get('department_id', 0), img.get('department_name', ''), img.get('image_source', 0) )) # ✅ 更新文章标签 if 'coze_tag' in data: # 检查是否已存在标签 tag_check = db_manager.execute_query( "SELECT id FROM ai_article_tags WHERE article_id = %s", (article_id,) ) if tag_check: # 更新标签 db_manager.execute_update( "UPDATE ai_article_tags SET coze_tag = %s WHERE article_id = %s", (data['coze_tag'], article_id) ) else: # 插入标签 db_manager.execute_insert( "INSERT INTO ai_article_tags (enterprise_id, article_id, coze_tag) VALUES (%s, %s, %s)", (enterprise_id, article_id, data['coze_tag']) ) # ✅ 如果状态发生变化,记录到发布记录表 new_status = data.get('status') if new_status and new_status != old_status: # 查询文章信息 article_info = db_manager.execute_query( "SELECT product_id, topic, title FROM ai_articles WHERE id = %s", (article_id,) ) if article_info: record_sql = """ INSERT INTO ai_article_published_records (article_id, enterprise_id, product_id, topic, title, created_user_id, status, channel, word_count, image_count, publish_time, publish_link) SELECT id, enterprise_id, product_id, topic, title, created_user_id, status, channel, word_count, image_count, publish_time, publish_link FROM ai_articles WHERE id = %s """ db_manager.execute_insert(record_sql, (article_id,)) logger.info(f"[更新文章] 更新成功, 文章ID: {article_id}, 企业ID: {enterprise_id}") return jsonify({ 'code': 200, 'message': '更新成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[更新文章] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @article_bp.route('/', methods=['GET']) @require_auth def get_article_detail(article_id): """获取文章详情(包含图片、标签、发布记录)""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 db_manager = get_db_manager() # ✅ 查询文章详情(包含所有字段) sql = """ SELECT a.id, a.batch_id, a.enterprise_id, a.product_id, a.topic_type_id, a.prompt_workflow_id, a.topic, a.title, a.content, a.department, a.departmentids, a.author_id, a.author_name, a.department_id, a.department_name, a.created_user_id, a.review_user_id, a.publish_user_id, a.status, a.channel, a.review_comment, a.publish_time, a.publish_link, a.baijiahao_id, a.baijiahao_status, a.word_count, a.image_count, a.coze_tag, a.created_at, a.updated_at, p.name as product_name, pw.prompt_workflow_name as prompt_name FROM ai_articles a LEFT JOIN ai_products p ON a.product_id = p.id LEFT JOIN ai_prompt_workflow pw ON a.prompt_workflow_id = pw.id WHERE a.id = %s AND a.enterprise_id = %s """ result = db_manager.execute_query(sql, (article_id, enterprise_id)) if not result: return jsonify({ 'code': 404, 'message': '文章不存在', 'data': None }), 404 article = result[0] # ✅ 查询文章图片 images_sql = """ SELECT id, image_id, image_url, image_thumb_url, image_tag_id, sort_order, keywords_id, keywords_name, department_id, department_name, image_source, created_at FROM ai_article_images WHERE article_id = %s ORDER BY sort_order ASC, created_at ASC """ images = db_manager.execute_query(images_sql, (article_id,)) # 为图片URL添加CDN前缀 for img in images: if img.get('image_url'): img['image_url'] = IMAGE_BASE_URL_CDN1 + img['image_url'] if img.get('image_thumb_url'): img['image_thumb_url'] = IMAGE_BASE_URL_CDN1 + img['image_thumb_url'] article['images'] = images # ✅ 查询文章标签 tags_sql = """ SELECT id, coze_tag, created_at FROM ai_article_tags WHERE article_id = %s """ tags_result = db_manager.execute_query(tags_sql, (article_id,)) article['tags'] = tags_result[0] if tags_result else None # ✅ 查询文章发布记录 records_sql = """ SELECT id, status, created_user_id, review_user_id, publish_user_id, review_comment, publish_time, publish_link, word_count, image_count, created_at FROM ai_article_published_records WHERE article_id = %s ORDER BY created_at DESC """ article['publish_records'] = db_manager.execute_query(records_sql, (article_id,)) # 格式化日期字段 article = format_datetime_fields([article])[0] logger.info(f"获取文章详情成功: ID {article_id}") return jsonify({ 'code': 200, 'message': 'success', 'data': article, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[获取文章详情] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @article_bp.route('/', methods=['DELETE']) @require_auth def delete_article(article_id): """删除文章(级联删除图片和标签)""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 db_manager = get_db_manager() # 检查文章是否存在且属于当前企业 check_sql = "SELECT id, title FROM ai_articles WHERE id = %s AND enterprise_id = %s" existing = db_manager.execute_query(check_sql, (article_id, enterprise_id)) if not existing: return jsonify({ 'code': 404, 'message': '文章不存在', 'data': None }), 404 # ✅ 删除文章图片 db_manager.execute_update("DELETE FROM ai_article_images WHERE article_id = %s", (article_id,)) # ✅ 删除文章标签 db_manager.execute_update("DELETE FROM ai_article_tags WHERE article_id = %s", (article_id,)) # ✅ 删除文章主表 sql = "DELETE FROM ai_articles WHERE id = %s" db_manager.execute_update(sql, (article_id,)) logger.info(f"删除文章成功: ID {article_id}") return jsonify({ 'code': 200, 'message': '删除成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[删除文章] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @article_bp.route('/list_dashboard', methods=['GET']) @require_auth def get_articles_dashboard(): """获取文章仪表盘统计""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) logger.info(f"[文章仪表盘] 开始处理请求, IP: {client_ip}") try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: logger.warning(f"[文章仪表盘] 无法获取企业ID, IP: {client_ip}") return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 db_manager = get_db_manager() # ✅ 1. 文章总数:status != 'draft' articles_total_sql = """ SELECT COUNT(id) as total FROM ai_articles WHERE enterprise_id = %s """ articles_total_result = db_manager.execute_query(articles_total_sql, (enterprise_id,)) articles_total = articles_total_result[0]['total'] if articles_total_result else 0 # ✅ 2. 能发的文章:status = 'published_review' articles_available_sql = """ SELECT COUNT(id) as total FROM ai_articles WHERE enterprise_id = %s AND status = 'published_review' """ articles_available_result = db_manager.execute_query(articles_available_sql, (enterprise_id,)) articles_available = articles_available_result[0]['total'] if articles_available_result else 0 # ✅ 3. 发布成功:status = 'published' articles_published_sql = """ SELECT COUNT(id) as total FROM ai_articles WHERE enterprise_id = %s AND status = 'published' """ articles_published_result = db_manager.execute_query(articles_published_sql, (enterprise_id,)) articles_published = articles_published_result[0]['total'] if articles_published_result else 0 stats = { 'articles_total': articles_total, 'articles_available': articles_available, 'articles_published': articles_published } logger.info(f"[文章仪表盘] 查询成功, 企业ID: {enterprise_id}, 总数: {articles_total}, 可发: {articles_available}, 已发: {articles_published}, IP: {client_ip}") return jsonify({ 'code': 200, 'message': 'success', 'data': stats, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[文章仪表盘] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @article_bp.route('/batch-published-account-cycle', methods=['POST']) @require_auth @require_role('admin','editor','reviewer','publisher','enterprise') def batch_published_account_cycle_articles(): """批量发布文章 - 账号循环分配""" try: data = request.get_json() if not data: return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 article_ids = data.get('article_ids', []) author_ids = data.get('author_ids', []) logger.info(f"[批量发布文章-循环分配] 入参数据: {json.dumps(data, ensure_ascii=False)}") if not article_ids: return jsonify({ 'code': 400, 'message': '缺少文章ID列表', 'data': None }), 400 if not author_ids: return jsonify({ 'code': 400, 'message': '缺少作者ID列表', 'data': None }), 400 db_manager = get_db_manager() current_user = AuthUtils.get_current_user() # 对文章ID和作者ID进行有序排序(按ID从小到大) sorted_article_ids = sorted(article_ids) sorted_author_ids = sorted(author_ids) logger.info(f"[批量发布文章-循环分配] 排序后文章ID: {sorted_article_ids}") logger.info(f"[批量发布文章-循环分配] 排序后作者ID: {sorted_author_ids}") # 查询所有作者信息 author_placeholders = ','.join(['%s'] * len(sorted_author_ids)) author_sql = f""" SELECT id, app_id, app_token, author_name, department_id, department_name FROM ai_authors WHERE id IN ({author_placeholders}) AND status = 'active' ORDER BY id ASC """ author_results = db_manager.execute_query(author_sql, sorted_author_ids) logger.info(f"[批量发布文章-循环分配] 查询到作者信息: {json.dumps(author_results, ensure_ascii=False)}") if not author_results: return jsonify({ 'code': 400, 'message': '未找到有效的作者信息', 'data': None }), 400 # 创建作者信息字典,按ID排序 authors_dict = {} for author in author_results: authors_dict[author['id']] = author logger.info(f"[批量发布文章authors_dict] 查询到作者信息: {json.dumps(authors_dict, ensure_ascii=False)}") # 一次性查询文章信息并筛选有效文章 article_placeholders = ','.join(['%s'] * len(sorted_article_ids)) article_sql = f""" SELECT id, title, status FROM ai_articles WHERE id IN ({article_placeholders}) AND status = 'pending_review' ORDER BY id ASC """ logger.info(f"[批量发布文章-循环分配] 执行SQL查询文章: {article_sql} - 参数: {sorted_article_ids}") articles = db_manager.execute_query(article_sql, sorted_article_ids) logger.info(f"[批量发布文章-循环分配] 查询结果: 请求{len(sorted_article_ids)}篇,找到{len(articles)}篇有效文章") if not articles: return jsonify({ 'code': 400, 'message': f'未找到状态为pending_review的文章: {sorted_article_ids}', 'data': { 'requested_articles': sorted_article_ids, 'found_articles': 0 } }), 400 published_count = 0 failed_count = 0 publish_results = [] assignment_results = [] # 实现循环分配逻辑 try: logger.info(f"[批量发布文章-循环分配] 开始循环分配处理 {len(articles)} 篇文章") # 循环分配文章到作者 for i, article in enumerate(articles): # 使用模运算实现循环分配 author_index = i % len(sorted_author_ids) assigned_author_id = sorted_author_ids[author_index] # 获取作者信息,如果找不到则从头重新分配 author_info = authors_dict.get(assigned_author_id) retry_count = 0 max_retries = len(sorted_author_ids) # ✅ 如果找不到作者信息,从头开始重新循环分配 while not author_info and retry_count < max_retries: retry_count += 1 author_index = (author_index + 1) % len(sorted_author_ids) assigned_author_id = sorted_author_ids[author_index] author_info = authors_dict.get(assigned_author_id) logger.warning(f"[批量发布文章-循环分配] 作者ID {sorted_author_ids[author_index - 1]} 无效,重试第{retry_count}次,尝试作者ID {assigned_author_id}") # 如果尝试所有作者后仍未找到有效作者,则标记失败 if not author_info: logger.error(f"[批量发布文章-循环分配] 文章 {article['id']} 尝试所有作者后仍未找到有效作者信息") failed_count += 1 publish_results.append({ 'article_id': article['id'], 'title': article['title'], 'status': 'failed', 'error': '所有作者都无效,无法分配作者' }) continue article_id = article['id'] title = article['title'] author_name = author_info['author_name'] department_id = author_info['department_id'] department_name = author_info['department_name'] # ✅ 最终验证:确保作者信息完整 if not assigned_author_id or not author_name: error_msg = f"文章 {article_id} 作者信息不完整,assigned_author_id={assigned_author_id}, author_name={author_name}" logger.error(f"[批量发布文章-循环分配] {error_msg}") failed_count += 1 publish_results.append({ 'article_id': article_id, 'title': title, 'status': 'failed', 'error': '作者信息不完整,无法分配作者' }) continue # 记录分配结果 assignment_results.append({ 'article_id': article_id, 'article_title': title, 'author_id': assigned_author_id, 'author_name': author_name, 'department_id': department_id, 'department_name': department_name }) logger.info(f"[批量发布文章-循环分配] 文章 {article_id} 分配给作者 {assigned_author_id} ({author_name})") # ✅ 最终验证:确保所有文章都已分配作者 logger.info(f"[批量发布文章-循环分配] 分配完成统计 - 请求文章数: {len(articles)}, 成功分配: {len(assignment_results)}, 失败: {failed_count}") if len(assignment_results) == 0: return jsonify({ 'code': 400, 'message': '所有文章都未能成功分配作者,批量发布失败', 'data': { 'total_articles': len(articles), 'assigned_articles': 0, 'failed_articles': failed_count, 'results': publish_results } }), 400 # 批量更新文章状态 for assignment in assignment_results: try: update_sql = """ UPDATE ai_articles SET status = 'assign_authors', author_id = %s, author_name = %s, department_name = %s, department_id = %s, publish_user_id = %s, publish_time = NOW(), updated_at = NOW() WHERE id = %s """ update_params = ( assignment['author_id'], assignment['author_name'], assignment['department_name'], assignment['department_id'], current_user['user_id'], assignment['article_id'] ) logger.info(f"[批量发布文章-循环分配] 执行SQL更新: {update_sql} - 参数: {update_params}") affected_rows = db_manager.execute_update(update_sql, update_params) logger.info(f"[批量发布文章-循环分配] 更新文章 {assignment['article_id']} 完成,影响行数: {affected_rows}") # 生成百家号ID(模拟) baijiahao_id = f"bjh_{assignment['article_id']}_{int(time.time())}" published_count += 1 publish_results.append({ 'article_id': assignment['article_id'], 'title': assignment['article_title'], 'author_id': assignment['author_id'], 'author_name': assignment['author_name'], 'status': 'success', 'baijiahao_id': baijiahao_id }) logger.info(f"[批量发布文章-循环分配] 文章发布成功: ID={assignment['article_id']}, 标题={assignment['article_title']}, 作者={assignment['author_name']}, 百家号ID: {baijiahao_id}") except Exception as update_error: logger.error(f"[批量发布文章-循环分配] 更新文章 {assignment['article_id']} 失败: {str(update_error)}") failed_count += 1 publish_results.append({ 'article_id': assignment['article_id'], 'title': assignment['article_title'], 'author_id': assignment['author_id'], 'author_name': assignment['author_name'], 'status': 'failed', 'error': str(update_error) }) except Exception as e: # 批量发布异常处理 logger.error(f"[批量发布文章-循环分配] 批量发布异常: {str(e)}", exc_info=True) # 将所有文章标记为失败 for article in articles: article_id = article['id'] title = article['title'] failed_count += 1 publish_results.append({ 'article_id': article_id, 'title': title, 'status': 'error', 'error': str(e) }) logger.error(f"[批量发布文章-循环分配] 文章发布异常: ID={article_id}, 标题={title}, 错误: {str(e)}") # 记录操作日志 log_sql = """ INSERT INTO ai_logs (user_id, action, target_type, target_id, description, ip_address, user_agent, status) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """ client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) user_agent = request.headers.get('User-Agent', '未知') log_params = ( current_user['user_id'], 'batch_publish_articles_cycle', 'articles', None, # target_id 设为 NULL,因为是批量操作 f'批量发布文章(循环分配),成功: {published_count},失败: {failed_count},文章ID: {",".join(map(str, sorted_article_ids))},作者ID: {",".join(map(str, sorted_author_ids))}', client_ip, user_agent, 'success' if failed_count == 0 else 'warning' ) logger.info(f"[批量发布文章-循环分配] 执行SQL插入日志: {log_sql} - 参数: {log_params}") db_manager.execute_insert(log_sql, log_params) logger.info(f"[批量发布文章-循环分配] 操作日志记录成功") logger.info(f"批量发布文章(循环分配)完成,成功: {published_count},失败: {failed_count}") return jsonify({ 'code': 200, 'message': f'批量发布(循环分配)完成,成功: {published_count},失败: {failed_count}', 'data': { 'published_count': published_count, 'failed_count': failed_count, 'total_count': len(articles), 'assignment_results': assignment_results, 'results': publish_results }, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[批量发布文章-循环分配] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @article_bp.route('/batch-published-review', methods=['POST']) @require_auth @require_role('admin','editor','reviewer','publisher','enterprise') def batch_published_review_articles(): """批量发布文章""" try: data = request.get_json() if not data: return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 article_ids = data.get('article_ids', []) author_id = data.get('author_id') logger.info(f"[push文章] 入参数据: {json.dumps(data, ensure_ascii=False)}") if not article_ids: return jsonify({ 'code': 400, 'message': '缺少文章ID列表', 'data': None }), 400 if not author_id: return jsonify({ 'code': 400, 'message': '缺少作者ID', 'data': None }), 400 db_manager = get_db_manager() current_user = AuthUtils.get_current_user() # 获取作者的app_id和app_token信息 author_sql = """ SELECT app_id, app_token, author_name, department_id, department_name FROM ai_authors WHERE id = %s AND status = 'active' """ author_result = db_manager.execute_query(author_sql, (author_id,)) logger.info(f"[医生信息] 数据库返回数据: {json.dumps(author_result, ensure_ascii=False)}") if not author_result: return jsonify({ 'code': 400, 'message': '作者信息不存在或已禁用', 'data': None }), 400 app_id = author_result[0]['app_id'] app_token = author_result[0]['app_token'] author_name = author_result[0]['author_name'] department_id = author_result[0]['department_id'] department_name = author_result[0]['department_name'] # 一次性查询文章信息并筛选有效文章 article_sql = """ SELECT id, title, status FROM ai_articles WHERE id IN %s AND status = 'approved' """ logger.info(f"[批量发布文章] 执行SQL查询文章: {article_sql} - 参数: {tuple(article_ids)}") articles = db_manager.execute_query(article_sql, (tuple(article_ids),)) logger.info(f"[批量发布文章] 查询结果: 请求{len(article_ids)}篇,找到{len(articles)}篇有效文章") if not articles: return jsonify({ 'code': 400, 'message': f'未找到状态为approved的文章: {article_ids}', 'data': { 'requested_articles': article_ids, 'found_articles': 0 } }), 400 published_count = 0 failed_count = 0 publish_results = [] # 批量发布文章到百家号 try: logger.info(f"[批量发布文章] 开始批量处理 {len(articles)} 篇文章") # 模拟百家号发布逻辑(这里需要根据实际API进行调整) # 由于没有实际的百家号API调用,这里直接标记为成功 publish_success = True if publish_success: # 批量更新文章状态 - 使用 IN 子句进行批量更新 article_ids_str = ','.join(map(str, article_ids)) update_sql = f""" UPDATE ai_articles SET status = 'assign_authors', author_id = %s, author_name = %s, department_name = %s, department_id = %s, publish_user_id = %s, publish_time = NOW(), updated_at = NOW() WHERE id IN ({article_ids_str}) """ update_params = (author_id, author_name, department_name, department_id, current_user['user_id']) logger.info(f"[批量发布文章] 执行批量SQL更新: {update_sql} - 参数: {update_params}") affected_rows = db_manager.execute_update(update_sql, update_params) logger.info(f"[批量发布文章] 批量更新完成,影响行数: {affected_rows}") # 为每篇文章生成发布结果 for article in articles: article_id = article['id'] title = article['title'] baijiahao_id = f"bjh_{article_id}_{int(time.time())}" published_count += 1 publish_results.append({ 'article_id': article_id, 'title': title, 'status': 'success', 'baijiahao_id': baijiahao_id }) logger.info(f"[批量发布文章] 文章发布成功: ID={article_id}, 标题={title}, 百家号ID: {baijiahao_id}") else: # 批量发布失败 error_msg = '百家号批量发布失败' # 批量更新文章状态为失败 article_ids_str = ','.join(map(str, article_ids)) update_sql = f""" UPDATE ai_articles SET status = 'failed', updated_at = NOW() WHERE id IN ({article_ids_str}) """ db_manager.execute_update(update_sql, ()) for article in articles: article_id = article['id'] title = article['title'] failed_count += 1 publish_results.append({ 'article_id': article_id, 'title': title, 'status': 'failed', 'error': error_msg }) logger.error(f"[批量发布文章] 文章发布失败: ID={article_id}, 标题={title}, 错误: {error_msg}") except Exception as e: # 批量发布异常处理 logger.error(f"[批量发布文章] 批量发布异常: {str(e)}", exc_info=True) # 将所有文章标记为失败 for article in articles: article_id = article['id'] title = article['title'] failed_count += 1 publish_results.append({ 'article_id': article_id, 'title': title, 'status': 'error', 'error': str(e) }) logger.error(f"[批量发布文章] 文章发布异常: ID={article_id}, 标题={title}, 错误: {str(e)}") # 记录操作日志 log_sql = """ INSERT INTO ai_logs (user_id, action, target_type, target_id, description, ip_address, user_agent, status) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """ client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) user_agent = request.headers.get('User-Agent', '未知') log_params = ( current_user['user_id'], 'batch_publish_articles', 'articles', None, # target_id 设为 NULL,因为是批量操作 f'批量发布文章,成功: {published_count},失败: {failed_count},文章ID: {",".join(map(str, article_ids))}', client_ip, user_agent, 'success' if failed_count == 0 else 'warning' ) logger.info(f"[批量发布文章] 执行SQL插入日志: {log_sql} - 参数: {log_params}") db_manager.execute_insert(log_sql, log_params) logger.info(f"[批量发布文章] 操作日志记录成功") logger.info(f"批量发布文章完成,成功: {published_count},失败: {failed_count}") return jsonify({ 'code': 200, 'message': f'批量发布完成,成功: {published_count},失败: {failed_count}', 'data': { 'published_count': published_count, 'failed_count': failed_count, 'total_count': len(articles), 'results': publish_results }, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[批量发布文章] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500