Files
ai_wht_B/flask_wht_server_search.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

1128 lines
49 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
图文动态批量生产系统 v2.0
Flask后端API服务器 - 总入口
"""
from flask import Flask, request, jsonify, redirect
from flask_cors import CORS
import logging
import time
import os
from datetime import datetime, timedelta
from logging.handlers import TimedRotatingFileHandler
import json # Added for json.dumps
import random
# 导入统一日志配置
from log_config import setup_article_server_search_logger
# 导入数据库配置
from database_config import get_db_manager, DB_CONFIG
# 导入各个接口模块
from auth_routes import auth_bp
from user_routes import user_bp
from article_routes import article_bp
from statistics_routes import statistics_bp
from image_routes import image_bp
# 导入认证工具
from auth_utils import AuthUtils, require_auth
# 导入whoosh搜索模块
from whoosh_search_tags import tags_image_search, init_search_engine, enable_hot_reload
# Build the Flask application; files under ./public are exposed from the URL root.
# NOTE(review): static_url_path='' makes Flask register its built-in static rule
# '/<path:filename>' at construction time, i.e. before serve_static() registers the
# same pattern; confirm which endpoint actually answers static-file requests
# (the custom cache headers are set in serve_static()).
app = Flask(
    __name__,
    static_url_path='',
    static_folder='public',
)
# Set up this service's logger.
logger = setup_article_server_search_logger()
# Service warm-up, called at startup (script entry point or module import).
def initialize_service():
    """Pre-load the Whoosh tag-search index and log how long it took.

    Hot reload is switched off first via ``enable_hot_reload(False)`` so the first
    API call does not trigger another expensive reload. Failures are logged
    (disabling hot reload only as a warning) instead of being raised, and the
    total elapsed time in milliseconds is always logged.
    """
    started = time.time()
    try:
        logger.info("正在初始化标签搜索引擎...")
        try:
            enable_hot_reload(False)
        except Exception as hot_reload_error:
            logger.warning(f"禁用热加载失败: {str(hot_reload_error)}")
        else:
            logger.info("已暂时禁用标签搜索热加载功能")
        init_search_engine()
        logger.info("标签搜索引擎初始化完成")
    except Exception as init_error:
        logger.error(f"标签搜索引擎初始化失败: {str(init_error)}", exc_info=True)
    finally:
        cost_ms = (time.time() - started) * 1000
        logger.info(f"标签搜索引擎初始化总耗时: {cost_ms:.2f} ms")
# Cached answer of the ai_article_tags.coze_tag column check (None = not checked
# yet), so the INFORMATION_SCHEMA query is not repeated on every call.
_has_article_tags_coze_tag = None


def has_article_tags_coze_tag_column() -> bool:
    """Return whether table ``ai_article_tags`` has a ``coze_tag`` column.

    Only a successful INFORMATION_SCHEMA lookup is cached (in
    ``_has_article_tags_coze_tag``). When the lookup fails, a warning is logged
    and False is returned without caching. Otherwise a transient database error
    would disable the column for the lifetime of the process.
    """
    global _has_article_tags_coze_tag
    if _has_article_tags_coze_tag is not None:
        return _has_article_tags_coze_tag
    try:
        dbm = get_db_manager()
        sql = (
            "SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA=%s AND TABLE_NAME='ai_article_tags' AND COLUMN_NAME='coze_tag'"
        )
        db_name = DB_CONFIG.get('database')
        res = dbm.execute_query(sql, (db_name,))
        exists = bool(res and res[0].get('cnt', 0) > 0)
    except Exception as e:
        logger.warning(f"[表结构检测] 检测 ai_article_tags.coze_tag 失败: {e}")
        # Deliberately not cached: the next call retries the lookup.
        return False
    _has_article_tags_coze_tag = exists
    logger.info(f"[表结构检测] ai_article_tags.coze_tag 存在: {_has_article_tags_coze_tag}")
    return exists
def get_article_tags_column_info():
    """Describe the key columns of table ``ai_article_tags``.

    Returns a dict keyed by 'coze_tag', 'tag_id' and 'tag_name'; each value is
    ``{'exists': bool, 'nullable': bool}`` taken from INFORMATION_SCHEMA.COLUMNS.
    Columns that are not found are reported as ``{'exists': False, 'nullable': True}``.
    If the lookup fails, a warning is logged and that default is returned for
    every column.
    """
    wanted = ('coze_tag', 'tag_id', 'tag_name')
    try:
        dbm = get_db_manager()
        schema = DB_CONFIG.get('database')
        in_list = ','.join('%s' for _ in wanted)
        sql = (
            "SELECT COLUMN_NAME, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA=%s AND TABLE_NAME='ai_article_tags' AND COLUMN_NAME IN (" + in_list + ")"
        )
        found = dbm.execute_query(sql, (schema,) + wanted)
        result = {col: {'exists': False, 'nullable': True} for col in wanted}
        for row in found:
            result[row.get('COLUMN_NAME')] = {
                'exists': True,
                'nullable': row.get('IS_NULLABLE', 'YES') == 'YES',
            }
        return result
    except Exception as e:
        logger.warning(f"[表结构检测] 获取 ai_article_tags 列信息失败: {e}")
        return {col: {'exists': False, 'nullable': True} for col in wanted}
def has_fk_article_tags_tag_id() -> bool:
    """Return True when ``ai_article_tags.tag_id`` has a foreign-key constraint.

    Reads INFORMATION_SCHEMA.KEY_COLUMN_USAGE for a row with a non-NULL
    REFERENCED_TABLE_NAME. Any lookup error is logged as a warning and reported
    as False.
    """
    query = (
        "SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
        "WHERE TABLE_SCHEMA=%s AND TABLE_NAME='ai_article_tags' AND COLUMN_NAME='tag_id' "
        "AND REFERENCED_TABLE_NAME IS NOT NULL"
    )
    try:
        rows = get_db_manager().execute_query(query, (DB_CONFIG.get('database'),))
        if not rows:
            return False
        return bool(rows[0].get('cnt', 0) > 0)
    except Exception as e:
        logger.warning(f"[表结构检测] 检测 ai_article_tags.tag_id 外键失败: {e}")
        return False
def get_article_images_column_info():
    """Describe the key columns of table ``ai_article_images``.

    Covers image_id, image_url, sort_order and the later-added keywords_id,
    keywords_name, department_id and department_name columns. Returns
    ``{column: {'exists': bool, 'nullable': bool}}``. Missing columns, or every
    column when the lookup fails (a warning is logged), are reported as
    ``{'exists': False, 'nullable': True}``.
    """
    wanted = (
        'image_id', 'image_url', 'sort_order',
        'keywords_id', 'keywords_name', 'department_id', 'department_name',
    )

    def _all_missing():
        return {col: {'exists': False, 'nullable': True} for col in wanted}

    try:
        dbm = get_db_manager()
        schema = DB_CONFIG.get('database')
        in_list = ','.join(['%s'] * len(wanted))
        sql = (
            "SELECT COLUMN_NAME, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA=%s AND TABLE_NAME='ai_article_images' AND COLUMN_NAME IN (" + in_list + ")"
        )
        rows = dbm.execute_query(sql, (schema, *wanted))
        info = _all_missing()
        for row in rows:
            is_nullable = row.get('IS_NULLABLE', 'YES') == 'YES'
            info[row.get('COLUMN_NAME')] = {'exists': True, 'nullable': is_nullable}
        return info
    except Exception as e:
        logger.warning(f"[表结构检测] 获取 ai_article_images 列信息失败: {e}")
        return _all_missing()
def has_fk_article_images_image_id() -> bool:
    """Return True when ``ai_article_images.image_id`` has a foreign-key constraint.

    Reads INFORMATION_SCHEMA.KEY_COLUMN_USAGE for a row with a non-NULL
    REFERENCED_TABLE_NAME. Any lookup error is logged as a warning and reported
    as False.
    """
    query = (
        "SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
        "WHERE TABLE_SCHEMA=%s AND TABLE_NAME='ai_article_images' AND COLUMN_NAME='image_id' "
        "AND REFERENCED_TABLE_NAME IS NOT NULL"
    )
    try:
        rows = get_db_manager().execute_query(query, (DB_CONFIG.get('database'),))
        if not rows:
            return False
        return bool(rows[0].get('cnt', 0) > 0)
    except Exception as e:
        logger.warning(f"[表结构检测] 检测 ai_article_images.image_id 外键失败: {e}")
        return False
# Cross-origin (CORS) configuration applied to the whole app.
# NOTE(review): "*" in `origins` makes the explicit localhost entries redundant.
# Together with supports_credentials=True it may let any site send credentialed
# cross-origin requests. Confirm this is intended before exposing the server publicly.
CORS(
    app,
    origins=[
        "http://127.0.0.1:8321",
        "http://localhost:8321",
        "http://127.0.0.1:3000",
        "http://localhost:3000",
        "*",
    ],
    methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["Content-Type", "Authorization", "Accept", "X-Requested-With"],
    supports_credentials=True,
    max_age=3600,
)
# Register the feature blueprints, in this order.
for _blueprint in (auth_bp, user_bp, article_bp, statistics_bp, image_bp):
    app.register_blueprint(_blueprint)
del _blueprint
# Operation-log query endpoint.
@app.route('/api/logs', methods=['GET'])
def get_logs():
    """Return one page of operation logs from ai_logs, newest first.

    Query parameters:
        page: 1-based page number (default 1; values below 1 are treated as 1).
        size: rows per page (default 10; clamped to 1..1000, the same cap as
              /api/logs/file uses for lines).
    Non-integer values fall back to the defaults instead of causing a 500.
    Each row is LEFT JOINed with ai_users for username / real_name.

    NOTE(review): this route has no @require_auth, so anyone who can reach the
    server can read user ids, IP addresses and user agents. Confirm intent.
    """
    max_page_size = 1000
    try:
        # type=int: Werkzeug returns the default for non-integer input instead of
        # raising ValueError.
        page = max(request.args.get('page', 1, type=int), 1)
        # A page below 1 or a negative size would produce an invalid LIMIT/OFFSET.
        # An unbounded size would let a single request dump the whole table.
        size = min(max(request.args.get('size', 10, type=int), 1), max_page_size)
        offset = (page - 1) * size
        db_manager = get_db_manager()
        count_result = db_manager.execute_query("SELECT COUNT(*) as total FROM ai_logs")
        total = count_result[0]['total'] if count_result else 0
        sql = """
            SELECT l.id, l.user_id, l.action, l.target_type, l.target_id,
                   l.description, l.ip_address, l.user_agent, l.status, l.created_at,
                   u.username, u.real_name
            FROM ai_logs l
            LEFT JOIN ai_users u ON l.user_id = u.id
            ORDER BY l.created_at DESC
            LIMIT %s OFFSET %s
        """
        logs = db_manager.execute_query(sql, (size, offset))
        return jsonify({
            'code': 200,
            'message': '获取成功',
            'data': {
                'list': logs,
                'total': total,
                'page': page,
                'size': size,
            },
            'timestamp': int(datetime.now().timestamp() * 1000),
        })
    except Exception as e:
        logger.error(f"[获取日志] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None,
        }), 500
# Base file name (under logs/) for each accepted value of the `type` parameter.
_LOG_FILE_BASENAMES = {
    'article': 'article_server.log',
    'error': 'article_server_error.log',
    'whoosh': 'whoosh_search_tags.log',
}
# Upper bound for the `lines` query parameter.
_MAX_LOG_TAIL_LINES = 1000


def _tail_log_lines(path, max_lines, encoding):
    """Return ``(total_line_count, last_lines)`` for the text file at ``path``.

    The file is streamed through a deque bounded to ``max_lines`` entries, so
    memory use does not grow with the file size. Raises UnicodeDecodeError when
    the file cannot be decoded with ``encoding``.
    """
    from collections import deque

    total = 0
    tail = deque(maxlen=max_lines)
    with open(path, 'r', encoding=encoding) as f:
        for line in f:
            total += 1
            tail.append(line)
    return total, list(tail)


@app.route('/api/logs/file', methods=['GET'])
def get_file_logs():
    """Return the last N lines of a server log file.

    Query parameters:
        type:  'article' (default), 'error' or 'whoosh'; anything else -> 400.
        lines: number of trailing lines (default 100, clamped to 1..1000;
               non-integer values fall back to 100).
        date:  optional rotated-file date, YYYY-MM-DD (e.g. 2025-08-14); any
               other format -> 400.
    The file is decoded as UTF-8, with a GBK fallback. A missing file -> 404.

    NOTE(review): no @require_auth. The server logs include request payloads, so
    anyone who can reach this route can read them. Confirm intent.
    """
    try:
        log_type = request.args.get('type', 'article')
        lines = request.args.get('lines', 100, type=int)
        date = request.args.get('date', '')
        # Keep the tail size positive and bounded (protects memory and response size).
        lines = min(max(lines, 1), _MAX_LOG_TAIL_LINES)

        base_name = _LOG_FILE_BASENAMES.get(log_type)
        if base_name is None:
            return jsonify({
                'code': 400,
                'message': '不支持的日志类型',
                'data': None,
            }), 400

        if date:
            # `date` becomes part of a file path. Accept only YYYY-MM-DD and rebuild
            # the string from the parsed value, so nothing like "../" reaches open().
            try:
                date = datetime.strptime(date, '%Y-%m-%d').strftime('%Y-%m-%d')
            except ValueError:
                return jsonify({
                    'code': 400,
                    'message': '日期格式错误,应为YYYY-MM-DD',
                    'data': None,
                }), 400

        log_dir = 'logs'
        log_file = f'{log_dir}/{base_name}.{date}' if date else f'{log_dir}/{base_name}'
        if not os.path.exists(log_file):
            return jsonify({
                'code': 404,
                'message': f'日志文件不存在: {log_file}',
                'data': None,
            }), 404

        file_stat = os.stat(log_file)
        file_mtime = datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
        try:
            total_lines, recent_lines = _tail_log_lines(log_file, lines, 'utf-8')
            message = '获取成功'
        except UnicodeDecodeError:
            # Fall back to GBK for logs written with a Chinese Windows code page.
            try:
                total_lines, recent_lines = _tail_log_lines(log_file, lines, 'gbk')
            except Exception as decode_error:
                logger.error(f"[文件日志] 编码解析失败: {str(decode_error)}")
                return jsonify({
                    'code': 500,
                    'message': f'文件编码解析失败: {str(decode_error)}',
                    'data': None,
                }), 500
            message = '获取成功使用GBK编码'

        return jsonify({
            'code': 200,
            'message': message,
            'data': {
                'log_type': log_type,
                'file_path': log_file,
                'file_size': file_stat.st_size,
                'file_mtime': file_mtime,
                'total_lines': total_lines,
                'returned_lines': len(recent_lines),
                'lines': [line.rstrip('\n') for line in recent_lines],
            },
            'timestamp': int(datetime.now().timestamp() * 1000),
        })
    except Exception as e:
        logger.error(f"[文件日志] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None,
        }), 500
@app.route('/api/logs/files', methods=['GET'])
def get_log_files():
    """List the log files in ./logs, most recently modified first.

    A file is listed when its name ends with '.log' or contains '.log.' (rotated
    files). Each entry has: filename, size (bytes), size_mb, modified
    ('%Y-%m-%d %H:%M:%S') and a type derived from the name ('error', 'whoosh',
    otherwise 'article'). Responds 404 when the directory does not exist.
    """
    try:
        log_dir = 'logs'
        if not os.path.exists(log_dir):
            return jsonify({
                'code': 404,
                'message': '日志目录不存在',
                'data': None
            }), 404
        entries = []
        for name in os.listdir(log_dir):
            if not (name.endswith('.log') or '.log.' in name):
                continue
            full_path = os.path.join(log_dir, name)
            if not os.path.isfile(full_path):
                continue
            st = os.stat(full_path)
            if 'error' in name:
                kind = 'error'
            elif 'whoosh' in name:
                kind = 'whoosh'
            else:
                kind = 'article'
            entries.append({
                'filename': name,
                'size': st.st_size,
                'size_mb': round(st.st_size / 1024 / 1024, 2),
                'modified': datetime.fromtimestamp(st.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                'type': kind
            })
        # The timestamp format is fixed-width, so string order equals time order.
        entries.sort(key=lambda entry: entry['modified'], reverse=True)
        return jsonify({
            'code': 200,
            'message': '获取成功',
            'data': {
                'files': entries,
                'total': len(entries)
            },
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[日志文件列表] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
# Before-request hook: log every incoming request.
@app.before_request
def log_request_info():
    """Log method, path, client IP and User-Agent (first 100 chars) of each request.

    The client IP is the X-Forwarded-For value when present (client-supplied
    unless a proxy overwrites it), else REMOTE_ADDR. GET query parameters are
    also logged at DEBUG level.
    """
    environ = request.environ
    client_ip = environ.get('HTTP_X_FORWARDED_FOR', environ.get('REMOTE_ADDR', '未知'))
    agent = request.headers.get('User-Agent', '未知')
    logger.info(f"[API访问] {request.method} {request.path} - IP: {client_ip} - User-Agent: {agent[:100]}")
    if request.args:
        logger.debug(f"[请求参数] GET参数: {dict(request.args)}")
# CORS preflight (OPTIONS) handler for /api/ paths.
@app.route('/api/<path:path>', methods=['OPTIONS'])
def handle_options(path):
    """Answer a CORS preflight request for any ``/api/...`` path.

    Browsers reject a credentialed CORS response whose
    ``Access-Control-Allow-Origin`` is the wildcard ``*``. Because this response
    also sends ``Access-Control-Allow-Credentials: true``, the request's
    ``Origin`` is echoed back instead. ``Vary: Origin`` is added so shared caches
    do not reuse one origin's answer for another. Without an Origin header the
    wildcard is still sent.

    NOTE(review): echoing any Origin admits every site. This mirrors the
    app-wide CORS(...) setup ("*" with supports_credentials=True); switch to an
    allow-list if that is not intended. flask-cors may also add CORS headers
    to this response, so check that no header appears twice.
    """
    response = jsonify({'message': 'OK'})
    origin = request.headers.get('Origin')
    if origin:
        response.headers['Access-Control-Allow-Origin'] = origin
        response.headers.add('Vary', 'Origin')
    else:
        response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type,Authorization,Accept,X-Requested-With'
    response.headers['Access-Control-Allow-Methods'] = 'GET,POST,PUT,DELETE,OPTIONS'
    response.headers['Access-Control-Allow-Credentials'] = 'true'
    response.headers['Access-Control-Max-Age'] = '3600'
    return response
# After-request hook: log the response status of every request.
@app.after_request
def log_response_info(response):
    """Log method, path, client IP and status code; return the response unchanged."""
    env = request.environ
    ip = env.get('HTTP_X_FORWARDED_FOR', env.get('REMOTE_ADDR', '未知'))
    logger.info(f"[API响应] {request.method} {request.path} - IP: {ip} - 状态码: {response.status_code}")
    return response
@app.route('/health', methods=['GET'])
def health_check():
    """Health probe: check database connectivity with ``SELECT 1``.

    Returns 200 with status 'healthy' and version '2.0' when the query succeeds.
    Otherwise the error is logged and 500 with status 'unhealthy' is returned.
    NOTE(review): the 500 body includes the raw error text, which is visible
    to unauthenticated callers.
    """
    try:
        manager = get_db_manager()
        manager.execute_query("SELECT 1")
        healthy = {
            'status': 'healthy',
            'timestamp': int(datetime.now().timestamp() * 1000),
            'version': '2.0'
        }
        return jsonify({'code': 200, 'message': '服务正常', 'data': healthy})
    except Exception as e:
        logger.error(f"健康检查失败: {e}")
        unhealthy = {
            'status': 'unhealthy',
            'error': str(e),
            'timestamp': int(datetime.now().timestamp() * 1000)
        }
        return jsonify({'code': 500, 'message': '服务异常', 'data': unhealthy}), 500
@app.route('/', methods=['GET'])
def index():
    """Redirect the site root to the dashboard page."""
    target = '/dashboard.html'
    return redirect(target)
@app.route('/<path:filename>')
def serve_static(filename):
    """Serve a file from ./public with cache headers chosen by its extension.

    - images (png/jpg/jpeg/gif/webp/svg/ico/bmp): 1 year, ``immutable``
    - css / js / map: 1 day
    - fonts (woff/woff2/ttf/eot/otf): 30 days
    Other files get no extra cache headers. A missing file, or a path that
    ``send_from_directory`` refuses (e.g. one escaping ./public), yields a JSON 404.
    Other errors are logged and yield a JSON 500.

    NOTE(review): the app is created with static_url_path='', which may register
    Flask's own '/<path:filename>' static rule first. Confirm that requests
    actually reach this view.
    """
    from datetime import timezone
    from email.utils import format_datetime
    from flask import send_from_directory, make_response

    static_root = 'public'
    # (extensions, Cache-Control value, lifetime used for Expires, label for logs)
    cache_rules = (
        ({'png', 'jpg', 'jpeg', 'gif', 'webp', 'svg', 'ico', 'bmp'},
         'public, max-age=31536000, immutable', timedelta(days=365), '图片'),
        ({'css', 'js', 'map'},
         'public, max-age=86400', timedelta(days=1), 'CSS/JS'),
        ({'woff', 'woff2', 'ttf', 'eot', 'otf'},
         'public, max-age=2592000', timedelta(days=30), '字体'),
    )
    try:
        file_path = os.path.join(static_root, filename)
        if not (os.path.exists(file_path) and os.path.isfile(file_path)):
            return jsonify({
                'code': 404,
                'message': '文件不存在',
                'data': None
            }), 404
        # send_from_directory already sets an ETag and evaluates If-None-Match
        # against it. It is not overwritten here: a different ETag would make
        # conditional requests never match.
        response = make_response(send_from_directory(static_root, filename))
        file_ext = filename.lower().split('.')[-1] if '.' in filename else ''
        for extensions, cache_control, lifetime, label in cache_rules:
            if file_ext in extensions:
                response.headers['Cache-Control'] = cache_control
                # HTTP dates must be in GMT with English day/month names.
                # datetime.now() is local time, and strftime's %a/%b follow the locale.
                expires_at = datetime.now(timezone.utc) + lifetime
                response.headers['Expires'] = format_datetime(expires_at, usegmt=True)
                logger.debug(f"[静态文件缓存] 为{label}文件设置缓存头: {filename}")
                break
        return response
    except Exception as e:
        # Werkzeug's NotFound (code 404) from send_from_directory is a 404, not a 500.
        if getattr(e, 'code', None) == 404:
            return jsonify({
                'code': 404,
                'message': '文件不存在',
                'data': None
            }), 404
        logger.error(f"[静态文件服务] 错误: {str(e)}")
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@app.errorhandler(404)
def not_found(error):
    """Render any 404 as the standard JSON envelope ("接口不存在")."""
    body = {
        'code': 404,
        'message': '接口不存在',
        'data': None,
        'timestamp': int(datetime.now().timestamp() * 1000),
    }
    return jsonify(body), 404
@app.errorhandler(405)
def method_not_allowed(error):
    """Render any 405 as the standard JSON envelope ("请求方法不允许")."""
    body = {
        'code': 405,
        'message': '请求方法不允许',
        'data': None,
        'timestamp': int(datetime.now().timestamp() * 1000),
    }
    return jsonify(body), 405
@app.errorhandler(500)
def internal_error(error):
    """Log the error, then render the standard JSON 500 envelope."""
    logger.error(f"服务器内部错误: {error}")
    payload = dict(
        code=500,
        message='服务器内部错误',
        data=None,
        timestamp=int(datetime.now().timestamp() * 1000),
    )
    return jsonify(payload), 500
# Operation-log writer used by the endpoints in this module.
def add_log(user_id, action, target_type, target_id, description, status='success', error_message=None):
    """Insert one row into ai_logs describing a user action.

    Args:
        user_id: id of the acting user.
        action: action code, e.g. 'create_article'.
        target_type, target_id: what the action was applied to.
        description: human-readable summary.
        status: outcome label, 'success' by default.
        error_message: optional error text.

    The client IP (X-Forwarded-For if present, else REMOTE_ADDR) and User-Agent
    come from the current Flask ``request``. Any exception is logged and
    swallowed, so a logging failure never breaks the caller.
    """
    try:
        db = get_db_manager()
        insert_sql = """
INSERT INTO ai_logs (user_id, action, target_type, target_id, description,
ip_address, user_agent, status, error_message)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
        env = request.environ
        client_ip = env.get('HTTP_X_FORWARDED_FOR', env.get('REMOTE_ADDR', '未知'))
        client_agent = request.headers.get('User-Agent', '未知')
        row = (
            user_id, action, target_type, target_id, description,
            client_ip, client_agent, status, error_message,
        )
        db.execute_insert(insert_sql, row)
        logger.info(f"[日志记录] {action} - 用户ID: {user_id}, 目标: {target_type}:{target_id}")
    except Exception as e:
        logger.error(f"[日志记录失败] {e}")
# Length of the stored context_summary (leading characters of the content).
_CONTEXT_SUMMARY_LENGTH = 150


def _normalize_tags(raw_tags):
    """Turn the request's ``tags`` field into one comma-separated string.

    Lists are joined with ',' after str() on each item, so non-string items no
    longer raise TypeError. Strings are returned unchanged. Other truthy values
    go through str(), and falsy values become ''.
    """
    if isinstance(raw_tags, list):
        return ','.join(str(tag) for tag in raw_tags)
    if isinstance(raw_tags, str):
        return raw_tags
    return str(raw_tags) if raw_tags else ''


def _match_tag_images(tags):
    """Search tag images for ``tags`` with Whoosh (up to 3 results).

    Returns ``(matched_images, image_tag_ids)``:
    - matched_images: ``{'image_id', 'image_tag_id'}`` dicts, only for results
      that have an image_id;
    - image_tag_ids: every truthy image_tag_id (ai_image_tags.id).
    Search errors are logged, and whatever was collected so far is returned.
    """
    matched_images = []
    image_tag_ids = []
    if not tags:
        return matched_images, image_tag_ids
    try:
        logger.info(f"[Tags图片匹配] 开始匹配tags: {tags}")
        tags_images = tags_image_search(tags, 3)
        logger.info(f"[Tags图片匹配] 返回数据: {json.dumps(tags_images, ensure_ascii=False)}")
        if not tags_images:
            logger.info("[Tags图片匹配] 搜索无结果")
            return matched_images, image_tag_ids
        for result in tags_images:
            image_id = result.get('image_id')  # ai_image_tags.image_id
            image_tag_id = result.get('image_tag_id')  # ai_image_tags.id
            if image_id:
                matched_images.append({'image_id': image_id, 'image_tag_id': image_tag_id})
            if image_tag_id:
                image_tag_ids.append(image_tag_id)
        logger.info(f"[Tags图片匹配] 找到 {len(matched_images)} 张匹配图片: {matched_images}")
        logger.info(f"[Tags图片匹配] 收集到的image_tag_ids: {image_tag_ids}")
    except Exception as e:
        logger.error(f"[Tags图片匹配] 匹配过程中发生错误: {str(e)}")
    return matched_images, image_tag_ids


def _load_image_tag_details(db_manager, image_tag_ids):
    """Fetch the ai_image_tags rows whose id is in ``image_tag_ids``.

    Returns ``(rows, department_id, department_name)``. The department fields
    come from the first row and default to 0 / '' when nothing is found or the
    query fails. Errors are logged, not raised.
    """
    rows = []
    department_id = 0
    department_name = ''
    if not image_tag_ids:
        logger.info("[Department查询] 没有有效的image_tag_id")
        return rows, department_id, department_name
    try:
        placeholders = ','.join(['%s'] * len(image_tag_ids))
        query_sql = (
            "SELECT id, keywords_id, keywords_name, department_id, department_name, image_url, image_thumb_url "
            f"FROM ai_image_tags WHERE id IN ({placeholders})"
        )
        logger.info(f"[Department查询] 查询SQL: {query_sql}, 参数: {image_tag_ids}")
        rows = db_manager.execute_query(query_sql, tuple(image_tag_ids)) or []
        logger.info(f"[ai_image_tags] 返回数据: {json.dumps(rows, ensure_ascii=False)}")
        if rows:
            department_name = rows[0]['department_name']
            department_id = rows[0]['department_id']
            logger.info(f"[Department查询] 查询结果: department={department_name}, department_id={department_id}")
        else:
            logger.info("[Department查询] 未查询到department信息")
    except Exception as e:
        # Keep the defaults; the department lookup must not block the article save.
        logger.error(f"[Department查询] 查询ai_image_tags表时发生错误: {str(e)}")
    return rows, department_id, department_name


def _load_article_product_scope(db_manager, article_id):
    """Return ``(product_id, enterprise_id)`` stored on ai_articles, or (0, 0)."""
    try:
        rows = db_manager.execute_query(
            "SELECT product_id, enterprise_id FROM ai_articles WHERE id = %s", (article_id,))
        if not rows:
            logger.warning(f"[产品图片] 未找到文章ID {article_id} 的信息")
            return 0, 0
        product_id = rows[0].get('product_id', 0)
        enterprise_id = rows[0].get('enterprise_id', 0)
        logger.info(f"[产品图片] 从文章表获取: product_id={product_id}, enterprise_id={enterprise_id}")
        return product_id, enterprise_id
    except Exception as e:
        logger.error(f"[产品图片] 查询文章信息失败: {str(e)}", exc_info=True)
        return 0, 0


def _pick_product_images(db_manager, product_id, enterprise_id):
    """Randomly pick 2-5 active ai_images rows of the product ([] if none or on error)."""
    try:
        logger.info(f"[产品图片随机选择] 开始为产品ID {product_id} 选择图片")
        random_count = random.randint(2, 5)
        logger.info(f"[产品图片随机选择] 随机数量: {random_count}")
        rows = db_manager.execute_query(
            "SELECT id, image_url, image_thumb_url FROM ai_images "
            "WHERE product_id = %s AND enterprise_id = %s AND status = 'active' "
            "ORDER BY RAND() LIMIT %s",
            (product_id, enterprise_id, random_count),
        )
    except Exception as e:
        logger.error(f"[产品图片随机选择] 失败: {str(e)}", exc_info=True)
        return []
    if not rows:
        logger.info(f"[产品图片随机选择] 产品ID {product_id} 下没有可用图片")
        return []
    logger.info(f"[产品图片随机选择] 成功选择 {len(rows)} 张图片")
    return list(rows)


def _insert_product_images(db_manager, article_id, enterprise_id, product_images):
    """Attach product images (image_source=2, image_tag_id=0, sort_order 1..N). Raises on DB errors."""
    insert_sql = (
        "INSERT INTO ai_article_images "
        "(enterprise_id, article_id, image_id, image_url, image_thumb_url, "
        "image_tag_id, sort_order, image_source) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    )
    for position, img in enumerate(product_images, start=1):
        db_manager.execute_insert(insert_sql, (
            enterprise_id, article_id, img['id'], img['image_url'], img['image_thumb_url'],
            0, position, 2,
        ))


def _has_cover_image(db_manager, article_id):
    """Return True if the article already has a cover (sort_order=1, image_source=4)."""
    rows = db_manager.execute_query(
        "SELECT id, sort_order FROM ai_article_images "
        "WHERE article_id = %s AND sort_order = 1 AND image_source = 4 "
        "ORDER BY id DESC LIMIT 1",
        (article_id,),
    )
    return bool(rows)


def _insert_tag_images(db_manager, article_id, matched_images, department_results, start_order):
    """Attach the Whoosh-matched images (image_source=1), numbered from ``start_order``.

    The column set follows the live ai_article_images schema
    (get_article_images_column_info):
    - full mode: also writes keywords_id/name and department_id/name;
    - simplified mode: also writes keywords_id only;
    - basic mode: writes neither.
    Returns the mode label. Raises on database errors.
    """
    table_info = get_article_images_column_info()
    has = {
        col: table_info.get(col, {}).get('exists', False)
        for col in ('keywords_id', 'keywords_name', 'department_id', 'department_name')
    }
    if all(has.values()):
        table_mode = "完整模式"
        extra_cols = ['keywords_id', 'keywords_name', 'department_id', 'department_name']
    elif has['keywords_id']:
        table_mode = "简化模式"
        extra_cols = ['keywords_id']
    else:
        table_mode = "基础模式"
        extra_cols = []
    defaults = {'keywords_id': 0, 'keywords_name': '', 'department_id': 0, 'department_name': ''}
    columns = ['article_id', 'image_id', 'image_url', 'image_thumb_url', 'image_tag_id',
               *extra_cols, 'sort_order', 'image_source']
    sql = (f"INSERT INTO ai_article_images ({', '.join(columns)}) "
           f"VALUES ({', '.join(['%s'] * len(columns))})")
    # ai_image_tags rows indexed by id, which equals the match's image_tag_id.
    dept_dict = {row.get('id'): row for row in department_results}
    for offset, img in enumerate(matched_images):
        image_tag_id = img.get('image_tag_id')
        # Looked up in every mode: basic mode also needs image_url / image_thumb_url.
        dept_info = dept_dict.get(image_tag_id, {})
        params = [article_id, img.get('image_id'), dept_info.get('image_url', ''),
                  dept_info.get('image_thumb_url', ''), image_tag_id]
        params.extend(dept_info.get(col, defaults[col]) for col in extra_cols)
        params.extend([start_order + offset, 1])
        db_manager.execute_insert(sql, tuple(params))
    return table_mode


def _attach_generated_images(db_manager, article_id, is_update_mode, matched_images, department_results):
    """Attach product images and tag images to the article; return how many were written.

    Nothing is touched when there are no new images. Otherwise, in update mode,
    the previously generated images (image_source < 4) are removed before
    anything is inserted; covers (image_source = 4) are kept. This way
    regenerated images replace the old set instead of being deleted right after
    insertion or piling up across updates. If that cleanup fails, no images are
    written (avoids duplicates).
    """
    product_images = []
    product_id, enterprise_id = _load_article_product_scope(db_manager, article_id)
    if product_id and enterprise_id:
        product_images = _pick_product_images(db_manager, product_id, enterprise_id)
    else:
        logger.info("[产品图片随机选择] 缺少product_id或enterprise_id跳过")
    attach_tag_images = bool(department_results and matched_images)
    if not product_images and not attach_tag_images:
        return 0

    operation_mode = "更新" if is_update_mode else "插入"
    start_order = 1
    if is_update_mode:
        try:
            if _has_cover_image(db_manager, article_id):
                logger.info('[图片关联-更新] 检测到已存在封面图sort_order=1将从2开始编号')
                start_order = 2
            else:
                logger.info('[图片关联-更新] 未检测到封面图将从1开始编号')
            db_manager.execute_update(
                "DELETE FROM ai_article_images WHERE article_id = %s AND image_source < 4", (article_id,))
            logger.info(f'[图片关联-更新] 已删除文章ID {article_id} 的旧生成图片(保留封面图)')
        except Exception as e:
            logger.error(f"[图片关联-更新] 清理旧图片失败,跳过图片写入: {str(e)}")
            return 0

    written = 0
    if product_images:
        # NOTE(review): product images always use sort_order 1..N, which can share
        # numbers with the cover and with the tag images; confirm the intended order.
        try:
            _insert_product_images(db_manager, article_id, enterprise_id, product_images)
            written += len(product_images)
            logger.info(f"[产品图片关联] 已为文章 {article_id} 写入 {len(product_images)} 张产品图片")
        except Exception as e:
            logger.error(f"[产品图片关联] 写入产品图片失败: {str(e)}", exc_info=True)
    if attach_tag_images:
        try:
            table_mode = _insert_tag_images(
                db_manager, article_id, matched_images, department_results, start_order)
            written += len(matched_images)
            logger.info(f"[图片关联-{operation_mode}] 成功写入 {len(matched_images)} 条文章-图片关联"
                        f"({table_mode},sort_order从{start_order}开始)")
        except Exception as e:
            logger.error(f"[图片关联-{operation_mode}] 写入 ai_article_images 时发生错误: {str(e)}")
    return written


# Endpoint 1 (upload_article): store a generated article in ai_articles.
@app.route('/api/generate_article', methods=['POST'])
@require_auth
def generate_article():
    """Create or update an article from a generator payload (e.g. Coze).

    JSON body:
        title, content: required.
        tags: str or list, optional.
        article_id + batch_id: both present => update mode.
        status: update mode only, default 'pending_review'.
    Side effects: writes ai_articles, ai_article_tags and ai_article_images, plus
    an ai_logs entry. Tag images come from the Whoosh tag search. When the
    article row has product_id and enterprise_id, 2-5 random product images are
    added as well.

    Responses:
        200 with ``errno: 0`` on success;
        400 for an empty / non-object body or a missing field;
        401 when unauthenticated;
        404 when the article to update does not exist;
        500 with ``errno: 500`` on unexpected errors.

    NOTE(review): each SQL statement goes through db_manager separately. If they
    auto-commit, a failure midway leaves partial data; confirm the transaction
    semantics.
    """
    # Pre-bound so the error handler below can always reference them.
    current_user = None
    data = None
    is_update_mode = False
    try:
        current_user = AuthUtils.get_current_user()
        if not current_user:
            return jsonify({'code': 401, 'message': '用户未认证', 'data': None}), 401

        # silent=True: malformed JSON or a non-JSON Content-Type yields None (-> 400)
        # instead of raising inside the handler.
        data = request.get_json(silent=True)
        logger.info(f"[请求开始] 文章内容: {data}")
        if not data:
            return jsonify({'code': 400, 'message': '请求数据为空', 'data': None}), 400
        if not isinstance(data, dict):
            return jsonify({'code': 400, 'message': '请求数据必须是JSON对象', 'data': None}), 400

        # Update mode requires both article_id and batch_id.
        article_id = data.get('article_id')
        batch_id = data.get('batch_id')
        is_update_mode = bool(article_id and batch_id)
        logger.info(f"[模式检测] article_id: {article_id}, batch_id: {batch_id}, 更新模式: {is_update_mode}")

        for field in ('title', 'content'):
            if field not in data:
                return jsonify({'code': 400, 'message': f'缺少必需字段: {field}', 'data': None}), 400

        request_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
        logger.info(f"[请求开始 start] 客户端IP: {client_ip}, 时间: {request_time}")
        logger.info(f"[请求开始 start] 文章标题: {data['title']}")
        logger.debug(f"[请求数据] {json.dumps(data, ensure_ascii=False, indent=2)}")

        db_manager = get_db_manager()
        content = data['content']
        word_count = len(content) if content else 0
        context_summary = content[:_CONTEXT_SUMMARY_LENGTH] if content else ''

        tags = _normalize_tags(data.get('tags', ''))
        matched_images, image_tag_ids = _match_tag_images(tags)
        department_results, department_id, department_name = _load_image_tag_details(db_manager, image_tag_ids)

        if is_update_mode:
            logger.info(f"[更新模式] 更新文章ID: {article_id}, batch_id: {batch_id}")
            existing_article = db_manager.execute_query("SELECT id FROM ai_articles WHERE id = %s", (article_id,))
            if not existing_article:
                return jsonify({'code': 404, 'message': f'文章ID {article_id} 不存在', 'data': None}), 404
            # pending_review by default, matching the Coze generation workflow.
            status = data.get('status', 'pending_review')
            db_manager.execute_update(
                "UPDATE ai_articles "
                "SET title = %s, content = %s, context_summary = %s, department_id = %s, department_name = %s, "
                "word_count = %s, image_count = %s, coze_tag = %s, status = %s, updated_at = NOW() "
                "WHERE id = %s",
                (data['title'], data['content'], context_summary, department_id, department_name,
                 word_count, len(matched_images), tags, status, article_id),
            )
            logger.info(f"[更新成功] 文章ID: {article_id} 已更新")
        else:
            logger.info("[插入模式] 创建新文章")
            status = 'pending_review'
            article_id = db_manager.execute_insert(
                "INSERT INTO ai_articles (title, content, context_summary, department_id, department_name, "
                "created_user_id, status, word_count, image_count, coze_tag) "
                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
                (data['title'], data['content'], context_summary, department_id, department_name,
                 current_user['user_id'], status, word_count, len(matched_images), tags),
            )

        # ai_article_tags: one row (article_id, coze_tag); update mode replaces old rows.
        if tags:
            mode_text = "更新" if is_update_mode else "插入"
            try:
                if is_update_mode:
                    db_manager.execute_update("DELETE FROM ai_article_tags WHERE article_id = %s", (article_id,))
                    logger.info(f'[标签关联-更新] 已删除文章ID {article_id} 的旧标签关联')
                db_manager.execute_insert(
                    "INSERT INTO ai_article_tags (article_id, coze_tag) VALUES (%s, %s)", (article_id, tags))
                logger.info(f'[标签关联-{mode_text}] 已写入 ai_article_tags (article_id: {article_id}, coze_tag: {tags})')
            except Exception as e:
                logger.warning(f"[标签关联-{mode_text}] 写入 ai_article_tags 失败,已跳过: {str(e)}")

        # image_count counts product images plus tag images actually written.
        total_image_count = _attach_generated_images(
            db_manager, article_id, is_update_mode, matched_images, department_results)
        if total_image_count > 0:
            try:
                db_manager.execute_update(
                    "UPDATE ai_articles SET image_count = %s WHERE id = %s", (total_image_count, article_id))
                logger.info(f"[图片计数更新] 文章 {article_id} 的 image_count 已更新为 {total_image_count}")
            except Exception as e:
                logger.error(f"[图片计数更新] 失败: {str(e)}")

        if is_update_mode:
            add_log(current_user['user_id'], 'update_article', 'article', article_id,
                    f"更新文章: {data['title']} (batch_id: {batch_id})", 'success')
            logger.info(f"[更新成功] 文章《{data['title']}》已更新ID: {article_id}, batch_id: {batch_id}")
            success_message, success_errmsg = '文章更新成功', '更新成功'
        else:
            add_log(current_user['user_id'], 'create_article', 'article', article_id,
                    f"创建文章: {data['title']}", 'success')
            logger.info(f"[发布成功] 文章《{data['title']}》已保存到数据库ID: {article_id}")
            success_message, success_errmsg = '文章保存成功', '发布成功'

        return jsonify({
            'errno': 0,
            'errmsg': success_errmsg,
            'message': success_message,
            'data': {
                'article_id': article_id,
                'batch_id': batch_id if is_update_mode else None,
                'title': data['title'],
                'department': department_name,
                # The status actually stored (update mode may override the default).
                'status': status,
                'word_count': word_count,
                # Number of Whoosh-matched images; product images are not included here.
                'image_count': len(matched_images),
                'coze_tag': tags,
                'matched_images': matched_images,
                'operation_mode': 'update' if is_update_mode else 'create'
            },
            'account_info': {
                'user_name': current_user.get('real_name') or current_user.get('username', ''),
                'department': current_user.get('department', '')
            }
        })
    except Exception as e:
        logger.error(f"[系统错误] 上传文章时发生未知错误: {str(e)}", exc_info=True)
        if current_user:
            title = data.get('title', '未知标题') if isinstance(data, dict) else '未知标题'
            if is_update_mode:
                action, description = 'update_article', f"更新文章失败: {title}"
            else:
                action, description = 'create_article', f"创建文章失败: {title}"
            add_log(current_user['user_id'], action, 'article', None, description, 'error', str(e))
        return jsonify({
            'errno': 500,
            'errmsg': '发布失败',
            'message': f'服务器内部错误: {str(e)}',
            'data': None
        }), 500
# Development entry point; production should run under a WSGI server instead.
if __name__ == '__main__':
    # Module name for the printed WSGI commands, derived from this file so the
    # hint always matches the real module name.
    module_name = os.path.splitext(os.path.basename(__file__))[0]
    print("=" * 60)
    print("图文动态批量生产系统 v2.0 启动中...")
    print("⚠️ 警告: 当前使用开发服务器,不建议在生产环境中使用")
    print("⚠️ 生产环境请使用: gunicorn、waitress 等WSGI服务器")
    print("-" * 60)
    print("服务地址: http://127.0.0.1:8321")
    print("API文档: http://127.0.0.1:8321")
    print("健康检查: http://127.0.0.1:8321/health")
    print("-" * 60)
    print("接口模块:")
    print(" - 用户认证: /api/auth")
    print(" - 用户管理: /api/users")
    print(" - 文章管理: /api/articles")
    print(" - 统计接口: /api/statistics")
    print("-" * 60)
    print("日志配置信息:")
    # NOTE(review): /api/logs/file reads logs/article_server*.log, not the
    # baijiahao_* names printed here; confirm against log_config.
    print(" 主日志文件: logs/baijiahao_server.log")
    print(" 错误日志文件: logs/baijiahao_error.log")
    print(" 日志切割: 每日午夜自动切割")
    print(" 日志保留: 主日志30天错误日志90天")
    print("-" * 60)
    print("数据库配置:")
    # Show the configured values. Assumes DB_CONFIG uses 'host'/'port'/'user'
    # keys (TODO confirm); falls back to the previous hard-coded text otherwise.
    print(f" 主机: {DB_CONFIG.get('host', '127.0.0.1')}:{DB_CONFIG.get('port', 3306)}")
    print(f" 数据库: {DB_CONFIG.get('database', 'ai_article')}")
    print(f" 用户: {DB_CONFIG.get('user', 'root')}")
    print("-" * 60)
    print("生产部署选项:")
    print(f" 1. Waitress: waitress-serve --host=0.0.0.0 --port=8321 {module_name}:app")
    print(f" 2. Gunicorn: gunicorn --bind 0.0.0.0:8321 --workers 4 {module_name}:app")
    print("=" * 60)
    try:
        # Pre-load the Whoosh index.
        initialize_service()
        # Fail fast if the database is unreachable.
        logger.info("测试数据库连接...")
        db_manager = get_db_manager()
        db_manager.execute_query("SELECT 1")
        logger.info("数据库连接成功")
        logger.warning("使用Flask开发服务器启动仅用于开发测试")
        app.run(
            host='0.0.0.0',
            port=8321,
            debug=False,
            threaded=True
        )
    except KeyboardInterrupt:
        logger.info("收到中断信号,正在关闭服务器...")
        print("\n服务器已关闭")
    except Exception as e:
        logger.error(f"服务器启动失败: {str(e)}", exc_info=True)
        print(f"服务器启动失败: {str(e)}")
    finally:
        logger.info("图文动态批量生产系统已停止")
        print("日志已保存到 logs/ 目录")
# Imported as a module (e.g. by a WSGI server): still pre-load the Whoosh index.
else:
    print('图文动态批量生产系统 v2.0 模块已加载')
    try:
        initialize_service()
    except Exception as e:
        # initialize_service() already logs its own failures. This guard only keeps
        # an unexpected error from aborting the import, and still records it.
        logger.error(f"模块加载时初始化服务失败: {str(e)}", exc_info=True)