#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Image-and-text post batch production system v2.0.

Flask backend API server - main entry point. Registers the feature
blueprints, exposes log/health/static endpoints and the article generation
endpoint, and preloads the Whoosh tag-search index at start-up.
"""

from flask import Flask, request, jsonify, redirect
from flask_cors import CORS
import logging
import time
import os
import re
from collections import deque
from datetime import datetime, timedelta, timezone
from email.utils import format_datetime
from logging.handlers import TimedRotatingFileHandler
import json  # Added for json.dumps
import random

# Unified logging configuration
from log_config import setup_article_server_search_logger

# Database configuration
from database_config import get_db_manager, DB_CONFIG

# Feature blueprints
from auth_routes import auth_bp
from user_routes import user_bp
from article_routes import article_bp
from statistics_routes import statistics_bp
from image_routes import image_bp

# Authentication helpers
from auth_utils import AuthUtils, require_auth

# Whoosh tag search module
from whoosh_search_tags import tags_image_search, init_search_engine, enable_hot_reload

# Create the Flask application
app = Flask(__name__, static_folder='public', static_url_path='')

# Initialise the logging system
logger = setup_article_server_search_logger()

# Directory and base names of the log files exposed by /api/logs/file
_LOG_DIR = 'logs'
_LOG_FILE_NAMES = {
    'article': 'article_server.log',
    'error': 'article_server_error.log',
    'whoosh': 'whoosh_search_tags.log',
}
# Rotated log files carry a YYYY-MM-DD suffix; anything else is rejected so
# the user-supplied value can never escape the log directory.
_LOG_DATE_RE = re.compile(r'\d{4}-\d{2}-\d{2}')
_MAX_LOG_LINES = 1000
_MAX_PAGE_SIZE = 1000


def _client_ip():
    """Return the caller's IP as reported by X-Forwarded-For or REMOTE_ADDR."""
    return request.environ.get(
        'HTTP_X_FORWARDED_FOR',
        request.environ.get('REMOTE_ADDR', '未知'),
    )


def _now_ms():
    """Return the current time as integer milliseconds since the epoch."""
    return int(datetime.now().timestamp() * 1000)


def _int_arg(name, default, minimum, maximum=None):
    """Read an integer query parameter, clamped to [minimum, maximum].

    A missing or non-numeric value falls back to ``default`` instead of
    raising, so a malformed query string cannot turn into a 500.
    """
    raw = request.args.get(name)
    try:
        value = int(raw) if raw is not None else default
    except (TypeError, ValueError):
        value = default
    value = max(minimum, value)
    if maximum is not None:
        value = min(maximum, value)
    return value


# Service initialisation (mirrors how the search service initialises itself)
def initialize_service():
    """Preload the Whoosh index at start-up and log the total elapsed time.

    Failures are logged and swallowed so the server can still start.
    """
    start_ts = time.time()
    try:
        logger.info("正在初始化标签搜索引擎...")
        # Disable hot reload so the first API call does not trigger another
        # expensive reload.
        try:
            enable_hot_reload(False)
            logger.info("已暂时禁用标签搜索热加载功能")
        except Exception as _e:
            logger.warning(f"禁用热加载失败: {str(_e)}")
        init_search_engine()
        logger.info("标签搜索引擎初始化完成")
    except Exception as e:
        logger.error(f"标签搜索引擎初始化失败: {str(e)}", exc_info=True)
    finally:
        elapsed_ms = (time.time() - start_ts) * 1000
        logger.info(f"标签搜索引擎初始化总耗时: {elapsed_ms:.2f} ms")


# Cached result of the schema check, to avoid querying repeatedly
_has_article_tags_coze_tag = None


def has_article_tags_coze_tag_column() -> bool:
    """Return whether ai_article_tags has a coze_tag column (cached).

    A failed lookup is cached as ``False``.
    """
    global _has_article_tags_coze_tag
    if _has_article_tags_coze_tag is not None:
        return _has_article_tags_coze_tag
    try:
        dbm = get_db_manager()
        sql = (
            "SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA=%s AND TABLE_NAME='ai_article_tags' AND COLUMN_NAME='coze_tag'"
        )
        db_name = DB_CONFIG.get('database')
        res = dbm.execute_query(sql, (db_name,))
        _has_article_tags_coze_tag = bool(res and res[0].get('cnt', 0) > 0)
        logger.info(f"[表结构检测] ai_article_tags.coze_tag 存在: {_has_article_tags_coze_tag}")
    except Exception as e:
        logger.warning(f"[表结构检测] 检测 ai_article_tags.coze_tag 失败: {e}")
        _has_article_tags_coze_tag = False
    return _has_article_tags_coze_tag


def _get_column_info(table_name, cols):
    """Return ``{column: {'exists': bool, 'nullable': bool}}`` for ``cols``.

    Columns not reported by INFORMATION_SCHEMA (or all of them, when the
    query fails) are returned as ``exists=False, nullable=True``.
    """
    info = {c: {'exists': False, 'nullable': True} for c in cols}
    try:
        dbm = get_db_manager()
        db_name = DB_CONFIG.get('database')
        placeholders = ','.join(['%s'] * len(cols))
        sql = (
            "SELECT COLUMN_NAME, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s AND COLUMN_NAME IN (" + placeholders + ")"
        )
        rows = dbm.execute_query(sql, (db_name, table_name, *cols))
        for r in rows or []:
            name = r.get('COLUMN_NAME')
            nullable = (r.get('IS_NULLABLE', 'YES') == 'YES')
            info[name] = {'exists': True, 'nullable': nullable}
        return info
    except Exception as e:
        logger.warning(f"[表结构检测] 获取 {table_name} 列信息失败: {e}")
        return {c: {'exists': False, 'nullable': True} for c in cols}


def _has_foreign_key(table_name, column_name) -> bool:
    """Return whether ``table_name.column_name`` has a foreign-key constraint."""
    try:
        dbm = get_db_manager()
        db_name = DB_CONFIG.get('database')
        sql = (
            "SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
            "WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s AND COLUMN_NAME=%s "
            "AND REFERENCED_TABLE_NAME IS NOT NULL"
        )
        res = dbm.execute_query(sql, (db_name, table_name, column_name))
        return bool(res and res[0].get('cnt', 0) > 0)
    except Exception as e:
        logger.warning(f"[表结构检测] 检测 {table_name}.{column_name} 外键失败: {e}")
        return False


def get_article_tags_column_info():
    """Return existence/nullability info for key ai_article_tags columns."""
    return _get_column_info('ai_article_tags', ('coze_tag', 'tag_id', 'tag_name'))


def has_fk_article_tags_tag_id() -> bool:
    """Return whether ai_article_tags.tag_id has a foreign-key constraint."""
    return _has_foreign_key('ai_article_tags', 'tag_id')


def get_article_images_column_info():
    """Return existence/nullability info for key ai_article_images columns."""
    # Probe every column that may exist, including the later additions.
    cols = (
        'image_id', 'image_url', 'sort_order',
        'keywords_id', 'keywords_name', 'department_id', 'department_name',
    )
    return _get_column_info('ai_article_images', cols)


def has_fk_article_images_image_id() -> bool:
    """Return whether ai_article_images.image_id has a foreign-key constraint."""
    return _has_foreign_key('ai_article_images', 'image_id')


# Configure CORS.
# NOTE(review): the "*" entry together with supports_credentials=True lets
# any origin make credentialed requests - confirm this is intended.
CORS(app,
     origins=["http://127.0.0.1:8321", "http://localhost:8321",
              "http://127.0.0.1:3000", "http://localhost:3000", "*"],
     methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
     allow_headers=["Content-Type", "Authorization", "Accept", "X-Requested-With"],
     supports_credentials=True,
     max_age=3600)

# Register blueprints
app.register_blueprint(auth_bp)
app.register_blueprint(user_bp)
app.register_blueprint(article_bp)
app.register_blueprint(statistics_bp)
app.register_blueprint(image_bp)


# Log query endpoints
@app.route('/api/logs', methods=['GET'])
def get_logs():
    """Return a page of operation logs from ai_logs, newest first.

    Query parameters: ``page`` (>= 1, default 1) and ``size``
    (1.._MAX_PAGE_SIZE, default 10).
    """
    try:
        # Clamp paging so OFFSET can never go negative and LIMIT is bounded.
        page = _int_arg('page', 1, 1)
        size = _int_arg('size', 10, 1, _MAX_PAGE_SIZE)
        offset = (page - 1) * size

        db_manager = get_db_manager()

        # Total row count
        count_sql = "SELECT COUNT(*) as total FROM ai_logs"
        count_result = db_manager.execute_query(count_sql)
        total = count_result[0]['total']

        # Page of log rows
        sql = """
            SELECT l.id, l.user_id, l.action, l.target_type, l.target_id,
                   l.description, l.ip_address, l.user_agent, l.status,
                   l.created_at, u.username, u.real_name
            FROM ai_logs l
            LEFT JOIN ai_users u ON l.user_id = u.id
            ORDER BY l.created_at DESC
            LIMIT %s OFFSET %s
        """
        logs = db_manager.execute_query(sql, (size, offset))

        return jsonify({
            'code': 200,
            'message': '获取成功',
            'data': {
                'list': logs,
                'total': total,
                'page': page,
                'size': size
            },
            'timestamp': _now_ms()
        })
    except Exception as e:
        logger.error(f"[获取日志] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500


def _tail_file(path, max_lines, encoding):
    """Return ``(total_line_count, last max_lines lines)`` of a text file.

    The file is streamed through a bounded deque so only the requested tail
    is kept in memory. Trailing newlines are stripped from returned lines.
    Raises UnicodeDecodeError when the file is not valid ``encoding``.
    """
    tail = deque(maxlen=max_lines)
    total = 0
    with open(path, 'r', encoding=encoding) as f:
        for line in f:
            total += 1
            tail.append(line.rstrip('\n'))
    return total, list(tail)


@app.route('/api/logs/file', methods=['GET'])
def get_file_logs():
    """Return the last N lines of one of the server's log files.

    Query parameters: ``type`` (article | error | whoosh), ``lines``
    (1.._MAX_LOG_LINES, default 100) and optional ``date`` (YYYY-MM-DD) to
    select a rotated file.
    """
    try:
        log_type = request.args.get('type', 'article')  # article, error, whoosh
        # Bounded to avoid memory problems and surprising negative slices.
        lines = _int_arg('lines', 100, 1, _MAX_LOG_LINES)
        date = request.args.get('date', '')  # optional, e.g. 2025-08-14

        base_name = _LOG_FILE_NAMES.get(log_type)
        if base_name is None:
            return jsonify({
                'code': 400,
                'message': '不支持的日志类型',
                'data': None
            }), 400

        # The date becomes part of a file path, so only accept the exact
        # rotation suffix format (blocks "../" style traversal).
        if date and not _LOG_DATE_RE.fullmatch(date):
            return jsonify({
                'code': 400,
                'message': '日期格式错误,应为 YYYY-MM-DD',
                'data': None
            }), 400

        if date:
            log_file = f'{_LOG_DIR}/{base_name}.{date}'
        else:
            log_file = f'{_LOG_DIR}/{base_name}'

        if not os.path.exists(log_file):
            return jsonify({
                'code': 404,
                'message': f'日志文件不存在: {log_file}',
                'data': None
            }), 404

        # Read the tail as UTF-8 first, then fall back to GBK.
        message = '获取成功'
        try:
            total_lines, recent_lines = _tail_file(log_file, lines, 'utf-8')
        except UnicodeDecodeError:
            try:
                total_lines, recent_lines = _tail_file(log_file, lines, 'gbk')
                message = '获取成功(使用GBK编码)'
            except Exception as decode_error:
                logger.error(f"[文件日志] 编码解析失败: {str(decode_error)}")
                return jsonify({
                    'code': 500,
                    'message': f'文件编码解析失败: {str(decode_error)}',
                    'data': None
                }), 500

        file_stat = os.stat(log_file)
        file_mtime = datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')

        return jsonify({
            'code': 200,
            'message': message,
            'data': {
                'log_type': log_type,
                'file_path': log_file,
                'file_size': file_stat.st_size,
                'file_mtime': file_mtime,
                'total_lines': total_lines,
                'returned_lines': len(recent_lines),
                'lines': recent_lines
            },
            'timestamp': _now_ms()
        })
    except Exception as e:
        logger.error(f"[文件日志] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500


@app.route('/api/logs/files', methods=['GET'])
def get_log_files():
    """Return the list of log files found in the log directory."""
    try:
        log_dir = _LOG_DIR
        if not os.path.exists(log_dir):
            return jsonify({
                'code': 404,
                'message': '日志目录不存在',
                'data': None
            }), 404

        files = []
        for filename in os.listdir(log_dir):
            if not (filename.endswith('.log') or '.log.' in filename):
                continue
            file_path = os.path.join(log_dir, filename)
            if not os.path.isfile(file_path):
                continue
            file_stat = os.stat(file_path)
            if 'error' in filename:
                file_type = 'error'
            elif 'whoosh' in filename:
                file_type = 'whoosh'
            else:
                file_type = 'article'
            files.append({
                'filename': filename,
                'size': file_stat.st_size,
                'size_mb': round(file_stat.st_size / 1024 / 1024, 2),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                'type': file_type
            })

        # Newest first (the timestamp format sorts lexicographically).
        files.sort(key=lambda x: x['modified'], reverse=True)

        return jsonify({
            'code': 200,
            'message': '获取成功',
            'data': {
                'files': files,
                'total': len(files)
            },
            'timestamp': _now_ms()
        })
    except Exception as e:
        logger.error(f"[日志文件列表] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500


# Before-request hook - log every API access
@app.before_request
def log_request_info():
    """Log method, path, client IP and User-Agent of each request."""
    client_ip = _client_ip()
    user_agent = request.headers.get('User-Agent', '未知')
    logger.info(f"[API访问] {request.method} {request.path} - IP: {client_ip} - User-Agent: {user_agent[:100]}")

    # Log query-string parameters
    if request.args:
        logger.debug(f"[请求参数] GET参数: {dict(request.args)}")


# Handle OPTIONS pre-flight requests.
# The route needs a <path:...> variable: the view takes ``path`` and must
# match every URL below /api/.
@app.route('/api/<path:path>', methods=['OPTIONS'])
def handle_options(path):
    """Answer OPTIONS pre-flight requests for any /api/ path."""
    response = jsonify({'message': 'OK'})
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization,Accept,X-Requested-With')
    response.headers.add('Access-Control-Allow-Methods', 'GET,POST,PUT,DELETE,OPTIONS')
    response.headers.add('Access-Control-Allow-Credentials', 'true')
    response.headers.add('Access-Control-Max-Age', '3600')
    return response


# After-request hook - log the response status
@app.after_request
def log_response_info(response):
    """Log the status code of each response and pass the response through."""
    client_ip = _client_ip()
    logger.info(f"[API响应] {request.method} {request.path} - IP: {client_ip} - 状态码: {response.status_code}")
    return response


@app.route('/health', methods=['GET'])
def health_check():
    """Health check: reports healthy when a trivial DB query succeeds."""
    try:
        # Verify the database connection
        db_manager = get_db_manager()
        db_manager.execute_query("SELECT 1")
        return jsonify({
            'code': 200,
            'message': '服务正常',
            'data': {
                'status': 'healthy',
                'timestamp': _now_ms(),
                'version': '2.0'
            }
        })
    except Exception as e:
        logger.error(f"健康检查失败: {e}")
        return jsonify({
            'code': 500,
            'message': '服务异常',
            'data': {
                'status': 'unhealthy',
                'error': str(e),
                'timestamp': _now_ms()
            }
        }), 500


@app.route('/', methods=['GET'])
def index():
    """Root path - redirect to dashboard.html."""
    return redirect('/dashboard.html')


def _apply_cache_headers(response, max_age_seconds, immutable=False):
    """Set Cache-Control and a matching UTC ``Expires`` header on ``response``."""
    cache_control = f'public, max-age={max_age_seconds}'
    if immutable:
        cache_control += ', immutable'
    response.headers['Cache-Control'] = cache_control
    # Expires must be an HTTP-date in GMT, so build it from an aware UTC
    # datetime rather than local time.
    expires_at = datetime.now(timezone.utc) + timedelta(seconds=max_age_seconds)
    response.headers['Expires'] = format_datetime(expires_at, usegmt=True)


# The route needs a <path:...> variable: the view takes ``filename`` and a
# bare '/' would collide with ``index``.
@app.route('/<path:filename>')
def serve_static(filename):
    """Serve a file from ``public`` with per-file-type cache headers.

    Images get a 1-year immutable cache plus an ETag, CSS/JS/map files 1 day,
    fonts 30 days; other file types get no extra cache headers.
    """
    try:
        from flask import send_from_directory, make_response

        static_root = 'public'
        file_path = os.path.join(static_root, filename)
        if not (os.path.exists(file_path) and os.path.isfile(file_path)):
            return jsonify({
                'code': 404,
                'message': '文件不存在',
                'data': None
            }), 404

        response = make_response(send_from_directory(static_root, filename))
        file_ext = filename.lower().split('.')[-1] if '.' in filename else ''

        if file_ext in ('png', 'jpg', 'jpeg', 'gif', 'webp', 'svg', 'ico', 'bmp'):
            # Images: strong 1-year cache
            _apply_cache_headers(response, 31536000, immutable=True)
            # ETag for cache validation
            response.headers['ETag'] = f'"img-{int(os.path.getmtime(file_path))}-{os.path.getsize(file_path)}"'
            logger.debug(f"[静态文件缓存] 为图片文件设置缓存头: {filename}")
        elif file_ext in ('css', 'js', 'map'):
            # CSS/JS: 1-day cache
            _apply_cache_headers(response, 86400)
            logger.debug(f"[静态文件缓存] 为CSS/JS文件设置缓存头: {filename}")
        elif file_ext in ('woff', 'woff2', 'ttf', 'eot', 'otf'):
            # Fonts: 30-day cache
            _apply_cache_headers(response, 2592000)
            logger.debug(f"[静态文件缓存] 为字体文件设置缓存头: {filename}")

        return response
    except Exception as e:
        logger.error(f"[静态文件服务] 错误: {str(e)}")
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500


@app.errorhandler(404)
def not_found(error):
    """Return the JSON body for 404 errors."""
    return jsonify({
        'code': 404,
        'message': '接口不存在',
        'data': None,
        'timestamp': _now_ms()
    }), 404


@app.errorhandler(405)
def method_not_allowed(error):
    """Return the JSON body for 405 errors."""
    return jsonify({
        'code': 405,
        'message': '请求方法不允许',
        'data': None,
        'timestamp': _now_ms()
    }), 405


@app.errorhandler(500)
def internal_error(error):
    """Log and return the JSON body for 500 errors."""
    logger.error(f"服务器内部错误: {error}")
    return jsonify({
        'code': 500,
        'message': '服务器内部错误',
        'data': None,
        'timestamp': _now_ms()
    }), 500


# Operation-log helper
def add_log(user_id, action, target_type, target_id, description, status='success', error_message=None):
    """Insert one row into ai_logs for the current request.

    Best effort: any failure is logged and swallowed so that audit logging
    never breaks the calling endpoint.
    """
    try:
        db_manager = get_db_manager()
        sql = """
            INSERT INTO ai_logs (user_id, action, target_type, target_id, description,
                                 ip_address, user_agent, status, error_message)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        ip_address = _client_ip()
        user_agent = request.headers.get('User-Agent', '未知')
        db_manager.execute_insert(sql, (
            user_id, action, target_type, target_id, description,
            ip_address, user_agent, status, error_message
        ))
        logger.info(f"[日志记录] {action} - 用户ID: {user_id}, 目标: {target_type}:{target_id}")
    except Exception as e:
        logger.error(f"[日志记录失败] {e}")


# Optional ai_article_images columns and the value used when the matching
# ai_image_tags row does not provide one.
_IMAGE_EXTRA_DEFAULTS = {
    'keywords_id': 0,
    'keywords_name': '',
    'department_id': 0,
    'department_name': '',
}


def _insert_tag_images(db_manager, article_id, matched_images, department_results, start_order):
    """Insert the tag-matched images of an article into ai_article_images.

    The column list adapts to the table schema: all four keyword/department
    columns ("完整模式"), only keywords_id ("简化模式"), or none ("基础模式").
    Rows are written with image_source=1 and consecutive sort_order values
    starting at ``start_order``. Returns the name of the mode that was used.
    """
    table_info = get_article_images_column_info()
    present = {
        col: table_info.get(col, {}).get('exists', False)
        for col in _IMAGE_EXTRA_DEFAULTS
    }
    if all(present.values()):
        table_mode = "完整模式"
        extra_cols = list(_IMAGE_EXTRA_DEFAULTS)
    elif present['keywords_id']:
        table_mode = "简化模式"
        extra_cols = ['keywords_id']
    else:
        table_mode = "基础模式"
        extra_cols = []

    # Index the ai_image_tags rows by id, which is the image_tag_id.
    dept_dict = {dept.get('id'): dept for dept in department_results}

    # Column names come from the fixed lists above, never from user input.
    columns = [
        'article_id', 'image_id', 'image_url', 'image_thumb_url', 'image_tag_id',
        *extra_cols, 'sort_order', 'image_source',
    ]
    sql = (
        f"INSERT INTO ai_article_images ({', '.join(columns)}) "
        f"VALUES ({', '.join(['%s'] * len(columns))})"
    )

    for i, img in enumerate(matched_images):
        image_tag_id = img.get('image_tag_id')
        # Looked up for every mode; the basic mode previously used an
        # unassigned ``dept_info``.
        dept_info = dept_dict.get(image_tag_id, {})
        extras = [dept_info.get(col, _IMAGE_EXTRA_DEFAULTS[col]) for col in extra_cols]
        db_manager.execute_insert(sql, (
            article_id,
            img.get('image_id'),
            dept_info.get('image_url', ''),
            dept_info.get('image_thumb_url', ''),
            image_tag_id,
            *extras,
            start_order + i,  # dynamic starting index
            1,
        ))
    return table_mode


# Endpoint 1: upload_article - Baijiahao submission, stored in ai_articles
@app.route('/api/generate_article', methods=['POST'])
@require_auth
def generate_article():
    """Create or update an article and its tag/image associations.

    Expects a JSON body with ``title`` and ``content``; optional ``tags``
    (string or list), and ``article_id`` + ``batch_id`` together to switch to
    update mode (optionally with ``status``). Matches images for the tags via
    Whoosh, stores the article, then writes ai_article_tags and
    ai_article_images rows on a best-effort basis.
    """
    # Pre-initialised so the error handler can always reference them.
    current_user = None
    data = None
    try:
        current_user = AuthUtils.get_current_user()
        if not current_user:
            return jsonify({
                'code': 401,
                'message': '用户未认证',
                'data': None
            }), 401

        data = request.get_json()
        logger.info(f"[请求开始] 文章内容: {data}")
        if not data:
            return jsonify({
                'code': 400,
                'message': '请求数据为空',
                'data': None
            }), 400

        # Update mode requires both article_id and batch_id
        article_id = data.get('article_id')
        batch_id = data.get('batch_id')
        is_update_mode = bool(article_id and batch_id)
        logger.info(f"[模式检测] article_id: {article_id}, batch_id: {batch_id}, 更新模式: {is_update_mode}")

        # Validate required fields
        for field in ('title', 'content'):
            if field not in data:
                return jsonify({
                    'code': 400,
                    'message': f'缺少必需字段: {field}',
                    'data': None
                }), 400

        request_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        client_ip = _client_ip()
        logger.info(f"[请求开始 start] 客户端IP: {client_ip}, 时间: {request_time}")
        logger.info(f"[请求开始 start] 文章标题: {data['title']}")
        # Full request payload (for debugging)
        logger.debug(f"[请求数据] {json.dumps(data, ensure_ascii=False, indent=2)}")

        db_manager = get_db_manager()

        word_count = len(data['content']) if data['content'] else 0

        # context_summary is the first 150 characters of the content
        CONTEXT_SUMMARY_LENGTH = 150
        context_summary = data['content'][:CONTEXT_SUMMARY_LENGTH] if data['content'] else ''

        # Normalise tags to a comma-separated string
        tags = data.get('tags', '')
        if isinstance(tags, list):
            tags = ','.join(str(t) for t in tags) if tags else ''
        elif not isinstance(tags, str):
            tags = str(tags) if tags else ''

        # ---- Match images for the tags via Whoosh -------------------------
        matched_images = []
        image_tag_ids = []
        if tags:
            try:
                logger.info(f"[Tags图片匹配] 开始匹配tags: {tags}")
                tags_images = tags_image_search(tags, 3)
                # default=str keeps this logging-only dump from raising (and
                # discarding the matches) on non-JSON types.
                logger.info(f"[Tags图片匹配] 返回数据: {json.dumps(tags_images, ensure_ascii=False, default=str)}")
                if tags_images:
                    for result in tags_images:
                        image_id = result.get('image_id')          # ai_image_tags.image_id
                        image_tag_id = result.get('image_tag_id')  # ai_image_tags.id
                        if image_id:
                            matched_images.append({
                                'image_id': image_id,
                                'image_tag_id': image_tag_id
                            })
                            if image_tag_id:
                                image_tag_ids.append(image_tag_id)
                                logger.info(f"[image_tag_ids] 返回数据: {json.dumps(image_tag_ids, ensure_ascii=False)}")
                    logger.info(f"[Tags图片匹配] 找到 {len(matched_images)} 张匹配图片: {matched_images}")
                    logger.info(f"[Tags图片匹配] 收集到的image_tag_ids: {image_tag_ids}")
                else:
                    logger.info(f"[Tags图片匹配] 搜索无结果")
            except Exception as e:
                logger.error(f"[Tags图片匹配] 匹配过程中发生错误: {str(e)}")

        # ---- Look up department info in ai_image_tags ---------------------
        department_name = ''
        department_id = 0
        department_results = []
        if tags and image_tag_ids:
            try:
                placeholders = ','.join(['%s'] * len(image_tag_ids))
                logger.info(f"[image_tag_ids] 返回数据: {json.dumps(image_tag_ids, ensure_ascii=False)}")
                logger.info(f"[placeholders] 返回数据: {json.dumps(placeholders, ensure_ascii=False)}")
                query_sql = f"""
                    SELECT id, keywords_id, keywords_name, department_id, department_name,
                           image_url, image_thumb_url
                    FROM ai_image_tags
                    WHERE id IN ({placeholders})
                """
                logger.info(f"[Department查询] 查询SQL: {query_sql}, 参数: {image_tag_ids}")
                department_results = db_manager.execute_query(query_sql, tuple(image_tag_ids))
                logger.info(f"[ai_image_tags] 返回数据: {json.dumps(department_results, ensure_ascii=False, default=str)}")
                if department_results:
                    # The first row decides the article's department
                    department_name = department_results[0]['department_name']
                    department_id = department_results[0]['department_id']
                    logger.info(f"[Department查询] 查询结果: department={department_name}, department_id={department_id}")
                else:
                    logger.info("[Department查询] 未查询到department信息")
            except Exception as e:
                # Keep the default department values; do not abort the request
                logger.error(f"[Department查询] 查询ai_image_tags表时发生错误: {str(e)}")
        else:
            logger.info("[Department查询] 没有有效的image_tag_id")

        # ---- Insert or update the article ---------------------------------
        article_status = 'pending_review'
        if is_update_mode:
            logger.info(f"[更新模式] 更新文章ID: {article_id}, batch_id: {batch_id}")

            check_sql = "SELECT id FROM ai_articles WHERE id = %s"
            existing_article = db_manager.execute_query(check_sql, (article_id,))
            if not existing_article:
                return jsonify({
                    'code': 404,
                    'message': f'文章ID {article_id} 不存在',
                    'data': None
                }), 404

            update_sql = """
                UPDATE ai_articles
                SET title = %s, content = %s, context_summary = %s,
                    department_id = %s, department_name = %s,
                    word_count = %s, image_count = %s, coze_tag = %s,
                    status = %s, updated_at = NOW()
                WHERE id = %s
            """
            # status defaults to pending_review (the Coze generation flow)
            article_status = data.get('status', 'pending_review')
            db_manager.execute_update(update_sql, (
                data['title'], data['content'], context_summary,
                department_id, department_name,
                word_count, len(matched_images), tags,
                article_status, article_id
            ))
            logger.info(f"[更新成功] 文章ID: {article_id} 已更新")
        else:
            logger.info("[插入模式] 创建新文章")
            sql = """
                INSERT INTO ai_articles
                    (title, content, context_summary, department_id, department_name,
                     created_user_id, status, word_count, image_count, coze_tag)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            article_id = db_manager.execute_insert(sql, (
                data['title'], data['content'], context_summary,
                department_id, department_name,
                current_user['user_id'],
                'pending_review',
                word_count, len(matched_images),
                tags  # stored as coze_tag
            ))

        # ---- ai_article_tags: only article_id and coze_tag are recorded ---
        if tags:
            mode_text = "更新" if is_update_mode else "插入"
            try:
                if is_update_mode:
                    # Replace the previous tag association
                    delete_tags_sql = "DELETE FROM ai_article_tags WHERE article_id = %s"
                    db_manager.execute_update(delete_tags_sql, (article_id,))
                    logger.info(f'[标签关联-更新] 已删除文章ID {article_id} 的旧标签关联')
                simple_sql = "INSERT INTO ai_article_tags (article_id, coze_tag) VALUES (%s, %s)"
                db_manager.execute_insert(simple_sql, (article_id, tags))
                logger.info(f'[标签关联-{mode_text}] 已写入 ai_article_tags (article_id: {article_id}, coze_tag: {tags})')
            except Exception as e:
                logger.warning(f"[标签关联-{mode_text}] 写入 ai_article_tags 失败,已跳过: {str(e)}")

        # ---- ai_article_images: random product images ---------------------
        product_images = []
        # NOTE(review): product images are not added to total_image_count,
        # so a later tag-image count overwrites the value stored here -
        # confirm which count image_count is supposed to hold.
        total_image_count = 0
        product_id = 0
        enterprise_id = 0
        try:
            article_info_sql = "SELECT product_id, enterprise_id FROM ai_articles WHERE id = %s"
            article_info = db_manager.execute_query(article_info_sql, (article_id,))
            if article_info:
                product_id = article_info[0].get('product_id', 0)
                enterprise_id = article_info[0].get('enterprise_id', 0)
                logger.info(f"[产品图片] 从文章表获取: product_id={product_id}, enterprise_id={enterprise_id}")
            else:
                logger.warning(f"[产品图片] 未找到文章ID {article_id} 的信息")
        except Exception as e:
            product_id = 0
            enterprise_id = 0
            logger.error(f"[产品图片] 查询文章信息失败: {str(e)}", exc_info=True)

        if product_id and enterprise_id:
            try:
                logger.info(f"[产品图片随机选择] 开始为产品ID {product_id} 选择图片")
                # Pick between 2 and 5 images
                random_count = random.randint(2, 5)
                logger.info(f"[产品图片随机选择] 随机数量: {random_count}")
                product_images_sql = """
                    SELECT id, image_url, image_thumb_url
                    FROM ai_images
                    WHERE product_id = %s AND enterprise_id = %s AND status = 'active'
                    ORDER BY RAND()
                    LIMIT %s
                """
                product_images_result = db_manager.execute_query(
                    product_images_sql, (product_id, enterprise_id, random_count)
                )
                if product_images_result:
                    product_images = product_images_result
                    logger.info(f"[产品图片随机选择] 成功选择 {len(product_images)} 张图片")
                    insert_sql = """
                        INSERT INTO ai_article_images
                            (enterprise_id, article_id, image_id, image_url, image_thumb_url,
                             image_tag_id, sort_order, image_source)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    """
                    for i, img in enumerate(product_images):
                        db_manager.execute_insert(insert_sql, (
                            enterprise_id, article_id, img['id'],
                            img['image_url'], img['image_thumb_url'],
                            0, i + 1, 2
                        ))
                    update_count_sql = "UPDATE ai_articles SET image_count = %s WHERE id = %s"
                    db_manager.execute_update(update_count_sql, (len(product_images), article_id))
                    logger.info(f"[产品图片关联] 已更新文章 {article_id} 的 image_count 为 {len(product_images)}")
                else:
                    logger.info(f"[产品图片随机选择] 产品ID {product_id} 下没有可用图片")
            except Exception as e:
                logger.error(f"[产品图片随机选择] 失败: {str(e)}", exc_info=True)
        else:
            logger.info(f"[产品图片随机选择] 缺少product_id或enterprise_id,跳过")

        # ---- ai_article_images: tag-matched images ------------------------
        if department_results and matched_images:
            operation_mode = "更新" if is_update_mode else "插入"
            try:
                start_order = 1
                if is_update_mode:
                    # A cover image is a row with sort_order=1 and image_source=4
                    check_cover_sql = """
                        SELECT id, sort_order FROM ai_article_images
                        WHERE article_id = %s AND sort_order = 1 AND image_source = 4
                        ORDER BY id DESC LIMIT 1
                    """
                    existing_cover = db_manager.execute_query(check_cover_sql, (article_id,))
                    has_cover = bool(existing_cover)

                    # NOTE(review): image_source < 4 also deletes the product
                    # images (image_source=2) inserted just above - confirm
                    # whether only image_source=1 rows should be removed.
                    delete_images_sql = "DELETE FROM ai_article_images WHERE article_id = %s AND image_source < 4"
                    if has_cover:
                        logger.info(f'[图片关联-更新] 检测到已存在封面图(sort_order=1),将从2开始编号')
                        db_manager.execute_update(delete_images_sql, (article_id,))
                        logger.info(f'[图片关联-更新] 已删除文章ID {article_id} 的旧tags图片(保留封面图)')
                        start_order = 2  # the cover keeps sort_order 1
                    else:
                        logger.info(f'[图片关联-更新] 未检测到封面图,将从1开始编号')
                        db_manager.execute_update(delete_images_sql, (article_id,))
                        logger.info(f'[图片关联-更新] 已删除文章ID {article_id} 的所有旧图片')

                table_mode = _insert_tag_images(
                    db_manager, article_id, matched_images, department_results, start_order
                )
                total_image_count += len(matched_images)
                logger.info(f"[图片关联-{operation_mode}] 成功写入 {len(matched_images)} 条文章-图片关联({table_mode}),sort_order从{start_order}开始")
            except Exception as e:
                logger.error(f"[图片关联-{operation_mode}] 写入 ai_article_images 时发生错误: {str(e)}")

        # Store the image count of the article
        if total_image_count > 0:
            try:
                update_count_sql = "UPDATE ai_articles SET image_count = %s WHERE id = %s"
                db_manager.execute_update(update_count_sql, (total_image_count, article_id))
                logger.info(f"[图片计数更新] 文章 {article_id} 的 image_count 已更新为 {total_image_count}")
            except Exception as e:
                logger.error(f"[图片计数更新] 失败: {str(e)}")

        # ---- Audit log and response ---------------------------------------
        if is_update_mode:
            add_log(
                current_user['user_id'], 'update_article', 'article', article_id,
                f"更新文章: {data['title']} (batch_id: {batch_id})", 'success'
            )
            logger.info(f"[更新成功] 文章《{data['title']}》已更新,ID: {article_id}, batch_id: {batch_id}")
            success_message = '文章更新成功'
            success_errmsg = '更新成功'
        else:
            add_log(
                current_user['user_id'], 'create_article', 'article', article_id,
                f"创建文章: {data['title']}", 'success'
            )
            logger.info(f"[发布成功] 文章《{data['title']}》已保存到数据库,ID: {article_id}")
            success_message = '文章保存成功'
            success_errmsg = '发布成功'

        return jsonify({
            'errno': 0,
            'errmsg': success_errmsg,
            'message': success_message,
            'data': {
                'article_id': article_id,
                'batch_id': batch_id if is_update_mode else None,
                'title': data['title'],
                'department': department_name,
                'status': article_status,
                'word_count': word_count,
                'image_count': len(matched_images),
                'coze_tag': tags,
                'matched_images': matched_images,
                'operation_mode': 'update' if is_update_mode else 'create'
            },
            'account_info': {
                'user_name': current_user.get('real_name') or current_user.get('username', ''),
                'department': current_user.get('department', '')
            }
        })
    except Exception as e:
        logger.error(f"[系统错误] 上传文章时发生未知错误: {str(e)}", exc_info=True)
        # Record the failure; guard against state that was never populated so
        # the handler itself cannot raise.
        if current_user:
            title = data.get('title', '未知标题') if isinstance(data, dict) else '未知标题'
            add_log(
                current_user['user_id'], 'create_article', 'article', None,
                f"创建文章失败: {title}", 'error', str(e)
            )
        return jsonify({
            'errno': 500,
            'errmsg': '发布失败',
            'message': f'服务器内部错误: {str(e)}',
            'data': None
        }), 500


def _print_startup_banner():
    """Print the development-server start-up banner to stdout."""
    print("=" * 60)
    print("图文动态批量生产系统 v2.0 启动中...")
    print("⚠️ 警告: 当前使用开发服务器,不建议在生产环境中使用")
    print("⚠️ 生产环境请使用: gunicorn、waitress 等WSGI服务器")
    print("-" * 60)
    print("服务地址: http://127.0.0.1:8321")
    print("API文档: http://127.0.0.1:8321")
    print("健康检查: http://127.0.0.1:8321/health")
    print("-" * 60)
    print("接口模块:")
    print(" - 用户认证: /api/auth")
    print(" - 用户管理: /api/users")
    print(" - 文章管理: /api/articles")
    print(" - 统计接口: /api/statistics")
    print("-" * 60)
    print("日志配置信息:")
    print(" 主日志文件: logs/baijiahao_server.log")
    print(" 错误日志文件: logs/baijiahao_error.log")
    print(" 日志切割: 每日午夜自动切割")
    print(" 日志保留: 主日志30天,错误日志90天")
    print("-" * 60)
    print("数据库配置:")
    print(" 主机: 127.0.0.1:3306")
    print(" 数据库: ai_article")
    print(" 用户: root")
    print("-" * 60)
    print("生产部署选项:")
    print(" 1. Waitress: waitress-serve --host=0.0.0.0 --port=8321 flask_article_server_search:app")
    print(" 2. Gunicorn: gunicorn --bind 0.0.0.0:8321 --workers 4 flask_article_server_search:app")
    print("=" * 60)


def main():
    """Development entry point: preload the index, check the DB, run Flask."""
    _print_startup_banner()
    try:
        # Preload the Whoosh index
        initialize_service()

        # Verify the database connection
        logger.info("测试数据库连接...")
        db_manager = get_db_manager()
        db_manager.execute_query("SELECT 1")
        logger.info("数据库连接成功")

        # Start the development server
        logger.warning("使用Flask开发服务器启动(仅用于开发测试)")
        app.run(
            host='0.0.0.0',
            port=8321,
            debug=False,
            threaded=True
        )
    except KeyboardInterrupt:
        logger.info("收到中断信号,正在关闭服务器...")
        print("\n服务器已关闭")
    except Exception as e:
        logger.error(f"服务器启动失败: {str(e)}", exc_info=True)
        print(f"服务器启动失败: {str(e)}")
    finally:
        logger.info("图文动态批量生产系统已停止")
        print("日志已保存到 logs/ 目录")


# Development entry point
if __name__ == '__main__':
    main()
# Production initialisation (runs when imported as a module, e.g. by a WSGI server)
else:
    print('图文动态批量生产系统 v2.0 模块已加载')
    # Preload the Whoosh index here as well; never block the import on failure.
    try:
        initialize_service()
    except Exception as e:
        logger.warning(f"模块加载时初始化失败: {e}")