Files
ai_wht_B/ver_25121821/article_routes.py

814 lines
32 KiB
Python
Raw Permalink Normal View History

2026-01-06 14:18:39 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
文章管理接口
"""
from flask import Blueprint, request, jsonify
import logging
import time
import random
from datetime import datetime
from auth_utils import require_auth, AuthUtils
from database_config import get_db_manager, format_datetime_fields
from log_utils import log_create, log_update, log_delete, log_error, log_operation
logger = logging.getLogger('article_server')
# 创建蓝图
article_bp = Blueprint('article', __name__, url_prefix='/api/articles')
@article_bp.route('/list', methods=['GET'])
@require_auth
def get_articles_list():
"""获取文章列表(聚合图片和标签)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取文章列表] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
logger.info(f"[获取文章列表] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
if not enterprise_id:
logger.warning(f"[获取文章列表] 无法获取企业ID, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
# 获取查询参数
page = int(request.args.get('page', 1))
page_size = int(request.args.get('pageSize', 20))
keyword = request.args.get('keyword', '').strip()
product_id = request.args.get('product_id', '').strip()
status = request.args.get('status', '').strip()
logger.info(f"[获取文章列表] 查询参数: page={page}, pageSize={page_size}, keyword={keyword}, product_id={product_id}, status={status}, 企业ID: {enterprise_id}, IP: {client_ip}")
# 构建查询条件
where_conditions = ["a.enterprise_id = %s"]
params = [enterprise_id]
if keyword:
where_conditions.append("(a.title LIKE %s OR a.content LIKE %s OR a.topic LIKE %s)")
keyword_pattern = f"%{keyword}%"
params.extend([keyword_pattern, keyword_pattern, keyword_pattern])
if product_id:
where_conditions.append("a.product_id = %s")
params.append(product_id)
if status:
where_conditions.append("a.status = %s")
params.append(status)
where_clause = " AND ".join(where_conditions)
# 计算偏移量
offset = (page - 1) * page_size
db_manager = get_db_manager()
# 查询总数
count_sql = f"""
SELECT COUNT(*) as total
FROM ai_articles a
WHERE {where_clause}
"""
count_result = db_manager.execute_query(count_sql, params)
total = count_result[0]['total']
# ✅ 查询文章列表(聚合图片和标签)
sql = f"""
SELECT a.id, a.batch_id, a.enterprise_id, a.product_id, a.topic_type_id,
a.prompt_workflow_id, a.topic, a.title, a.content, a.department,
a.departmentids, a.author_id, a.author_name, a.department_id, a.department_name,
a.created_user_id, a.review_user_id, a.publish_user_id, a.status, a.channel,
a.review_comment, a.publish_time, a.publish_link, a.baijiahao_id, a.baijiahao_status,
a.word_count, a.image_count, a.coze_tag, a.created_at, a.updated_at,
p.name as product_name,
pw.prompt_workflow_name as prompt_name
FROM ai_articles a
LEFT JOIN ai_products p ON a.product_id = p.id
LEFT JOIN ai_prompt_workflow pw ON a.prompt_workflow_id = pw.id
WHERE {where_clause}
ORDER BY a.created_at DESC
LIMIT %s OFFSET %s
"""
params.extend([page_size, offset])
articles = db_manager.execute_query(sql, params)
# ✅ 聚合每篇文章的图片和标签
for article in articles:
article_id = article['id']
# 查询文章图片
images_sql = """
SELECT id, image_id, image_url, image_thumb_url, image_tag_id,
sort_order, keywords_id, keywords_name, department_id,
department_name, image_source, created_at
FROM ai_article_images
WHERE article_id = %s
ORDER BY sort_order ASC, created_at ASC
"""
article['images'] = db_manager.execute_query(images_sql, (article_id,))
# 查询文章标签
tags_sql = """
SELECT id, coze_tag, created_at
FROM ai_article_tags
WHERE article_id = %s
"""
tags_result = db_manager.execute_query(tags_sql, (article_id,))
article['tags'] = tags_result[0] if tags_result else None
# 格式化日期时间字段
articles = format_datetime_fields(articles)
logger.info(f"[获取文章列表] 查询成功, 总数: {total}, 当前页: {page}, 每页: {page_size}, 返回数量: {len(articles)}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 200,
'message': 'success',
'data': {
'total': total,
'list': articles
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取文章列表] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/generate', methods=['POST'])
@require_auth
def generate_article():
"""生成文案"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[生成文案] 开始处理生成文案请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
logger.info(f"[生成文案] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
if not enterprise_id:
logger.warning(f"[生成文案] 无法获取企业ID, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
logger.warning(f"[生成文案] 请求参数为空, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
logger.info(f"[生成文案] 收到生成请求, 产品ID: {data.get('product_id')}, 提示词ID: {data.get('prompt_workflow_id')}, 主题数: {len(data.get('topics', []))}, 企业ID: {enterprise_id}, IP: {client_ip}")
# 验证必需字段
required_fields = ['product_id', 'prompt_workflow_id', 'topics']
for field in required_fields:
if not data.get(field):
logger.warning(f"[生成文案] 缺少必需字段: {field}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 400,
'message': f'缺少必需字段: {field}',
'data': None
}), 400
db_manager = get_db_manager()
# 验证产品是否存在
logger.info(f"[生成文案] 验证产品是否存在, 产品ID: {data['product_id']}, 企业ID: {enterprise_id}")
check_product_sql = "SELECT id, name FROM ai_products WHERE id = %s AND enterprise_id = %s"
product = db_manager.execute_query(check_product_sql, (data['product_id'], enterprise_id))
if not product:
logger.warning(f"[生成文案] 产品不存在, 产品ID: {data['product_id']}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 404,
'message': '产品不存在',
'data': None
}), 404
logger.info(f"[生成文案] 产品验证成功, 产品名称: {product[0]['name']}, ID: {data['product_id']}")
# 验证提示词是否存在
logger.info(f"[生成文案] 验证提示词是否存在, 提示词ID: {data['prompt_workflow_id']}, 企业ID: {enterprise_id}")
check_prompt_sql = "SELECT id FROM ai_prompt_workflow WHERE id = %s AND enterprise_id = %s"
prompt = db_manager.execute_query(check_prompt_sql, (data['prompt_workflow_id'], enterprise_id))
if not prompt:
logger.warning(f"[生成文案] 提示词不存在, 提示词ID: {data['prompt_workflow_id']}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 404,
'message': '提示词不存在',
'data': None
}), 404
logger.info(f"[生成文案] 提示词验证成功, ID: {data['prompt_workflow_id']}")
count = data.get('count', 1)
topics = data['topics'][:count] if len(data['topics']) >= count else data['topics']
logger.info(f"[生成文案] 开始生成文案, 主题数量: {len(topics)}, 产品: {product[0]['name']}, 企业ID: {enterprise_id}")
generated_articles = []
# 生成batch_id时间戳 + 6位随机数
timestamp = int(time.time())
random_num = random.randint(100000, 999999)
batch_id = f"{timestamp}{random_num}"
logger.info(f"[批量生成文章] 生成batch_id: {batch_id}, 待处理数据行数: {len(topics)}")
for topic in topics:
logger.info(f"[生成文案] 开始生成主题文案: {topic}, 产品: {product[0]['name']}")
# TODO: 这里应该调用AI接口生成文案内容
# 目前使用模拟数据
title = f"{topic}"
#content = f"这是一篇关于{topic}的精彩内容..."
# 插入文案记录content字段先为空等待后续脚本填充
sql = """
INSERT INTO ai_articles
(enterprise_id, product_id, prompt_workflow_id, title, topic, content, status, batch_id)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
article_id = db_manager.execute_insert(sql, (
enterprise_id,
data['product_id'],
data['prompt_workflow_id'],
title,
topic,
'', # ✅ content字段先设为空等待后续脚本填充
'generate',
batch_id
))
logger.info(f"[生成文案] 文案生成成功, 文案ID: {article_id}, 主题: {topic}, 标题: {title}")
generated_articles.append({
'id': article_id,
'title': title,
'topic': topic
})
# 更新产品和企业文案总数
update_product_sql = "UPDATE ai_products SET articles_total = articles_total + %s WHERE id = %s"
db_manager.execute_update(update_product_sql, (len(generated_articles), data['product_id']))
update_enterprise_sql = "UPDATE ai_enterprises SET articles_total = articles_total + %s WHERE id = %s"
db_manager.execute_update(update_enterprise_sql, (len(generated_articles), enterprise_id))
# 更新提示词使用次数
update_prompt_sql = "UPDATE ai_prompt_workflow SET usage_count = usage_count + %s WHERE id = %s"
db_manager.execute_update(update_prompt_sql, (len(generated_articles), data['prompt_workflow_id']))
logger.info(f"生成文案成功: {len(generated_articles)}")
return jsonify({
'code': 200,
'message': '生成成功',
'data': {
'generated': len(generated_articles),
'articles': generated_articles
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[生成文案] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/create_Discard', methods=['POST'])
@require_auth
def create_article():
"""创建文章"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[创建文章] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
user_id = current_user.get('user_id', 0)
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
# 验证必需字段
required_fields = ['title', 'content']
for field in required_fields:
if not data.get(field):
return jsonify({
'code': 400,
'message': f'缺少必需字段: {field}',
'data': None
}), 400
db_manager = get_db_manager()
# ✅ 插入文章主表
article_sql = """
INSERT INTO ai_articles
(enterprise_id, product_id, topic_type_id, prompt_workflow_id, topic, title, content,
department, departmentids, author_id, author_name, department_id, department_name,
created_user_id, status, channel, word_count, image_count, batch_id)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
article_id = db_manager.execute_insert(article_sql, (
enterprise_id,
data.get('product_id', 0),
data.get('topic_type_id', 0),
data.get('prompt_workflow_id', 0),
data.get('topic', ''),
data['title'],
data['content'],
data.get('department', ''),
data.get('departmentids', ''),
data.get('author_id'),
data.get('author_name'),
data.get('department_id'),
data.get('department_name'),
user_id,
data.get('status', 'draft'),
data.get('channel', 1),
data.get('word_count', len(data['content'])),
data.get('image_count', 0),
data.get('batch_id', 0)
))
# ✅ 插入文章图片
if data.get('images'):
for img in data['images']:
image_sql = """
INSERT INTO ai_article_images
(enterprise_id, article_id, image_id, image_url, image_thumb_url,
image_tag_id, sort_order, keywords_id, keywords_name,
department_id, department_name, image_source)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
db_manager.execute_insert(image_sql, (
enterprise_id,
article_id,
img.get('image_id', 0),
img.get('image_url', ''),
img.get('image_thumb_url', ''),
img.get('image_tag_id', 0),
img.get('sort_order', 0),
img.get('keywords_id', 0),
img.get('keywords_name', ''),
img.get('department_id', 0),
img.get('department_name', ''),
img.get('image_source', 0)
))
# ✅ 插入文章标签
if data.get('coze_tag'):
tag_sql = """
INSERT INTO ai_article_tags (enterprise_id, article_id, coze_tag)
VALUES (%s, %s, %s)
"""
db_manager.execute_insert(tag_sql, (enterprise_id, article_id, data['coze_tag']))
logger.info(f"[创建文章] 创建成功, 文章ID: {article_id}, 企业ID: {enterprise_id}")
return jsonify({
'code': 200,
'message': '创建成功',
'data': {
'id': article_id,
'title': data['title']
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[创建文章] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/<int:article_id>', methods=['PUT'])
@require_auth
def update_article(article_id):
"""更新文章"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[更新文章] 开始处理请求, 文章ID: {article_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
db_manager = get_db_manager()
# 检查文章是否存在且属于当前企业
check_sql = "SELECT id, status FROM ai_articles WHERE id = %s AND enterprise_id = %s"
existing = db_manager.execute_query(check_sql, (article_id, enterprise_id))
if not existing:
return jsonify({
'code': 404,
'message': '文章不存在',
'data': None
}), 404
old_status = existing[0]['status']
# ✅ 构建更新字段
update_fields = []
params = []
field_mapping = {
'product_id': 'product_id',
'topic_type_id': 'topic_type_id',
'prompt_workflow_id': 'prompt_workflow_id',
'topic': 'topic',
'title': 'title',
'content': 'content',
'department': 'department',
'departmentids': 'departmentids',
'author_id': 'author_id',
'author_name': 'author_name',
'department_id': 'department_id',
'department_name': 'department_name',
'status': 'status',
'channel': 'channel',
'review_comment': 'review_comment',
'publish_time': 'publish_time',
'publish_link': 'publish_link',
'baijiahao_id': 'baijiahao_id',
'baijiahao_status': 'baijiahao_status',
'word_count': 'word_count',
'image_count': 'image_count',
'batch_id': 'batch_id'
}
for field, db_field in field_mapping.items():
if field in data:
update_fields.append(f"{db_field} = %s")
params.append(data[field])
if update_fields:
params.append(article_id)
sql = f"UPDATE ai_articles SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, params)
# ✅ 更新文章图片(先删除后插入)
if 'images' in data:
# 删除旧图片
db_manager.execute_update("DELETE FROM ai_article_images WHERE article_id = %s", (article_id,))
# 插入新图片
if data['images']:
for img in data['images']:
image_sql = """
INSERT INTO ai_article_images
(enterprise_id, article_id, image_id, image_url, image_thumb_url,
image_tag_id, sort_order, keywords_id, keywords_name,
department_id, department_name, image_source)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
db_manager.execute_insert(image_sql, (
enterprise_id,
article_id,
img.get('image_id', 0),
img.get('image_url', ''),
img.get('image_thumb_url', ''),
img.get('image_tag_id', 0),
img.get('sort_order', 0),
img.get('keywords_id', 0),
img.get('keywords_name', ''),
img.get('department_id', 0),
img.get('department_name', ''),
img.get('image_source', 0)
))
# ✅ 更新文章标签
if 'coze_tag' in data:
# 检查是否已存在标签
tag_check = db_manager.execute_query(
"SELECT id FROM ai_article_tags WHERE article_id = %s",
(article_id,)
)
if tag_check:
# 更新标签
db_manager.execute_update(
"UPDATE ai_article_tags SET coze_tag = %s WHERE article_id = %s",
(data['coze_tag'], article_id)
)
else:
# 插入标签
db_manager.execute_insert(
"INSERT INTO ai_article_tags (enterprise_id, article_id, coze_tag) VALUES (%s, %s, %s)",
(enterprise_id, article_id, data['coze_tag'])
)
# ✅ 如果状态发生变化,记录到发布记录表
new_status = data.get('status')
if new_status and new_status != old_status:
# 查询文章信息
article_info = db_manager.execute_query(
"SELECT product_id, topic, title FROM ai_articles WHERE id = %s",
(article_id,)
)
if article_info:
record_sql = """
INSERT INTO ai_article_published_records
(article_id, enterprise_id, product_id, topic, title, created_user_id,
status, channel, word_count, image_count, publish_time, publish_link)
SELECT id, enterprise_id, product_id, topic, title, created_user_id,
status, channel, word_count, image_count, publish_time, publish_link
FROM ai_articles
WHERE id = %s
"""
db_manager.execute_insert(record_sql, (article_id,))
logger.info(f"[更新文章] 更新成功, 文章ID: {article_id}, 企业ID: {enterprise_id}")
return jsonify({
'code': 200,
'message': '更新成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[更新文章] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/<int:article_id>', methods=['GET'])
@require_auth
def get_article_detail(article_id):
"""获取文章详情(包含图片、标签、发布记录)"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# ✅ 查询文章详情(包含所有字段)
sql = """
SELECT a.id, a.batch_id, a.enterprise_id, a.product_id, a.topic_type_id,
a.prompt_workflow_id, a.topic, a.title, a.content, a.department,
a.departmentids, a.author_id, a.author_name, a.department_id, a.department_name,
a.created_user_id, a.review_user_id, a.publish_user_id, a.status, a.channel,
a.review_comment, a.publish_time, a.publish_link, a.baijiahao_id, a.baijiahao_status,
a.word_count, a.image_count, a.coze_tag, a.created_at, a.updated_at,
p.name as product_name,
pw.prompt_workflow_name as prompt_name
FROM ai_articles a
LEFT JOIN ai_products p ON a.product_id = p.id
LEFT JOIN ai_prompt_workflow pw ON a.prompt_workflow_id = pw.id
WHERE a.id = %s AND a.enterprise_id = %s
"""
result = db_manager.execute_query(sql, (article_id, enterprise_id))
if not result:
return jsonify({
'code': 404,
'message': '文章不存在',
'data': None
}), 404
article = result[0]
# ✅ 查询文章图片
images_sql = """
SELECT id, image_id, image_url, image_thumb_url, image_tag_id,
sort_order, keywords_id, keywords_name, department_id,
department_name, image_source, created_at
FROM ai_article_images
WHERE article_id = %s
ORDER BY sort_order ASC, created_at ASC
"""
article['images'] = db_manager.execute_query(images_sql, (article_id,))
# ✅ 查询文章标签
tags_sql = """
SELECT id, coze_tag, created_at
FROM ai_article_tags
WHERE article_id = %s
"""
tags_result = db_manager.execute_query(tags_sql, (article_id,))
article['tags'] = tags_result[0] if tags_result else None
# ✅ 查询文章发布记录
records_sql = """
SELECT id, status, created_user_id, review_user_id, publish_user_id,
review_comment, publish_time, publish_link, word_count, image_count, created_at
FROM ai_article_published_records
WHERE article_id = %s
ORDER BY created_at DESC
"""
article['publish_records'] = db_manager.execute_query(records_sql, (article_id,))
# 格式化日期字段
article = format_datetime_fields([article])[0]
logger.info(f"获取文章详情成功: ID {article_id}")
return jsonify({
'code': 200,
'message': 'success',
'data': article,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取文章详情] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/<int:article_id>', methods=['DELETE'])
@require_auth
def delete_article(article_id):
"""删除文章(级联删除图片和标签)"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# 检查文章是否存在且属于当前企业
check_sql = "SELECT id, title FROM ai_articles WHERE id = %s AND enterprise_id = %s"
existing = db_manager.execute_query(check_sql, (article_id, enterprise_id))
if not existing:
return jsonify({
'code': 404,
'message': '文章不存在',
'data': None
}), 404
# ✅ 删除文章图片
db_manager.execute_update("DELETE FROM ai_article_images WHERE article_id = %s", (article_id,))
# ✅ 删除文章标签
db_manager.execute_update("DELETE FROM ai_article_tags WHERE article_id = %s", (article_id,))
# ✅ 删除文章主表
sql = "DELETE FROM ai_articles WHERE id = %s"
db_manager.execute_update(sql, (article_id,))
logger.info(f"删除文章成功: ID {article_id}")
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除文章] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/list_dashboard', methods=['GET'])
@require_auth
def get_articles_dashboard():
"""获取文章仪表盘统计"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[文章仪表盘] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
logger.warning(f"[文章仪表盘] 无法获取企业ID, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# ✅ 1. 文章总数status != 'draft'
articles_total_sql = """
SELECT COUNT(id) as total
FROM ai_articles
WHERE enterprise_id = %s
"""
articles_total_result = db_manager.execute_query(articles_total_sql, (enterprise_id,))
articles_total = articles_total_result[0]['total'] if articles_total_result else 0
# ✅ 2. 能发的文章status = 'published_review'
articles_available_sql = """
SELECT COUNT(id) as total
FROM ai_articles
WHERE enterprise_id = %s AND status = 'published_review'
"""
articles_available_result = db_manager.execute_query(articles_available_sql, (enterprise_id,))
articles_available = articles_available_result[0]['total'] if articles_available_result else 0
# ✅ 3. 发布成功status = 'published'
articles_published_sql = """
SELECT COUNT(id) as total
FROM ai_articles
WHERE enterprise_id = %s AND status = 'published'
"""
articles_published_result = db_manager.execute_query(articles_published_sql, (enterprise_id,))
articles_published = articles_published_result[0]['total'] if articles_published_result else 0
stats = {
'articles_total': articles_total,
'articles_available': articles_available,
'articles_published': articles_published
}
logger.info(f"[文章仪表盘] 查询成功, 企业ID: {enterprise_id}, 总数: {articles_total}, 可发: {articles_available}, 已发: {articles_published}, IP: {client_ip}")
return jsonify({
'code': 200,
'message': 'success',
'data': stats,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[文章仪表盘] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500