Files
ai_wht_B/article_routes.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

1347 lines
56 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
文章管理接口
"""
from flask import Blueprint, request, jsonify
import logging
import json
import time
import random
from datetime import datetime
from auth_utils import require_auth, require_role, AuthUtils
from database_config import get_db_manager, format_datetime_fields
from log_utils import log_create, log_update, log_delete, log_error, log_operation
logger = logging.getLogger('article_server')
# 创建蓝图
article_bp = Blueprint('article', __name__, url_prefix='/api/articles')
# 图片基础URL
#IMAGE_BASE_URL_CDN1 = "http://images11.bxmkb.cn/Images/"
IMAGE_BASE_URL_CDN1 = "https://bxmkb-beijing.oss-cn-beijing.aliyuncs.com/Images/"
@article_bp.route('/list', methods=['GET'])
@require_auth
def get_articles_list():
"""获取文章列表(聚合图片和标签)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取文章列表] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
logger.info(f"[获取文章列表] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
if not enterprise_id:
logger.warning(f"[获取文章列表] 无法获取企业ID, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
# 获取查询参数
page = int(request.args.get('page', 1))
page_size = int(request.args.get('pageSize', 20))
keyword = request.args.get('keyword', '').strip()
product_id = request.args.get('product_id', '').strip()
status = request.args.get('status', '').strip()
logger.info(f"[获取文章列表] 查询参数: page={page}, pageSize={page_size}, keyword={keyword}, product_id={product_id}, status={status}, 企业ID: {enterprise_id}, IP: {client_ip}")
# 构建查询条件
where_conditions = ["enterprise_id = %s"]
params = [enterprise_id]
if keyword:
where_conditions.append("(title LIKE %s OR content LIKE %s OR topic LIKE %s)")
keyword_pattern = f"%{keyword}%"
params.extend([keyword_pattern, keyword_pattern, keyword_pattern])
if product_id:
where_conditions.append("product_id = %s")
params.append(product_id)
if status:
where_conditions.append("status = %s")
params.append(status)
where_clause = " AND ".join(where_conditions)
# 计算偏移量
offset = (page - 1) * page_size
db_manager = get_db_manager()
# 查询总数
count_sql = f"""
SELECT COUNT(*) as total
FROM ai_articles
WHERE {where_clause}
"""
count_result = db_manager.execute_query(count_sql, params)
total = count_result[0]['total']
# ✅ 查询文章列表直接从ai_articles表获取所有字段避免跨表查询
sql = f"""
SELECT id, batch_id, enterprise_id, product_id, product_name, topic_type_id,
prompt_workflow_id, prompt_workflow_name, topic, title, context_summary, department,
departmentids, author_id, author_name, department_id, department_name,
created_user_id, review_user_id, publish_user_id, status, channel,
review_comment, publish_time, publish_link, baijiahao_id, baijiahao_status,
word_count, image_count, coze_tag, created_at, updated_at,
product_name as product_name,
prompt_workflow_name as prompt_name
FROM ai_articles
WHERE {where_clause}
ORDER BY created_at DESC
LIMIT %s OFFSET %s
"""
params.extend([page_size, offset])
articles = db_manager.execute_query(sql, params)
# ✅ 批量查询所有文章的图片和标签避免N+1查询
if articles:
article_ids = [article['id'] for article in articles]
placeholders = ','.join(['%s'] * len(article_ids))
# 批量查询图片
images_sql = f"""
SELECT id, article_id, image_id, image_url, image_thumb_url, image_tag_id,
sort_order, keywords_id, keywords_name, department_id,
department_name, image_source, created_at
FROM ai_article_images
WHERE article_id IN ({placeholders})
ORDER BY article_id, sort_order ASC, created_at ASC
"""
all_images = db_manager.execute_query(images_sql, article_ids)
# 批量查询标签
tags_sql = f"""
SELECT id, article_id, coze_tag, created_at
FROM ai_article_tags
WHERE article_id IN ({placeholders})
"""
all_tags = db_manager.execute_query(tags_sql, article_ids)
# 构建图片和标签的字典映射
images_map = {}
for img in all_images:
aid = img['article_id']
if aid not in images_map:
images_map[aid] = []
images_map[aid].append(img)
tags_map = {}
for tag in all_tags:
tags_map[tag['article_id']] = tag
# 将图片和标签分配给对应的文章
for article in articles:
article['images'] = images_map.get(article['id'], [])
article['tags'] = tags_map.get(article['id'])
# 格式化日期时间字段
articles = format_datetime_fields(articles)
logger.info(f"[获取文章列表] 查询成功, 总数: {total}, 当前页: {page}, 每页: {page_size}, 返回数量: {len(articles)}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 200,
'message': 'success',
'data': {
'total': total,
'list': articles
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取文章列表] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/generate', methods=['POST'])
@require_auth
def generate_article():
"""生成文案"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[生成文案] 开始处理生成文案请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
logger.info(f"[生成文案] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
if not enterprise_id:
logger.warning(f"[生成文案] 无法获取企业ID, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
logger.warning(f"[生成文案] 请求参数为空, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
logger.info(f"[生成文案] 收到生成请求, 产品ID: {data.get('product_id')}, 提示词ID: {data.get('prompt_workflow_id')}, 主题数: {len(data.get('topics', []))}, 企业ID: {enterprise_id}, IP: {client_ip}")
# 验证必需字段
required_fields = ['product_id', 'prompt_workflow_id', 'topics']
for field in required_fields:
if not data.get(field):
logger.warning(f"[生成文案] 缺少必需字段: {field}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 400,
'message': f'缺少必需字段: {field}',
'data': None
}), 400
db_manager = get_db_manager()
# 验证产品是否存在
logger.info(f"[生成文案] 验证产品是否存在, 产品ID: {data['product_id']}, 企业ID: {enterprise_id}")
check_product_sql = "SELECT id, name FROM ai_products WHERE id = %s AND enterprise_id = %s"
product = db_manager.execute_query(check_product_sql, (data['product_id'], enterprise_id))
if not product:
logger.warning(f"[生成文案] 产品不存在, 产品ID: {data['product_id']}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 404,
'message': '产品不存在',
'data': None
}), 404
logger.info(f"[生成文案] 产品验证成功, 产品名称: {product[0]['name']}, ID: {data['product_id']}")
# 验证提示词是否存在
logger.info(f"[生成文案] 验证提示词是否存在, 提示词ID: {data['prompt_workflow_id']}, 企业ID: {enterprise_id}")
check_prompt_sql = "SELECT id, prompt_workflow_name FROM ai_prompt_workflow WHERE id = %s AND enterprise_id = %s"
prompt = db_manager.execute_query(check_prompt_sql, (data['prompt_workflow_id'], enterprise_id))
if not prompt:
logger.warning(f"[生成文案] 提示词不存在, 提示词ID: {data['prompt_workflow_id']}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 404,
'message': '提示词不存在',
'data': None
}), 404
logger.info(f"[生成文案] 提示词验证成功, 名称: {prompt[0]['prompt_workflow_name']}, ID: {data['prompt_workflow_id']}")
count = data.get('count', 1)
topics = data['topics'][:count] if len(data['topics']) >= count else data['topics']
logger.info(f"[生成文案] 开始生成文案, 主题数量: {len(topics)}, 产品: {product[0]['name']}, 企业ID: {enterprise_id}")
generated_articles = []
# 生成batch_id时间戳 + 6位随机数
timestamp = int(time.time())
random_num = random.randint(100000, 999999)
batch_id = f"{timestamp}{random_num}"
logger.info(f"[批量生成文章] 生成batch_id: {batch_id}, 待处理数据行数: {len(topics)}")
for topic in topics:
logger.info(f"[生成文案] 开始生成主题文案: {topic}, 产品: {product[0]['name']}")
# TODO: 这里应该调用AI接口生成文案内容
# 目前使用模拟数据
title = f"{topic}"
#content = f"这是一篇关于{topic}的精彩内容..."
# 插入文案记录content字段先为空等待后续脚本填充
sql = """
INSERT INTO ai_articles
(enterprise_id, product_id, product_name, prompt_workflow_id, prompt_workflow_name, title, topic, content, status, batch_id)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
article_id = db_manager.execute_insert(sql, (
enterprise_id,
data['product_id'],
product[0]['name'],
data['prompt_workflow_id'],
prompt[0]['prompt_workflow_name'],
title,
topic,
'', # ✅ content字段先设为空等待后续脚本填充
'generate',
batch_id
))
logger.info(f"[生成文案] 文案生成成功, 文案ID: {article_id}, 主题: {topic}, 标题: {title}")
generated_articles.append({
'id': article_id,
'title': title,
'topic': topic
})
# 更新产品和企业文案总数
update_product_sql = "UPDATE ai_products SET articles_total = articles_total + %s WHERE id = %s"
db_manager.execute_update(update_product_sql, (len(generated_articles), data['product_id']))
update_enterprise_sql = "UPDATE ai_enterprises SET articles_total = articles_total + %s WHERE id = %s"
db_manager.execute_update(update_enterprise_sql, (len(generated_articles), enterprise_id))
# 更新提示词使用次数
update_prompt_sql = "UPDATE ai_prompt_workflow SET usage_count = usage_count + %s WHERE id = %s"
db_manager.execute_update(update_prompt_sql, (len(generated_articles), data['prompt_workflow_id']))
logger.info(f"生成文案成功: {len(generated_articles)}")
return jsonify({
'code': 200,
'message': '生成成功',
'data': {
'generated': len(generated_articles),
'articles': generated_articles
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[生成文案] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/create_Discard', methods=['POST'])
@require_auth
def create_article():
"""创建文章"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[创建文章] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
user_id = current_user.get('user_id', 0)
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
# 验证必需字段
required_fields = ['title', 'content']
for field in required_fields:
if not data.get(field):
return jsonify({
'code': 400,
'message': f'缺少必需字段: {field}',
'data': None
}), 400
db_manager = get_db_manager()
# ✅ 插入文章主表
article_sql = """
INSERT INTO ai_articles
(enterprise_id, product_id, topic_type_id, prompt_workflow_id, topic, title, content,
department, departmentids, author_id, author_name, department_id, department_name,
created_user_id, status, channel, word_count, image_count, batch_id)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
article_id = db_manager.execute_insert(article_sql, (
enterprise_id,
data.get('product_id', 0),
data.get('topic_type_id', 0),
data.get('prompt_workflow_id', 0),
data.get('topic', ''),
data['title'],
data['content'],
data.get('department', ''),
data.get('departmentids', ''),
data.get('author_id'),
data.get('author_name'),
data.get('department_id'),
data.get('department_name'),
user_id,
data.get('status', 'draft'),
data.get('channel', 1),
data.get('word_count', len(data['content'])),
data.get('image_count', 0),
data.get('batch_id', 0)
))
# ✅ 插入文章图片
if data.get('images'):
for img in data['images']:
image_sql = """
INSERT INTO ai_article_images
(enterprise_id, article_id, image_id, image_url, image_thumb_url,
image_tag_id, sort_order, keywords_id, keywords_name,
department_id, department_name, image_source)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
db_manager.execute_insert(image_sql, (
enterprise_id,
article_id,
img.get('image_id', 0),
img.get('image_url', ''),
img.get('image_thumb_url', ''),
img.get('image_tag_id', 0),
img.get('sort_order', 0),
img.get('keywords_id', 0),
img.get('keywords_name', ''),
img.get('department_id', 0),
img.get('department_name', ''),
img.get('image_source', 0)
))
# ✅ 插入文章标签
if data.get('coze_tag'):
tag_sql = """
INSERT INTO ai_article_tags (enterprise_id, article_id, coze_tag)
VALUES (%s, %s, %s)
"""
db_manager.execute_insert(tag_sql, (enterprise_id, article_id, data['coze_tag']))
logger.info(f"[创建文章] 创建成功, 文章ID: {article_id}, 企业ID: {enterprise_id}")
return jsonify({
'code': 200,
'message': '创建成功',
'data': {
'id': article_id,
'title': data['title']
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[创建文章] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/<int:article_id>', methods=['PUT'])
@require_auth
def update_article(article_id):
"""更新文章"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[更新文章] 开始处理请求, 文章ID: {article_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
db_manager = get_db_manager()
# 检查文章是否存在且属于当前企业
check_sql = "SELECT id, status FROM ai_articles WHERE id = %s AND enterprise_id = %s"
existing = db_manager.execute_query(check_sql, (article_id, enterprise_id))
if not existing:
return jsonify({
'code': 404,
'message': '文章不存在',
'data': None
}), 404
old_status = existing[0]['status']
# ✅ 构建更新字段
update_fields = []
params = []
field_mapping = {
'product_id': 'product_id',
'topic_type_id': 'topic_type_id',
'prompt_workflow_id': 'prompt_workflow_id',
'topic': 'topic',
'title': 'title',
'content': 'content',
'department': 'department',
'departmentids': 'departmentids',
'author_id': 'author_id',
'author_name': 'author_name',
'department_id': 'department_id',
'department_name': 'department_name',
'status': 'status',
'channel': 'channel',
'review_comment': 'review_comment',
'publish_time': 'publish_time',
'publish_link': 'publish_link',
'baijiahao_id': 'baijiahao_id',
'baijiahao_status': 'baijiahao_status',
'word_count': 'word_count',
'image_count': 'image_count',
'batch_id': 'batch_id'
}
for field, db_field in field_mapping.items():
if field in data:
update_fields.append(f"{db_field} = %s")
params.append(data[field])
if update_fields:
params.append(article_id)
sql = f"UPDATE ai_articles SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, params)
# ✅ 更新文章图片(先删除后插入)
if 'images' in data:
# 删除旧图片
db_manager.execute_update("DELETE FROM ai_article_images WHERE article_id = %s", (article_id,))
# 插入新图片
if data['images']:
for img in data['images']:
image_sql = """
INSERT INTO ai_article_images
(enterprise_id, article_id, image_id, image_url, image_thumb_url,
image_tag_id, sort_order, keywords_id, keywords_name,
department_id, department_name, image_source)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
db_manager.execute_insert(image_sql, (
enterprise_id,
article_id,
img.get('image_id', 0),
img.get('image_url', ''),
img.get('image_thumb_url', ''),
img.get('image_tag_id', 0),
img.get('sort_order', 0),
img.get('keywords_id', 0),
img.get('keywords_name', ''),
img.get('department_id', 0),
img.get('department_name', ''),
img.get('image_source', 0)
))
# ✅ 更新文章标签
if 'coze_tag' in data:
# 检查是否已存在标签
tag_check = db_manager.execute_query(
"SELECT id FROM ai_article_tags WHERE article_id = %s",
(article_id,)
)
if tag_check:
# 更新标签
db_manager.execute_update(
"UPDATE ai_article_tags SET coze_tag = %s WHERE article_id = %s",
(data['coze_tag'], article_id)
)
else:
# 插入标签
db_manager.execute_insert(
"INSERT INTO ai_article_tags (enterprise_id, article_id, coze_tag) VALUES (%s, %s, %s)",
(enterprise_id, article_id, data['coze_tag'])
)
# ✅ 如果状态发生变化,记录到发布记录表
new_status = data.get('status')
if new_status and new_status != old_status:
# 查询文章信息
article_info = db_manager.execute_query(
"SELECT product_id, topic, title FROM ai_articles WHERE id = %s",
(article_id,)
)
if article_info:
record_sql = """
INSERT INTO ai_article_published_records
(article_id, enterprise_id, product_id, topic, title, created_user_id,
status, channel, word_count, image_count, publish_time, publish_link)
SELECT id, enterprise_id, product_id, topic, title, created_user_id,
status, channel, word_count, image_count, publish_time, publish_link
FROM ai_articles
WHERE id = %s
"""
db_manager.execute_insert(record_sql, (article_id,))
logger.info(f"[更新文章] 更新成功, 文章ID: {article_id}, 企业ID: {enterprise_id}")
return jsonify({
'code': 200,
'message': '更新成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[更新文章] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/<int:article_id>', methods=['GET'])
@require_auth
def get_article_detail(article_id):
"""获取文章详情(包含图片、标签、发布记录)"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# ✅ 查询文章详情(包含所有字段)
sql = """
SELECT a.id, a.batch_id, a.enterprise_id, a.product_id, a.topic_type_id,
a.prompt_workflow_id, a.topic, a.title, a.content, a.department,
a.departmentids, a.author_id, a.author_name, a.department_id, a.department_name,
a.created_user_id, a.review_user_id, a.publish_user_id, a.status, a.channel,
a.review_comment, a.publish_time, a.publish_link, a.baijiahao_id, a.baijiahao_status,
a.word_count, a.image_count, a.coze_tag, a.created_at, a.updated_at,
p.name as product_name,
pw.prompt_workflow_name as prompt_name
FROM ai_articles a
LEFT JOIN ai_products p ON a.product_id = p.id
LEFT JOIN ai_prompt_workflow pw ON a.prompt_workflow_id = pw.id
WHERE a.id = %s AND a.enterprise_id = %s
"""
result = db_manager.execute_query(sql, (article_id, enterprise_id))
if not result:
return jsonify({
'code': 404,
'message': '文章不存在',
'data': None
}), 404
article = result[0]
# ✅ 查询文章图片
images_sql = """
SELECT id, image_id, image_url, image_thumb_url, image_tag_id,
sort_order, keywords_id, keywords_name, department_id,
department_name, image_source, created_at
FROM ai_article_images
WHERE article_id = %s
ORDER BY sort_order ASC, created_at ASC
"""
images = db_manager.execute_query(images_sql, (article_id,))
# 为图片URL添加CDN前缀
for img in images:
if img.get('image_url'):
img['image_url'] = IMAGE_BASE_URL_CDN1 + img['image_url']
if img.get('image_thumb_url'):
img['image_thumb_url'] = IMAGE_BASE_URL_CDN1 + img['image_thumb_url']
article['images'] = images
# ✅ 查询文章标签
tags_sql = """
SELECT id, coze_tag, created_at
FROM ai_article_tags
WHERE article_id = %s
"""
tags_result = db_manager.execute_query(tags_sql, (article_id,))
article['tags'] = tags_result[0] if tags_result else None
# ✅ 查询文章发布记录
records_sql = """
SELECT id, status, created_user_id, review_user_id, publish_user_id,
review_comment, publish_time, publish_link, word_count, image_count, created_at
FROM ai_article_published_records
WHERE article_id = %s
ORDER BY created_at DESC
"""
article['publish_records'] = db_manager.execute_query(records_sql, (article_id,))
# 格式化日期字段
article = format_datetime_fields([article])[0]
logger.info(f"获取文章详情成功: ID {article_id}")
return jsonify({
'code': 200,
'message': 'success',
'data': article,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取文章详情] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/<int:article_id>', methods=['DELETE'])
@require_auth
def delete_article(article_id):
"""删除文章(级联删除图片和标签)"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# 检查文章是否存在且属于当前企业
check_sql = "SELECT id, title FROM ai_articles WHERE id = %s AND enterprise_id = %s"
existing = db_manager.execute_query(check_sql, (article_id, enterprise_id))
if not existing:
return jsonify({
'code': 404,
'message': '文章不存在',
'data': None
}), 404
# ✅ 删除文章图片
db_manager.execute_update("DELETE FROM ai_article_images WHERE article_id = %s", (article_id,))
# ✅ 删除文章标签
db_manager.execute_update("DELETE FROM ai_article_tags WHERE article_id = %s", (article_id,))
# ✅ 删除文章主表
sql = "DELETE FROM ai_articles WHERE id = %s"
db_manager.execute_update(sql, (article_id,))
logger.info(f"删除文章成功: ID {article_id}")
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除文章] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/list_dashboard', methods=['GET'])
@require_auth
def get_articles_dashboard():
"""获取文章仪表盘统计"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[文章仪表盘] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
logger.warning(f"[文章仪表盘] 无法获取企业ID, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# ✅ 1. 文章总数status != 'draft'
articles_total_sql = """
SELECT COUNT(id) as total
FROM ai_articles
WHERE enterprise_id = %s
"""
articles_total_result = db_manager.execute_query(articles_total_sql, (enterprise_id,))
articles_total = articles_total_result[0]['total'] if articles_total_result else 0
# ✅ 2. 能发的文章status = 'published_review'
articles_available_sql = """
SELECT COUNT(id) as total
FROM ai_articles
WHERE enterprise_id = %s AND status = 'published_review'
"""
articles_available_result = db_manager.execute_query(articles_available_sql, (enterprise_id,))
articles_available = articles_available_result[0]['total'] if articles_available_result else 0
# ✅ 3. 发布成功status = 'published'
articles_published_sql = """
SELECT COUNT(id) as total
FROM ai_articles
WHERE enterprise_id = %s AND status = 'published'
"""
articles_published_result = db_manager.execute_query(articles_published_sql, (enterprise_id,))
articles_published = articles_published_result[0]['total'] if articles_published_result else 0
stats = {
'articles_total': articles_total,
'articles_available': articles_available,
'articles_published': articles_published
}
logger.info(f"[文章仪表盘] 查询成功, 企业ID: {enterprise_id}, 总数: {articles_total}, 可发: {articles_available}, 已发: {articles_published}, IP: {client_ip}")
return jsonify({
'code': 200,
'message': 'success',
'data': stats,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[文章仪表盘] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/batch-published-account-cycle', methods=['POST'])
@require_auth
@require_role('admin','editor','reviewer','publisher','enterprise')
def batch_published_account_cycle_articles():
"""批量发布文章 - 账号循环分配"""
try:
data = request.get_json()
if not data:
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
article_ids = data.get('article_ids', [])
author_ids = data.get('author_ids', [])
logger.info(f"[批量发布文章-循环分配] 入参数据: {json.dumps(data, ensure_ascii=False)}")
if not article_ids:
return jsonify({
'code': 400,
'message': '缺少文章ID列表',
'data': None
}), 400
if not author_ids:
return jsonify({
'code': 400,
'message': '缺少作者ID列表',
'data': None
}), 400
db_manager = get_db_manager()
current_user = AuthUtils.get_current_user()
# 对文章ID和作者ID进行有序排序按ID从小到大
sorted_article_ids = sorted(article_ids)
sorted_author_ids = sorted(author_ids)
logger.info(f"[批量发布文章-循环分配] 排序后文章ID: {sorted_article_ids}")
logger.info(f"[批量发布文章-循环分配] 排序后作者ID: {sorted_author_ids}")
# 查询所有作者信息
author_placeholders = ','.join(['%s'] * len(sorted_author_ids))
author_sql = f"""
SELECT id, app_id, app_token, author_name, department_id, department_name
FROM ai_authors
WHERE id IN ({author_placeholders}) AND status = 'active'
ORDER BY id ASC
"""
author_results = db_manager.execute_query(author_sql, sorted_author_ids)
logger.info(f"[批量发布文章-循环分配] 查询到作者信息: {json.dumps(author_results, ensure_ascii=False)}")
if not author_results:
return jsonify({
'code': 400,
'message': '未找到有效的作者信息',
'data': None
}), 400
# 创建作者信息字典按ID排序
authors_dict = {}
for author in author_results:
authors_dict[author['id']] = author
logger.info(f"[批量发布文章authors_dict] 查询到作者信息: {json.dumps(authors_dict, ensure_ascii=False)}")
# 一次性查询文章信息并筛选有效文章
article_placeholders = ','.join(['%s'] * len(sorted_article_ids))
article_sql = f"""
SELECT id, title, status
FROM ai_articles
WHERE id IN ({article_placeholders}) AND status = 'pending_review'
ORDER BY id ASC
"""
logger.info(f"[批量发布文章-循环分配] 执行SQL查询文章: {article_sql} - 参数: {sorted_article_ids}")
articles = db_manager.execute_query(article_sql, sorted_article_ids)
logger.info(f"[批量发布文章-循环分配] 查询结果: 请求{len(sorted_article_ids)}篇,找到{len(articles)}篇有效文章")
if not articles:
return jsonify({
'code': 400,
'message': f'未找到状态为pending_review的文章: {sorted_article_ids}',
'data': {
'requested_articles': sorted_article_ids,
'found_articles': 0
}
}), 400
published_count = 0
failed_count = 0
publish_results = []
assignment_results = []
# 实现循环分配逻辑
try:
logger.info(f"[批量发布文章-循环分配] 开始循环分配处理 {len(articles)} 篇文章")
# 循环分配文章到作者
for i, article in enumerate(articles):
# 使用模运算实现循环分配
author_index = i % len(sorted_author_ids)
assigned_author_id = sorted_author_ids[author_index]
# 获取作者信息,如果找不到则从头重新分配
author_info = authors_dict.get(assigned_author_id)
retry_count = 0
max_retries = len(sorted_author_ids)
# ✅ 如果找不到作者信息,从头开始重新循环分配
while not author_info and retry_count < max_retries:
retry_count += 1
author_index = (author_index + 1) % len(sorted_author_ids)
assigned_author_id = sorted_author_ids[author_index]
author_info = authors_dict.get(assigned_author_id)
logger.warning(f"[批量发布文章-循环分配] 作者ID {sorted_author_ids[author_index - 1]} 无效,重试第{retry_count}尝试作者ID {assigned_author_id}")
# 如果尝试所有作者后仍未找到有效作者,则标记失败
if not author_info:
logger.error(f"[批量发布文章-循环分配] 文章 {article['id']} 尝试所有作者后仍未找到有效作者信息")
failed_count += 1
publish_results.append({
'article_id': article['id'],
'title': article['title'],
'status': 'failed',
'error': '所有作者都无效,无法分配作者'
})
continue
article_id = article['id']
title = article['title']
author_name = author_info['author_name']
department_id = author_info['department_id']
department_name = author_info['department_name']
# ✅ 最终验证:确保作者信息完整
if not assigned_author_id or not author_name:
error_msg = f"文章 {article_id} 作者信息不完整assigned_author_id={assigned_author_id}, author_name={author_name}"
logger.error(f"[批量发布文章-循环分配] {error_msg}")
failed_count += 1
publish_results.append({
'article_id': article_id,
'title': title,
'status': 'failed',
'error': '作者信息不完整,无法分配作者'
})
continue
# 记录分配结果
assignment_results.append({
'article_id': article_id,
'article_title': title,
'author_id': assigned_author_id,
'author_name': author_name,
'department_id': department_id,
'department_name': department_name
})
logger.info(f"[批量发布文章-循环分配] 文章 {article_id} 分配给作者 {assigned_author_id} ({author_name})")
# ✅ 最终验证:确保所有文章都已分配作者
logger.info(f"[批量发布文章-循环分配] 分配完成统计 - 请求文章数: {len(articles)}, 成功分配: {len(assignment_results)}, 失败: {failed_count}")
if len(assignment_results) == 0:
return jsonify({
'code': 400,
'message': '所有文章都未能成功分配作者,批量发布失败',
'data': {
'total_articles': len(articles),
'assigned_articles': 0,
'failed_articles': failed_count,
'results': publish_results
}
}), 400
# 批量更新文章状态
for assignment in assignment_results:
try:
update_sql = """
UPDATE ai_articles
SET status = 'assign_authors',
author_id = %s,
author_name = %s,
department_name = %s,
department_id = %s,
publish_user_id = %s,
publish_time = NOW(),
updated_at = NOW()
WHERE id = %s
"""
update_params = (
assignment['author_id'],
assignment['author_name'],
assignment['department_name'],
assignment['department_id'],
current_user['user_id'],
assignment['article_id']
)
logger.info(f"[批量发布文章-循环分配] 执行SQL更新: {update_sql} - 参数: {update_params}")
affected_rows = db_manager.execute_update(update_sql, update_params)
logger.info(f"[批量发布文章-循环分配] 更新文章 {assignment['article_id']} 完成,影响行数: {affected_rows}")
# 生成百家号ID模拟
baijiahao_id = f"bjh_{assignment['article_id']}_{int(time.time())}"
published_count += 1
publish_results.append({
'article_id': assignment['article_id'],
'title': assignment['article_title'],
'author_id': assignment['author_id'],
'author_name': assignment['author_name'],
'status': 'success',
'baijiahao_id': baijiahao_id
})
logger.info(f"[批量发布文章-循环分配] 文章发布成功: ID={assignment['article_id']}, 标题={assignment['article_title']}, 作者={assignment['author_name']}, 百家号ID: {baijiahao_id}")
except Exception as update_error:
logger.error(f"[批量发布文章-循环分配] 更新文章 {assignment['article_id']} 失败: {str(update_error)}")
failed_count += 1
publish_results.append({
'article_id': assignment['article_id'],
'title': assignment['article_title'],
'author_id': assignment['author_id'],
'author_name': assignment['author_name'],
'status': 'failed',
'error': str(update_error)
})
except Exception as e:
# 批量发布异常处理
logger.error(f"[批量发布文章-循环分配] 批量发布异常: {str(e)}", exc_info=True)
# 将所有文章标记为失败
for article in articles:
article_id = article['id']
title = article['title']
failed_count += 1
publish_results.append({
'article_id': article_id,
'title': title,
'status': 'error',
'error': str(e)
})
logger.error(f"[批量发布文章-循环分配] 文章发布异常: ID={article_id}, 标题={title}, 错误: {str(e)}")
# 记录操作日志
log_sql = """
INSERT INTO ai_logs (user_id, action, target_type, target_id, description, ip_address, user_agent, status)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
user_agent = request.headers.get('User-Agent', '未知')
log_params = (
current_user['user_id'],
'batch_publish_articles_cycle',
'articles',
None, # target_id 设为 NULL因为是批量操作
f'批量发布文章(循环分配),成功: {published_count},失败: {failed_count}文章ID: {",".join(map(str, sorted_article_ids))}作者ID: {",".join(map(str, sorted_author_ids))}',
client_ip,
user_agent,
'success' if failed_count == 0 else 'warning'
)
logger.info(f"[批量发布文章-循环分配] 执行SQL插入日志: {log_sql} - 参数: {log_params}")
db_manager.execute_insert(log_sql, log_params)
logger.info(f"[批量发布文章-循环分配] 操作日志记录成功")
logger.info(f"批量发布文章(循环分配)完成,成功: {published_count},失败: {failed_count}")
return jsonify({
'code': 200,
'message': f'批量发布(循环分配)完成,成功: {published_count},失败: {failed_count}',
'data': {
'published_count': published_count,
'failed_count': failed_count,
'total_count': len(articles),
'assignment_results': assignment_results,
'results': publish_results
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[批量发布文章-循环分配] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@article_bp.route('/batch-published-review', methods=['POST'])
@require_auth
@require_role('admin','editor','reviewer','publisher','enterprise')
def batch_published_review_articles():
"""批量发布文章"""
try:
data = request.get_json()
if not data:
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
article_ids = data.get('article_ids', [])
author_id = data.get('author_id')
logger.info(f"[push文章] 入参数据: {json.dumps(data, ensure_ascii=False)}")
if not article_ids:
return jsonify({
'code': 400,
'message': '缺少文章ID列表',
'data': None
}), 400
if not author_id:
return jsonify({
'code': 400,
'message': '缺少作者ID',
'data': None
}), 400
db_manager = get_db_manager()
current_user = AuthUtils.get_current_user()
# 获取作者的app_id和app_token信息
author_sql = """
SELECT app_id, app_token, author_name, department_id, department_name
FROM ai_authors
WHERE id = %s AND status = 'active'
"""
author_result = db_manager.execute_query(author_sql, (author_id,))
logger.info(f"[医生信息] 数据库返回数据: {json.dumps(author_result, ensure_ascii=False)}")
if not author_result:
return jsonify({
'code': 400,
'message': '作者信息不存在或已禁用',
'data': None
}), 400
app_id = author_result[0]['app_id']
app_token = author_result[0]['app_token']
author_name = author_result[0]['author_name']
department_id = author_result[0]['department_id']
department_name = author_result[0]['department_name']
# 一次性查询文章信息并筛选有效文章
article_sql = """
SELECT id, title, status
FROM ai_articles
WHERE id IN %s AND status = 'approved'
"""
logger.info(f"[批量发布文章] 执行SQL查询文章: {article_sql} - 参数: {tuple(article_ids)}")
articles = db_manager.execute_query(article_sql, (tuple(article_ids),))
logger.info(f"[批量发布文章] 查询结果: 请求{len(article_ids)}篇,找到{len(articles)}篇有效文章")
if not articles:
return jsonify({
'code': 400,
'message': f'未找到状态为approved的文章: {article_ids}',
'data': {
'requested_articles': article_ids,
'found_articles': 0
}
}), 400
published_count = 0
failed_count = 0
publish_results = []
# 批量发布文章到百家号
try:
logger.info(f"[批量发布文章] 开始批量处理 {len(articles)} 篇文章")
# 模拟百家号发布逻辑这里需要根据实际API进行调整
# 由于没有实际的百家号API调用这里直接标记为成功
publish_success = True
if publish_success:
# 批量更新文章状态 - 使用 IN 子句进行批量更新
article_ids_str = ','.join(map(str, article_ids))
update_sql = f"""
UPDATE ai_articles
SET status = 'assign_authors',
author_id = %s,
author_name = %s,
department_name = %s,
department_id = %s,
publish_user_id = %s,
publish_time = NOW(),
updated_at = NOW()
WHERE id IN ({article_ids_str})
"""
update_params = (author_id, author_name, department_name, department_id, current_user['user_id'])
logger.info(f"[批量发布文章] 执行批量SQL更新: {update_sql} - 参数: {update_params}")
affected_rows = db_manager.execute_update(update_sql, update_params)
logger.info(f"[批量发布文章] 批量更新完成,影响行数: {affected_rows}")
# 为每篇文章生成发布结果
for article in articles:
article_id = article['id']
title = article['title']
baijiahao_id = f"bjh_{article_id}_{int(time.time())}"
published_count += 1
publish_results.append({
'article_id': article_id,
'title': title,
'status': 'success',
'baijiahao_id': baijiahao_id
})
logger.info(f"[批量发布文章] 文章发布成功: ID={article_id}, 标题={title}, 百家号ID: {baijiahao_id}")
else:
# 批量发布失败
error_msg = '百家号批量发布失败'
# 批量更新文章状态为失败
article_ids_str = ','.join(map(str, article_ids))
update_sql = f"""
UPDATE ai_articles
SET status = 'failed',
updated_at = NOW()
WHERE id IN ({article_ids_str})
"""
db_manager.execute_update(update_sql, ())
for article in articles:
article_id = article['id']
title = article['title']
failed_count += 1
publish_results.append({
'article_id': article_id,
'title': title,
'status': 'failed',
'error': error_msg
})
logger.error(f"[批量发布文章] 文章发布失败: ID={article_id}, 标题={title}, 错误: {error_msg}")
except Exception as e:
# 批量发布异常处理
logger.error(f"[批量发布文章] 批量发布异常: {str(e)}", exc_info=True)
# 将所有文章标记为失败
for article in articles:
article_id = article['id']
title = article['title']
failed_count += 1
publish_results.append({
'article_id': article_id,
'title': title,
'status': 'error',
'error': str(e)
})
logger.error(f"[批量发布文章] 文章发布异常: ID={article_id}, 标题={title}, 错误: {str(e)}")
# 记录操作日志
log_sql = """
INSERT INTO ai_logs (user_id, action, target_type, target_id, description, ip_address, user_agent, status)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
user_agent = request.headers.get('User-Agent', '未知')
log_params = (
current_user['user_id'],
'batch_publish_articles',
'articles',
None, # target_id 设为 NULL因为是批量操作
f'批量发布文章,成功: {published_count},失败: {failed_count}文章ID: {",".join(map(str, article_ids))}',
client_ip,
user_agent,
'success' if failed_count == 0 else 'warning'
)
logger.info(f"[批量发布文章] 执行SQL插入日志: {log_sql} - 参数: {log_params}")
db_manager.execute_insert(log_sql, log_params)
logger.info(f"[批量发布文章] 操作日志记录成功")
logger.info(f"批量发布文章完成,成功: {published_count},失败: {failed_count}")
return jsonify({
'code': 200,
'message': f'批量发布完成,成功: {published_count},失败: {failed_count}',
'data': {
'published_count': published_count,
'failed_count': failed_count,
'total_count': len(articles),
'results': publish_results
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[批量发布文章] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500