Files
ai_wht_B/ver_25122017/author_routes.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

694 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Author management endpoints."""
from flask import Blueprint, request, jsonify, g
import logging
from datetime import datetime
from auth_utils import require_auth, require_role, AuthUtils
from database_config import get_db_manager
# Import the shared logging configuration
from log_config import setup_article_server_logger
# Use the shared logger registered under the name 'article_server'
logger = logging.getLogger('article_server')
# Create the blueprint; its routes are registered under the /api/authors prefix
author_bp = Blueprint('author', __name__, url_prefix='/api/authors')
@author_bp.route('', methods=['GET'])
@require_auth
def get_authors():
    """Return a paginated list of departments, each with its authors.

    Query parameters:
        page: 1-based page number (default 1).
        size: number of departments per page (default 10).
        search: substring matched against author_name, hospital and title,
            and compared for equality against id.
        department, status, channel: optional exact-match filters.

    Pagination is applied to distinct ``department_id`` values, not to
    individual authors, so ``data.total`` is the number of matching
    departments.

    Returns:
        A JSON body with ``code`` 200 and ``data.list`` on success, a 400
        response when ``page``/``size`` are not usable, or a 500 response
        when anything else raises.
    """
    try:
        # Reject malformed paging values up front: a non-integer would raise
        # and a non-positive page would produce a negative OFFSET, both of
        # which previously surfaced as a 500.
        try:
            page = int(request.args.get('page', 1))
            size = int(request.args.get('size', 10))
        except (TypeError, ValueError):
            return jsonify({'code': 400, 'message': '分页参数无效'}), 400
        if page < 1 or size < 0:
            return jsonify({'code': 400, 'message': '分页参数无效'}), 400
        offset = (page - 1) * size

        # Search / filter parameters
        search = request.args.get('search', '').strip()
        exact_filters = (
            ('department', request.args.get('department', '').strip()),
            ('status', request.args.get('status', '').strip()),
            ('channel', request.args.get('channel', '').strip()),
        )
        db_manager = get_db_manager()

        # Build the WHERE clause; values only ever travel as bound parameters.
        where_conditions = []
        params = []
        if search:
            where_conditions.append("(author_name LIKE %s OR hospital LIKE %s OR title LIKE %s OR id = %s)")
            search_param = f"%{search}%"
            params.extend([search_param, search_param, search_param, search])
        for column, value in exact_filters:
            if value:
                # `column` comes from the constant tuple above, never from the request.
                where_conditions.append(f"{column} = %s")
                params.append(value)
        where_clause = ""
        if where_conditions:
            where_clause = "WHERE " + " AND ".join(where_conditions)

        # Total number of departments (drives pagination)
        dept_count_sql = f"SELECT COUNT(DISTINCT department_id) as total FROM ai_authors {where_clause}"
        dept_count_result = db_manager.execute_query(dept_count_sql, params)
        total_departments = dept_count_result[0]['total'] if dept_count_result else 0

        # Fetch the departments that belong to the requested page
        dept_sql = (
            "SELECT DISTINCT department_id, department_name\n"
            f"FROM ai_authors {where_clause}\n"
            "ORDER BY department_id ASC\n"
            "LIMIT %s OFFSET %s"
        )
        departments = db_manager.execute_query(dept_sql, params + [size, offset])

        department_list = []
        if departments:
            # Fetch every author of the departments on this page
            dept_ids = [str(dept['department_id']) for dept in departments]
            dept_id_placeholders = ','.join(['%s'] * len(dept_ids))
            author_where_conditions = where_conditions + [f"department_id IN ({dept_id_placeholders})"]
            author_where_clause = "WHERE " + " AND ".join(author_where_conditions)
            author_sql = (
                "SELECT id as author_id, author_name,\n"
                "department_id, department_name, toutiao_cookie, toutiao_images_cookie\n"
                f"FROM ai_authors {author_where_clause}\n"
                "ORDER BY department_id ASC, id ASC"
            )
            authors = db_manager.execute_query(author_sql, params + dept_ids)

            # Group authors by department
            department_dict = {}
            for dept in departments:
                dept_id = dept['department_id']
                department_dict[dept_id] = {
                    'department_id': dept_id,
                    'department_name': dept['department_name'],
                    'department_list': []
                }
            for author in authors:
                bucket = department_dict.get(author['department_id'])
                if bucket is not None:
                    bucket['department_list'].append(author)

            # Emit the groups in the same order the department query returned
            department_list = [department_dict[dept['department_id']] for dept in departments]

        return jsonify({
            'code': 200,
            'data': {
                'list': department_list,
                'page': page,
                'size': size,
                'total': total_departments
            },
            'message': '获取成功',
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"获取作者列表失败: {e}")
        return jsonify({'code': 500, 'message': f'获取失败: {str(e)}'}), 500
@author_bp.route('/list', methods=['GET'])
@require_auth
def get_list_authors():
    """Return a paginated, department-grouped list of the current user's authors.

    Only authors whose id appears in ``ai_user_authors`` for the current
    user's ``user_id`` are considered. Pagination is applied to distinct
    ``department_id`` values, so ``data.total`` counts departments.

    Query parameters:
        page: 1-based page number (default 1).
        size: number of departments per page (default 10).
        search: substring matched against author_name, hospital and title.
        department, status, channel: optional exact-match filters.

    Returns:
        A JSON body with ``code`` 200 on success (an empty list when the user
        has no linked authors), 400 for unusable ``page``/``size``, 401 when
        no user or ``user_id`` is available, or 500 when anything else raises.
    """
    try:
        # Validate paging values so bad input is a 400 instead of a 500.
        try:
            page = int(request.args.get('page', 1))
            size = int(request.args.get('size', 10))
        except (TypeError, ValueError):
            return jsonify({'code': 400, 'message': '分页参数无效'}), 400
        if page < 1 or size < 0:
            return jsonify({'code': 400, 'message': '分页参数无效'}), 400
        offset = (page - 1) * size

        def _page_response(items, total):
            """Build the success payload shared by the empty and normal paths."""
            return jsonify({
                'code': 200,
                'data': {
                    'list': items,
                    'page': page,
                    'size': size,
                    'total': total
                },
                'message': '获取成功',
                'timestamp': int(datetime.now().timestamp() * 1000)
            })

        # Search / filter parameters
        search = request.args.get('search', '').strip()
        exact_filters = (
            ('department', request.args.get('department', '').strip()),
            ('status', request.args.get('status', '').strip()),
            ('channel', request.args.get('channel', '').strip()),
        )
        db_manager = get_db_manager()

        # Resolve the logged-in user. Log tags name this function (they used
        # to say get_authors, which made the two endpoints indistinguishable).
        current_user = AuthUtils.get_current_user()
        logger.info(f"[get_list_authors] 获取到的用户信息: {current_user}")
        if not current_user:
            logger.error("[get_list_authors] 未获取到用户信息")
            return jsonify({'code': 401, 'message': '未登录或用户信息缺失'}), 401
        user_id = current_user.get('user_id')
        logger.info(f"[get_list_authors] 提取的user_id: {user_id}")
        if not user_id:
            logger.error(f"[get_list_authors] user_id缺失current_user内容: {current_user}")
            return jsonify({'code': 401, 'message': '用户ID缺失'}), 401

        # Step 1: ids of the authors linked to the current user
        user_authors_sql = "SELECT author_id FROM ai_user_authors WHERE user_id = %s"
        logger.info(f"[get_list_authors] 执行查询: {user_authors_sql}, 参数: {[user_id]}")
        user_authors = db_manager.execute_query(user_authors_sql, [user_id])
        logger.info(f"[get_list_authors] 查询结果: {user_authors}")
        author_ids = [str(row['author_id']) for row in user_authors]
        logger.info(f"[get_list_authors] 提取的author_ids: {author_ids}")
        if not author_ids:
            # The user is not linked to any author
            logger.info(f"[get_list_authors] 用户{user_id}没有绑定任何作者")
            return _page_response([], 0)

        # Step 2: build the WHERE clause, restricted to the linked author ids.
        # Copy the id list so later appends do not mutate author_ids.
        where_conditions = ["id IN (" + ",".join(["%s"] * len(author_ids)) + ")"]
        params = list(author_ids)
        if search:
            where_conditions.append("(author_name LIKE %s OR hospital LIKE %s OR title LIKE %s)")
            search_param = f"%{search}%"
            params.extend([search_param, search_param, search_param])
        for column, value in exact_filters:
            if value:
                # `column` comes from the constant tuple above, never from the request.
                where_conditions.append(f"{column} = %s")
                params.append(value)
        where_clause = "WHERE " + " AND ".join(where_conditions)

        # Step 3: total number of departments (drives pagination)
        dept_count_sql = f"SELECT COUNT(DISTINCT department_id) as total FROM ai_authors {where_clause}"
        dept_count_result = db_manager.execute_query(dept_count_sql, params)
        total_departments = dept_count_result[0]['total'] if dept_count_result else 0

        # Step 4: departments on the requested page
        dept_sql = (
            "SELECT DISTINCT department_id, department_name\n"
            f"FROM ai_authors {where_clause}\n"
            "ORDER BY department_id ASC\n"
            "LIMIT %s OFFSET %s"
        )
        departments = db_manager.execute_query(dept_sql, params + [size, offset])

        department_list = []
        if departments:
            # Step 5: every matching author inside those departments
            dept_ids = [str(dept['department_id']) for dept in departments]
            dept_id_placeholders = ','.join(['%s'] * len(dept_ids))
            author_where_conditions = where_conditions + [f"department_id IN ({dept_id_placeholders})"]
            author_where_clause = "WHERE " + " AND ".join(author_where_conditions)
            author_sql = (
                "SELECT id as author_id, author_name,\n"
                "department_id, department_name\n"
                f"FROM ai_authors {author_where_clause}\n"
                "ORDER BY department_id ASC, id ASC"
            )
            authors = db_manager.execute_query(author_sql, params + dept_ids)

            # Step 6: group authors by department
            department_dict = {}
            for dept in departments:
                dept_id = dept['department_id']
                department_dict[dept_id] = {
                    'department_id': dept_id,
                    'department_name': dept['department_name'],
                    'department_list': []
                }
            for author in authors:
                bucket = department_dict.get(author['department_id'])
                if bucket is not None:
                    bucket['department_list'].append(author)
            department_list = [department_dict[dept['department_id']] for dept in departments]

        return _page_response(department_list, total_departments)
    except Exception as e:
        import traceback
        logger.error(f"获取作者列表失败: {e}")
        logger.error(f"错误堆栈: {traceback.format_exc()}")
        return jsonify({'code': 500, 'message': f'获取失败: {str(e)}'}), 500
@author_bp.route('/<int:author_id>', methods=['GET'])
@require_auth
def get_author(author_id):
    """Return the stored details of a single author.

    Args:
        author_id: Integer id taken from the URL path.

    Returns:
        A JSON body with ``code`` 200 and the first matching row under
        ``data``, a 404 response when no row has that id, or a 500 response
        when the lookup raises.
    """
    detail_sql = (
        "SELECT id, author_name, department, department_name, title, hospital, specialty, toutiao_images_cookie,\n"
        "introduction, avatar_url, status, toutiao_cookie, created_at, updated_at\n"
        "FROM ai_authors WHERE id = %s"
    )
    try:
        rows = get_db_manager().execute_query(detail_sql, (author_id,))
        if rows:
            payload = {
                'code': 200,
                'data': rows[0],
                'message': '获取成功',
                'timestamp': int(datetime.now().timestamp() * 1000)
            }
            return jsonify(payload)
        # No row with this id
        return jsonify({'code': 404, 'message': '作者不存在'}), 404
    except Exception as e:
        logger.error(f"获取作者详情失败: {e}")
        return jsonify({'code': 500, 'message': f'获取失败: {str(e)}'}), 500
@author_bp.route('', methods=['POST'])
@require_auth
@require_role(['admin', 'editor'])
def create_author():
    """Create a new author from the JSON request body.

    Required body fields: ``author_name``, ``app_id``, ``app_token`` and
    ``department_name``; when ``channel`` is 2, ``toutiao_cookie`` and
    ``toutiao_images_cookie`` are required as well. Optional fields:
    ``department`` (defaults to ``department_name``), ``title``,
    ``hospital``, ``specialty``, ``introduction``, ``avatar_url``,
    ``status`` ('active' or 'inactive', default 'active') and ``channel``
    (default 1).

    ``author_name``, ``app_id`` and ``app_token`` must each be unused in
    ``ai_authors``. The department is looked up by name in
    ``ai_departments`` and inserted there when missing.

    Returns:
        HTTP 201 with the newly stored row under ``data`` on success, a 400
        response for an invalid body or a failed validation/uniqueness
        check, or a 500 response when anything else raises.
    """
    # Fields whose values must never reach the log files.
    secret_fields = ('app_token', 'toutiao_cookie', 'toutiao_images_cookie')

    def _text(payload, key, default=''):
        """Return payload[key] as a stripped string; JSON null becomes ''."""
        value = payload.get(key, default)
        if value is None:
            return ''
        return str(value).strip()

    try:
        # silent=True turns an absent/unparseable JSON body into None so it
        # can be answered with a 400 instead of falling into the 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'code': 400, 'message': '请求数据无效'}), 400
        masked = {key: ('***' if key in secret_fields else value) for key, value in data.items()}
        logger.info(f"[创建作者] 接收到的数据: {masked}")

        # Required fields
        author_name = _text(data, 'author_name')
        app_id = _text(data, 'app_id')
        app_token = _text(data, 'app_token')
        department_name = _text(data, 'department_name')
        toutiao_cookie = _text(data, 'toutiao_cookie')
        toutiao_images_cookie = _text(data, 'toutiao_images_cookie')
        channel = data.get('channel', 1)
        if isinstance(channel, str):
            # A JSON string such as "2" must not slip past the `channel == 2`
            # cookie checks below.
            try:
                channel = int(channel.strip())
            except ValueError:
                return jsonify({'code': 400, 'message': 'channel值无效'}), 400
        logger.info(f"[创建作者] 字段值 - author_name: '{author_name}', app_id: '{app_id}', department_name: '{department_name}'")

        # (is_missing, log message, client message), checked in order
        required_checks = (
            (not author_name, "[创建作者] 作者姓名为空", '作者姓名不能为空'),
            (not app_id, "[创建作者] app_id为空", 'app_id不能为空'),
            (not app_token, "[创建作者] app_token为空", 'app_token不能为空'),
            (not department_name, "[创建作者] department_name为空", '科室不能为空'),
            (channel == 2 and not toutiao_cookie, "[创建作者] toutiao_cookie为空", 'toutiao_cookie不能为空'),
            (channel == 2 and not toutiao_images_cookie, "[创建作者] toutiao_images_cookie为空", 'toutiao_images_cookie不能为空'),
        )
        for is_missing, log_message, client_message in required_checks:
            if is_missing:
                logger.warning(log_message)
                return jsonify({'code': 400, 'message': client_message}), 400

        # Legacy field kept for compatibility; falls back to department_name
        department = _text(data, 'department', department_name)
        # Optional fields
        title = _text(data, 'title')
        hospital = _text(data, 'hospital')
        specialty = _text(data, 'specialty')
        introduction = _text(data, 'introduction')
        avatar_url = _text(data, 'avatar_url')
        status = data.get('status', 'active')
        if status not in ['active', 'inactive']:
            return jsonify({'code': 400, 'message': '状态值无效'}), 400

        db_manager = get_db_manager()

        # (1) author_name must be unique
        check_author_name_sql = "SELECT id FROM ai_authors WHERE author_name = %s"
        if db_manager.execute_query(check_author_name_sql, (author_name,)):
            logger.warning(f"[创建作者] 作者姓名已存在: {author_name}")
            return jsonify({'code': 400, 'message': f'作者姓名 "{author_name}" 已存在'}), 400

        # (2) app_id must be unique
        check_app_id_sql = "SELECT id FROM ai_authors WHERE app_id = %s"
        if db_manager.execute_query(check_app_id_sql, (app_id,)):
            logger.warning(f"[创建作者] app_id已存在: {app_id}")
            return jsonify({'code': 400, 'message': f'app_id "{app_id}" 已存在'}), 400

        # (3) app_token must be unique; the token value itself is not logged
        check_app_token_sql = "SELECT id FROM ai_authors WHERE app_token = %s"
        if db_manager.execute_query(check_app_token_sql, (app_token,)):
            logger.warning("[创建作者] app_token已存在")
            return jsonify({'code': 400, 'message': f'app_token "{app_token}" 已存在'}), 400

        # (4) Resolve the department id, creating the department when missing
        check_dept_sql = "SELECT id FROM ai_departments WHERE department_name = %s"
        existing_dept = db_manager.execute_query(check_dept_sql, (department_name,))
        if existing_dept:
            department_id = existing_dept[0]['id']
            logger.info(f"[创建作者] 科室已存在使用现有ID: {department_id}")
        else:
            insert_dept_sql = "INSERT INTO ai_departments (department_name) VALUES (%s)"
            department_id = db_manager.execute_insert(insert_dept_sql, (department_name,))
            logger.info(f"[创建作者] 创建新科室ID: {department_id}")

        # Insert the author
        insert_sql = (
            "INSERT INTO ai_authors\n"
            "(author_name, app_id, app_token, department_name, department, department_id, title, hospital, specialty, toutiao_cookie, introduction, avatar_url, status, channel, toutiao_images_cookie)\n"
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        )
        author_id = db_manager.execute_insert(
            insert_sql,
            (author_name, app_id, app_token, department_name, department, department_id, title, hospital,
             specialty, toutiao_cookie, introduction, avatar_url, status, channel, toutiao_images_cookie))

        # Read the row back so the response reflects what was stored
        select_sql = (
            "SELECT id, author_name, app_id, app_token, department_name, department, department_id, title, hospital, specialty,\n"
            "toutiao_cookie, introduction, avatar_url, status, created_at, updated_at, channel\n"
            "FROM ai_authors WHERE id = %s"
        )
        new_author = db_manager.execute_query(select_sql, (author_id,))
        logger.info(f"[创建作者] 成功创建作者ID: {author_id}, 科室ID: {department_id}")
        return jsonify({
            'code': 200,
            'data': new_author[0],
            'message': '创建成功',
            'timestamp': int(datetime.now().timestamp() * 1000)
        }), 201
    except Exception as e:
        logger.error(f"创建作者失败: {e}")
        return jsonify({'code': 500, 'message': f'创建失败: {str(e)}'}), 500
@author_bp.route('/<int:author_id>', methods=['PUT'])
@require_auth
@require_role(['admin', 'editor'])
def update_author(author_id):
    """Update an existing author from the JSON request body.

    Required body fields: ``author_name`` and ``department_name``. Optional
    fields: ``title``, ``hospital``, ``specialty``, ``introduction``,
    ``avatar_url``, ``status`` ('active' or 'inactive', default 'active'),
    ``toutiao_cookie`` and ``toutiao_images_cookie``. Every listed column is
    overwritten, so omitted optional fields are stored as their defaults.

    The department is looked up by name in ``ai_departments`` and inserted
    there when missing.

    NOTE(review): the legacy ``department`` column, which create_author
    writes and the list endpoints filter on, is not updated here — confirm
    whether that is intended.

    Args:
        author_id: Integer id taken from the URL path.

    Returns:
        A JSON body with ``code`` 200 and the updated row under ``data``, a
        400 response for an invalid body, failed validation or a same-name
        author in the same department, a 404 response when the id does not
        exist, or a 500 response when anything else raises.
    """
    def _text(payload, key):
        """Return payload[key] as a stripped string; JSON null becomes ''."""
        value = payload.get(key, '')
        if value is None:
            return ''
        return str(value).strip()

    try:
        # silent=True turns an absent/unparseable JSON body into None so it
        # can be answered with a 400 instead of falling into the 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'code': 400, 'message': '请求数据无效'}), 400

        # Required fields
        author_name = _text(data, 'author_name')
        department_name = _text(data, 'department_name')
        if not author_name:
            return jsonify({'code': 400, 'message': '作者姓名不能为空'}), 400
        if not department_name:
            return jsonify({'code': 400, 'message': '科室不能为空'}), 400

        # Optional fields
        title = _text(data, 'title')
        hospital = _text(data, 'hospital')
        specialty = _text(data, 'specialty')
        introduction = _text(data, 'introduction')
        avatar_url = _text(data, 'avatar_url')
        status = data.get('status', 'active')
        # Cookies are stored exactly as received (no stripping), as before.
        toutiao_cookie = data.get('toutiao_cookie', '')
        toutiao_images_cookie = data.get('toutiao_images_cookie', '')
        if status not in ['active', 'inactive']:
            return jsonify({'code': 400, 'message': '状态值无效'}), 400

        db_manager = get_db_manager()

        # The author must exist
        check_sql = "SELECT id FROM ai_authors WHERE id = %s"
        if not db_manager.execute_query(check_sql, (author_id,)):
            return jsonify({'code': 404, 'message': '作者不存在'}), 404

        # Reject another author with the same name in the same department
        duplicate_sql = "SELECT id FROM ai_authors WHERE author_name = %s AND department_name = %s AND id != %s"
        if db_manager.execute_query(duplicate_sql, (author_name, department_name, author_id)):
            return jsonify({'code': 400, 'message': '该科室已存在同名作者'}), 400

        # Resolve the department id, creating the department when missing
        check_dept_sql = "SELECT id FROM ai_departments WHERE department_name = %s"
        existing_dept = db_manager.execute_query(check_dept_sql, (department_name,))
        if existing_dept:
            department_id = existing_dept[0]['id']
            logger.info(f"[更新作者] 科室已存在使用现有ID: {department_id}")
        else:
            insert_dept_sql = "INSERT INTO ai_departments (department_name) VALUES (%s)"
            department_id = db_manager.execute_insert(insert_dept_sql, (department_name,))
            logger.info(f"[更新作者] 创建新科室ID: {department_id}")

        # Apply the update
        update_sql = (
            "UPDATE ai_authors SET\n"
            "author_name = %s, department_name = %s, department_id = %s, title = %s,\n"
            "hospital = %s, specialty = %s, introduction = %s, avatar_url = %s,\n"
            "status = %s, toutiao_cookie = %s, toutiao_images_cookie = %s,\n"
            "updated_at = CURRENT_TIMESTAMP\n"
            "WHERE id = %s"
        )
        db_manager.execute_update(
            update_sql,
            (author_name, department_name, department_id, title,
             hospital, specialty, introduction, avatar_url,
             status, toutiao_cookie, toutiao_images_cookie, author_id))

        # Read the row back so the response reflects what was stored
        select_sql = (
            "SELECT id, author_name, department_name, title, hospital, specialty,\n"
            "introduction, toutiao_cookie, toutiao_images_cookie, avatar_url, status, created_at, updated_at\n"
            "FROM ai_authors WHERE id = %s"
        )
        updated_author = db_manager.execute_query(select_sql, (author_id,))
        return jsonify({
            'code': 200,
            'data': updated_author[0],
            'message': '更新成功',
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"更新作者失败: {e}")
        return jsonify({'code': 500, 'message': f'更新失败: {str(e)}'}), 500
@author_bp.route('/<int:author_id>', methods=['DELETE'])
@require_auth
@require_role(['admin'])
def delete_author(author_id):
    """Delete one author unless articles still carry the author's name.

    Args:
        author_id: Integer id taken from the URL path.

    Returns:
        A JSON body with ``code`` 200 after the row is deleted, a 404
        response when the id does not exist, a 400 response when
        ``ai_articles`` has rows with the same ``author_name``, or a 500
        response when anything raises.
    """
    try:
        db = get_db_manager()

        # Look the author up first; the name is needed for the reference check
        found = db.execute_query(
            "SELECT id, author_name FROM ai_authors WHERE id = %s", (author_id,))
        if not found:
            return jsonify({'code': 404, 'message': '作者不存在'}), 404
        name = found[0]['author_name']

        # Articles reference authors by name, so count the rows with this name
        usage = db.execute_query(
            "SELECT COUNT(*) as count FROM ai_articles WHERE author_name = %s", (name,))
        is_referenced = usage[0]['count'] > 0
        if is_referenced:
            return jsonify({'code': 400, 'message': '作者被文章引用,无法删除'}), 400

        # Nothing references the author: remove the row
        db.execute_update("DELETE FROM ai_authors WHERE id = %s", (author_id,))
        result = {
            'code': 200,
            'message': '删除成功',
            'timestamp': int(datetime.now().timestamp() * 1000)
        }
        return jsonify(result)
    except Exception as e:
        logger.error(f"删除作者失败: {e}")
        return jsonify({'code': 500, 'message': f'删除失败: {str(e)}'}), 500
@author_bp.route('/departments', methods=['GET'])
@require_auth
def get_author_departments():
    """Return the distinct, non-empty ``department`` values of all authors.

    Returns:
        A JSON body with ``code`` 200 and the department strings (ordered by
        the query) under ``data``, or a 500 response when the query raises.
    """
    try:
        rows = get_db_manager().execute_query(
            "SELECT DISTINCT department FROM ai_authors WHERE department IS NOT NULL AND department != '' ORDER BY department")
        # Flatten the row dicts into a plain list of names
        names = []
        for row in rows:
            names.append(row['department'])
        body = {
            'code': 200,
            'data': names,
            'message': '获取成功',
            'timestamp': int(datetime.now().timestamp() * 1000)
        }
        return jsonify(body)
    except Exception as e:
        logger.error(f"获取科室列表失败: {e}")
        return jsonify({'code': 500, 'message': f'获取失败: {str(e)}'}), 500
@author_bp.route('/search', methods=['GET'])
@require_auth
def search_authors():
    """Search active authors by keyword (e.g. for drop-down selectors).

    Query parameters:
        keyword: substring matched against author_name and department; an
            empty keyword yields an empty result without touching the
            database.
        limit: maximum number of rows to return (default 20).

    Returns:
        A JSON body with ``code`` 200 and the matching rows under ``data``,
        a 400 response when ``limit`` is not a non-negative integer, or a
        500 response when anything else raises.
    """
    try:
        keyword = request.args.get('keyword', '').strip()
        # A non-integer limit would raise and a negative one would be passed
        # to LIMIT; answer both with a 400 rather than a 500.
        try:
            limit = int(request.args.get('limit', 20))
        except (TypeError, ValueError):
            return jsonify({'code': 400, 'message': 'limit参数无效'}), 400
        if limit < 0:
            return jsonify({'code': 400, 'message': 'limit参数无效'}), 400

        if not keyword:
            return jsonify({
                'code': 200,
                'data': [],
                'message': '获取成功',
                'timestamp': int(datetime.now().timestamp() * 1000)
            })

        db_manager = get_db_manager()
        sql = (
            "SELECT id, author_name, department, title, hospital\n"
            "FROM ai_authors\n"
            "WHERE status = 'active' AND (author_name LIKE %s OR department LIKE %s)\n"
            "ORDER BY author_name LIMIT %s"
        )
        search_param = f"%{keyword}%"
        authors = db_manager.execute_query(sql, (search_param, search_param, limit))
        return jsonify({
            'code': 200,
            'data': authors,
            'message': '获取成功',
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"搜索作者失败: {e}")
        return jsonify({'code': 500, 'message': f'搜索失败: {str(e)}'}), 500
@author_bp.route('/detail_list', methods=['GET'])
@require_auth
def get_detail_list_authors():
    """Return a paginated, flat list of author details.

    Rows are read directly from ``ai_authors``, newest id first; the
    selected columns do not include ``app_token``. ``created_at`` and
    ``updated_at`` are converted to millisecond timestamps.

    Query parameters:
        page: 1-based page number (default 1).
        size: number of authors per page (default 10).
        search: substring matched against author_name, hospital, title and
            department_name.
        department, status, channel: optional exact-match filters.

    Returns:
        A JSON body with ``code`` 200 and ``data.list``/``data.total`` on
        success, a 400 response for unusable ``page``/``size``, a 401
        response when no current user is available, or a 500 response when
        anything else raises.
    """
    try:
        # Validate paging values so bad input is a 400 instead of a 500.
        try:
            page = int(request.args.get('page', 1))
            size = int(request.args.get('size', 10))
        except (TypeError, ValueError):
            return jsonify({'code': 400, 'message': '分页参数无效'}), 400
        if page < 1 or size < 0:
            return jsonify({'code': 400, 'message': '分页参数无效'}), 400
        offset = (page - 1) * size

        # Search / filter parameters; channel is stripped like the other
        # filters so a whitespace-only value is ignored, matching the
        # sibling list endpoints.
        search = request.args.get('search', '').strip()
        exact_filters = (
            ('department', request.args.get('department', '').strip()),
            ('status', request.args.get('status', '').strip()),
            ('channel', request.args.get('channel', '').strip()),
        )
        db_manager = get_db_manager()

        # The current user is fetched only to confirm that one is present
        current_user = AuthUtils.get_current_user()
        logger.info(f"[get_detail_list_authors] 获取到的用户信息: {current_user}")
        if not current_user:
            logger.error("[get_detail_list_authors] 未获取到用户信息")
            return jsonify({'code': 401, 'message': '未登录或用户信息缺失'}), 401

        # Build the WHERE clause; values only ever travel as bound parameters.
        where_conditions = []
        params = []
        if search:
            where_conditions.append("(author_name LIKE %s OR hospital LIKE %s OR title LIKE %s OR department_name LIKE %s)")
            search_param = f"%{search}%"
            params.extend([search_param, search_param, search_param, search_param])
        for column, value in exact_filters:
            if value:
                # `column` comes from the constant tuple above, never from the request.
                where_conditions.append(f"{column} = %s")
                params.append(value)
        where_clause = ""
        if where_conditions:
            where_clause = "WHERE " + " AND ".join(where_conditions)

        # Total number of authors (drives pagination)
        count_sql = f"SELECT COUNT(*) as total FROM ai_authors {where_clause}"
        logger.info(f"[get_detail_list_authors] 执行计数查询: {count_sql}, 参数: {params}")
        count_result = db_manager.execute_query(count_sql, params)
        total_authors = count_result[0]['total'] if count_result else 0
        logger.info(f"[get_detail_list_authors] 查询到的总数: {total_authors}")

        # Author details for the requested page (app_token is not selected)
        author_sql = (
            "SELECT id as author_id, author_name, app_id,\n"
            "department_id, department_name, department, toutiao_cookie,\n"
            "title, hospital, specialty, introduction,\n"
            "avatar_url, status, created_at, updated_at\n"
            f"FROM ai_authors {where_clause}\n"
            "ORDER BY id DESC\n"
            "LIMIT %s OFFSET %s"
        )
        author_params = params + [size, offset]
        logger.info(f"[get_detail_list_authors] 执行作者查询: {author_sql}, 参数: {author_params}")
        authors = db_manager.execute_query(author_sql, author_params)
        logger.info(f"[get_detail_list_authors] 查询到的作者数量: {len(authors)}")

        # Convert the time columns to millisecond timestamps
        for author in authors:
            for column in ('created_at', 'updated_at'):
                if author.get(column):
                    author[column] = int(author[column].timestamp() * 1000)

        return jsonify({
            'code': 200,
            'data': {
                'list': authors,
                'page': page,
                'size': size,
                'total': total_authors
            },
            'message': '获取成功',
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        import traceback
        logger.error(f"获取作者详细列表失败: {e}")
        logger.error(f"错误堆栈: {traceback.format_exc()}")
        return jsonify({'code': 500, 'message': f'获取失败: {str(e)}'}), 500