#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Article (copywriting) library management API.

Defines the ``article_bp`` Flask blueprint mounted at ``/api/articles``.
Every endpoint requires authentication and is scoped to the enterprise of
the current user. Responses share one JSON envelope:
``{"code", "message", "data"}`` plus a millisecond ``timestamp`` on success.
"""

from flask import Blueprint, request, jsonify
import logging
from datetime import datetime

from auth_utils import require_auth, AuthUtils
from database_config import get_db_manager
from log_utils import log_create, log_update, log_delete, log_error, log_operation

logger = logging.getLogger(__name__)

# Create the blueprint
article_bp = Blueprint('article', __name__, url_prefix='/api/articles')


def _now_ms():
    """Return the current time as integer milliseconds since the epoch."""
    return int(datetime.now().timestamp() * 1000)


def _error(code, message):
    """Build an error response; ``code`` is both the body code and HTTP status."""
    return jsonify({
        'code': code,
        'message': message,
        'data': None
    }), code


def _success(data, message='success'):
    """Build a 200 success response carrying ``data`` and a timestamp."""
    return jsonify({
        'code': 200,
        'message': message,
        'data': data,
        'timestamp': _now_ms()
    })


def _current_enterprise_id():
    """Return the ``enterprise_id`` of the authenticated user (may be falsy)."""
    current_user = AuthUtils.get_current_user()
    return current_user.get('enterprise_id')


@article_bp.route('/list', methods=['GET'])
@require_auth
def get_articles_list():
    """Return a paginated list of the enterprise's articles.

    Query parameters:
        page: 1-based page number (default 1).
        pageSize: rows per page (default 20).
        keyword: optional substring matched against title, content and topic.
        product_id: optional exact product filter.
        status: optional exact status filter.

    Returns:
        200 with ``{"total": <int>, "list": [<row>, ...]}``, 400 when the
        enterprise id cannot be resolved, 500 on any unexpected error.
    """
    try:
        enterprise_id = _current_enterprise_id()
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        # ``type=int`` falls back to the default on non-numeric input instead
        # of raising; clamping keeps LIMIT/OFFSET from ever going negative.
        page = max(request.args.get('page', 1, type=int), 1)
        page_size = max(request.args.get('pageSize', 20, type=int), 1)
        keyword = request.args.get('keyword', '').strip()
        product_id = request.args.get('product_id', '').strip()
        status = request.args.get('status', '').strip()

        # Build the WHERE clause; user values only ever travel as parameters.
        where_conditions = ["a.enterprise_id = %s"]
        params = [enterprise_id]

        if keyword:
            where_conditions.append(
                "(a.title LIKE %s OR a.content LIKE %s OR a.topic LIKE %s)"
            )
            keyword_pattern = f"%{keyword}%"
            params.extend([keyword_pattern] * 3)

        if product_id:
            where_conditions.append("a.product_id = %s")
            params.append(product_id)

        if status:
            where_conditions.append("a.status = %s")
            params.append(status)

        where_clause = " AND ".join(where_conditions)
        offset = (page - 1) * page_size

        db_manager = get_db_manager()

        # Total number of matching rows (ignores pagination).
        count_sql = f"""
            SELECT COUNT(*) as total
            FROM ai_articles a
            WHERE {where_clause}
        """
        count_result = db_manager.execute_query(count_sql, params)
        total = count_result[0]['total']

        # The requested page, newest first.
        sql = f"""
            SELECT a.id, a.title, a.content, a.product_id, a.prompt_workflow_id,
                   a.topic, a.status, a.created_at, a.updated_at,
                   p.name as product_name,
                   pw.prompt_workflow_name as prompt_name
            FROM ai_articles a
            LEFT JOIN ai_products p ON a.product_id = p.id
            LEFT JOIN ai_prompt_workflow pw ON a.prompt_workflow_id = pw.id
            WHERE {where_clause}
            ORDER BY a.created_at DESC
            LIMIT %s OFFSET %s
        """
        # Separate list so the count query's parameters are never mutated.
        page_params = params + [page_size, offset]
        articles = db_manager.execute_query(sql, page_params)

        logger.info("获取文案列表成功,总数: %s", total)

        return _success({'total': total, 'list': articles})

    except Exception as e:
        logger.error("[获取文案列表] 处理请求时发生错误: %s", e, exc_info=True)
        return _error(500, '服务器内部错误')


@article_bp.route('/generate', methods=['POST'])
@require_auth
def generate_article():
    """Create draft articles for a product, one per requested topic.

    JSON body:
        product_id: required; must belong to the current enterprise.
        prompt_workflow_id: required; must belong to the current enterprise.
        topics: required non-empty list of topics.
        count: optional positive integer (default 1); at most this many
            topics, taken from the front of ``topics``, are used.

    Returns:
        200 with ``{"generated": <int>, "articles": [{id, title, topic}]}``,
        400 on invalid input, 404 when the product or prompt workflow is not
        found, 500 on any unexpected error.
    """
    try:
        enterprise_id = _current_enterprise_id()
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        # silent=True turns a malformed/absent JSON body into None so the
        # client gets a 400 rather than the generic 500 handler below.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return _error(400, '请求参数错误')

        # Validate required fields
        for field in ('product_id', 'prompt_workflow_id', 'topics'):
            if not data.get(field):
                return _error(400, f'缺少必需字段: {field}')

        topics = data['topics']
        if not isinstance(topics, list):
            # A bare string would otherwise be iterated character by character.
            return _error(400, 'topics 必须是数组')

        try:
            count = int(data.get('count', 1))
        except (TypeError, ValueError, OverflowError):
            return _error(400, 'count 必须是正整数')
        if count < 1:
            return _error(400, 'count 必须是正整数')

        db_manager = get_db_manager()

        # Verify that the product exists within this enterprise
        check_product_sql = (
            "SELECT id, name FROM ai_products WHERE id = %s AND enterprise_id = %s"
        )
        product = db_manager.execute_query(
            check_product_sql, (data['product_id'], enterprise_id)
        )
        if not product:
            return _error(404, '产品不存在')

        # Verify that the prompt workflow exists within this enterprise
        check_prompt_sql = (
            "SELECT id FROM ai_prompt_workflow WHERE id = %s AND enterprise_id = %s"
        )
        prompt = db_manager.execute_query(
            check_prompt_sql, (data['prompt_workflow_id'], enterprise_id)
        )
        if not prompt:
            return _error(404, '提示词不存在')

        # Slicing already copes with count > len(topics).
        topics = topics[:count]
        product_name = product[0]['name']

        insert_sql = """
            INSERT INTO ai_articles
                (enterprise_id, product_id, prompt_workflow_id,
                 title, content, topic, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """

        # NOTE(review): the inserts and counter updates below are separate
        # statements; whether they share a transaction depends on db_manager.
        generated_articles = []
        for topic in topics:
            # TODO: call the AI service to generate the real content;
            # placeholder text is used for now.
            title = f"关于{product_name}的{topic}分享"
            content = f"这是一篇关于{topic}的精彩内容..."

            # Presumably execute_insert returns the new row id - it is
            # reported to the client as the article id.
            article_id = db_manager.execute_insert(insert_sql, (
                enterprise_id,
                data['product_id'],
                data['prompt_workflow_id'],
                title,
                content,
                topic,
                'draft'
            ))

            generated_articles.append({
                'id': article_id,
                'title': title,
                'topic': topic
            })

        generated = len(generated_articles)

        # Update the product and enterprise article totals
        update_product_sql = (
            "UPDATE ai_products SET articles_total = articles_total + %s WHERE id = %s"
        )
        db_manager.execute_update(update_product_sql, (generated, data['product_id']))

        update_enterprise_sql = (
            "UPDATE ai_enterprises SET articles_total = articles_total + %s WHERE id = %s"
        )
        db_manager.execute_update(update_enterprise_sql, (generated, enterprise_id))

        # Update the prompt workflow usage counter
        update_prompt_sql = (
            "UPDATE ai_prompt_workflow SET usage_count = usage_count + %s WHERE id = %s"
        )
        db_manager.execute_update(
            update_prompt_sql, (generated, data['prompt_workflow_id'])
        )

        logger.info("生成文案成功: %s篇", generated)

        return _success(
            {'generated': generated, 'articles': generated_articles},
            message='生成成功'
        )

    except Exception as e:
        logger.error("[生成文案] 处理请求时发生错误: %s", e, exc_info=True)
        return _error(500, '服务器内部错误')


# The URL must carry the id: the view takes ``article_id`` and a bare '/'
# rule would invoke it without that argument.
@article_bp.route('/<int:article_id>', methods=['GET'])
@require_auth
def get_article_detail(article_id):
    """Return one article of the current enterprise.

    Args:
        article_id: id of the article, taken from the URL path.

    Returns:
        200 with the article row (including product and prompt names),
        400 when the enterprise id cannot be resolved, 404 when no such
        article exists for this enterprise, 500 on any unexpected error.
    """
    try:
        enterprise_id = _current_enterprise_id()
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        db_manager = get_db_manager()

        # Fetch the article together with its product / prompt names
        sql = """
            SELECT a.id, a.title, a.content, a.product_id, a.prompt_workflow_id,
                   a.topic, a.status, a.created_at, a.updated_at,
                   p.name as product_name,
                   pw.prompt_workflow_name as prompt_name
            FROM ai_articles a
            LEFT JOIN ai_products p ON a.product_id = p.id
            LEFT JOIN ai_prompt_workflow pw ON a.prompt_workflow_id = pw.id
            WHERE a.id = %s AND a.enterprise_id = %s
        """
        result = db_manager.execute_query(sql, (article_id, enterprise_id))

        if not result:
            return _error(404, '文案不存在')

        logger.info("获取文案详情成功: ID %s", article_id)

        return _success(result[0])

    except Exception as e:
        logger.error("[获取文案详情] 处理请求时发生错误: %s", e, exc_info=True)
        return _error(500, '服务器内部错误')


@article_bp.route('/<int:article_id>', methods=['DELETE'])
@require_auth
def delete_article(article_id):
    """Delete one article of the current enterprise and adjust the counters.

    Args:
        article_id: id of the article, taken from the URL path.

    Returns:
        200 on success, 400 when the enterprise id cannot be resolved,
        404 when no such article exists for this enterprise, 500 on any
        unexpected error.
    """
    try:
        enterprise_id = _current_enterprise_id()
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        db_manager = get_db_manager()

        # Check that the article exists and belongs to the current enterprise
        check_sql = (
            "SELECT id, product_id FROM ai_articles "
            "WHERE id = %s AND enterprise_id = %s"
        )
        existing = db_manager.execute_query(check_sql, (article_id, enterprise_id))

        if not existing:
            return _error(404, '文案不存在')

        product_id = existing[0]['product_id']

        # Delete the article; the enterprise filter is repeated here so the
        # statement itself can never remove another tenant's row.
        sql = "DELETE FROM ai_articles WHERE id = %s AND enterprise_id = %s"
        db_manager.execute_update(sql, (article_id, enterprise_id))

        # Update the product and enterprise totals; the ``> 0`` guard keeps a
        # counter that is already out of sync from dropping below zero.
        update_product_sql = (
            "UPDATE ai_products SET articles_total = articles_total - 1 "
            "WHERE id = %s AND articles_total > 0"
        )
        db_manager.execute_update(update_product_sql, (product_id,))

        update_enterprise_sql = (
            "UPDATE ai_enterprises SET articles_total = articles_total - 1 "
            "WHERE id = %s AND articles_total > 0"
        )
        db_manager.execute_update(update_enterprise_sql, (enterprise_id,))

        logger.info("删除文案成功: ID %s", article_id)

        return _success(None, message='删除成功')

    except Exception as e:
        logger.error("[删除文案] 处理请求时发生错误: %s", e, exc_info=True)
        return _error(500, '服务器内部错误')


@article_bp.route('/stats', methods=['GET'])
@require_auth
def get_articles_stats():
    """Return article counters for the current enterprise.

    Returns:
        200 with integer ``total``, ``available`` (status draft or approved)
        and ``published`` counts, 400 when the enterprise id cannot be
        resolved, 500 on any unexpected error.
    """
    try:
        enterprise_id = _current_enterprise_id()
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        db_manager = get_db_manager()

        sql = """
            SELECT COUNT(*) as total,
                   SUM(CASE WHEN status IN ('draft', 'approved') THEN 1 ELSE 0 END) as available,
                   SUM(CASE WHEN status = 'published' THEN 1 ELSE 0 END) as published
            FROM ai_articles
            WHERE enterprise_id = %s
        """
        result = db_manager.execute_query(sql, (enterprise_id,))
        row = result[0] if result else {}

        # SUM() yields NULL when no rows match, and some drivers return
        # Decimal for it; normalise every counter to a plain int.
        stats = {
            key: int(row.get(key) or 0)
            for key in ('total', 'available', 'published')
        }

        logger.info("获取文案统计成功")

        return _success(stats)

    except Exception as e:
        logger.error("[获取文案统计] 处理请求时发生错误: %s", e, exc_info=True)
        return _error(500, '服务器内部错误')