#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Log management API.

Registers three authenticated GET endpoints on the ``log`` blueprint
(URL prefix ``/api/log``):

* ``/logs``       - paginated operation logs read from the ``wht_logs`` table
* ``/logs/file``  - the last N lines of one server log file
* ``/logs/files`` - a listing of the log files found in the log directory

Every response is a JSON envelope with ``code``, ``message`` and ``data``;
successful responses additionally carry a millisecond ``timestamp``.
"""

import logging
import os
import re
from collections import deque
from datetime import datetime

from flask import Blueprint, request, jsonify

from auth_utils import require_auth, require_role, AuthUtils
from database_config import get_db_manager, format_datetime_fields
# Shared logging configuration
from log_config import setup_article_server_logger

# Module logger
logger = setup_article_server_logger()

# Blueprint holding every route defined in this module
log_bp = Blueprint('log', __name__, url_prefix='/api/log')

# Directory (relative to the working directory) that holds the log files.
_LOG_DIR = 'logs'

# Upper bound on the number of lines a single /logs/file request may return.
_MAX_TAIL_LINES = 1000

# Supported ``type`` values mapped to the base name of their log file.
_LOG_BASENAMES = {
    'article': 'article_server.log',
    'error': 'article_server_error.log',
    'whoosh': 'whoosh_search_tags.log',
}

# The ``date`` parameter becomes part of a file name, so it is restricted to a
# short token without dots or path separators. This blocks path traversal
# such as ``date=/../../etc/passwd`` while still accepting ``2025-08-14``.
_LOG_SUFFIX_RE = re.compile(r'[0-9A-Za-z_-]{1,32}')


def _now_ms():
    """Return the current local time as integer milliseconds since the epoch."""
    return int(datetime.now().timestamp() * 1000)


def _ok(data, message='获取成功'):
    """Build the success envelope (HTTP 200) around *data*."""
    return jsonify({
        'code': 200,
        'message': message,
        'data': data,
        'timestamp': _now_ms()
    })


def _fail(code, message):
    """Build an error envelope whose HTTP status equals *code*."""
    return jsonify({
        'code': code,
        'message': message,
        'data': None
    }), code


def _positive_int_arg(name, default):
    """Read query parameter *name* as an integer >= 1.

    Returns *default* when the parameter is absent.

    Raises:
        ValueError: the value is not an integer or is smaller than 1.
    """
    raw = request.args.get(name)
    if raw is None:
        return default
    value = int(raw)
    if value < 1:
        raise ValueError(f'{name} must be a positive integer')
    return value


def _format_mtime(timestamp):
    """Format a POSIX timestamp as local ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')


def _read_tail(path, limit, encoding):
    """Return ``(total_line_count, last_lines)`` for the text file at *path*.

    The file is streamed through a bounded deque, so at most *limit* lines are
    held in memory regardless of file size. Trailing newlines are stripped
    from the returned lines.

    Raises:
        UnicodeDecodeError: the file is not valid in *encoding*.
        OSError: the file cannot be opened or read.
    """
    total = 0
    tail = deque(maxlen=limit)
    with open(path, 'r', encoding=encoding) as handle:
        for line in handle:
            total += 1
            tail.append(line)
    return total, [line.rstrip('\n') for line in tail]


# Operation-log query endpoint
@log_bp.route('/logs', methods=['GET'])
@require_auth
def get_logs():
    """Return one page of operation logs, newest first.

    Query parameters:
        page: 1-based page number (default 1).
        size: rows per page (default 10).

    Responds with 400 when ``page`` or ``size`` is not a positive integer,
    and with 500 on any unexpected failure.
    """
    try:
        try:
            page = _positive_int_arg('page', 1)
            size = _positive_int_arg('size', 10)
        except ValueError:
            return _fail(400, '参数错误: page 和 size 必须为正整数')

        offset = (page - 1) * size
        db_manager = get_db_manager()

        # Total row count for the pager
        count_sql = "SELECT COUNT(*) as total FROM wht_logs"
        count_result = db_manager.execute_query(count_sql)
        total = count_result[0]['total'] if count_result else 0

        # Page of log rows joined with the acting user's names
        sql = """
            SELECT l.id, l.user_id, l.action, l.target_type, l.target_id,
                   l.description, l.ip_address, l.user_agent, l.status,
                   l.created_at, u.username, u.real_name
            FROM wht_logs l
            LEFT JOIN wht_admin_users u ON l.user_id = u.id
            ORDER BY l.created_at DESC
            LIMIT %s OFFSET %s
        """
        logs = db_manager.execute_query(sql, (size, offset))

        # Pass the rows through the project's datetime formatter
        logs = format_datetime_fields(logs)

        return _ok({
            'list': logs,
            'total': total,
            'page': page,
            'size': size
        })
    except Exception as e:
        logger.error("[获取日志] 处理请求时发生错误: %s", e, exc_info=True)
        return _fail(500, '服务器内部错误')


@log_bp.route('/logs/file', methods=['GET'])
@require_auth
def get_file_logs():
    """Return the last lines of one server log file.

    Query parameters:
        type: ``article`` (default), ``error`` or ``whoosh``.
        lines: number of trailing lines to return (default 100, capped
            at 1000).
        date: optional suffix selecting a rotated file,
            e.g. ``2025-08-14`` -> ``article_server.log.2025-08-14``.

    The file is decoded as UTF-8, falling back to GBK. Responds with 400 for
    an unsupported type or malformed ``lines``/``date``, 404 when the file
    does not exist, and 500 on decode or unexpected failures.
    """
    try:
        log_type = request.args.get('type', 'article')
        date = request.args.get('date', '')

        try:
            # Cap the line count to bound response size and memory use
            lines = min(_positive_int_arg('lines', 100), _MAX_TAIL_LINES)
        except ValueError:
            return _fail(400, '参数错误: lines 必须为正整数')

        basename = _LOG_BASENAMES.get(log_type)
        if basename is None:
            return _fail(400, '不支持的日志类型')

        if date:
            # ``date`` is untrusted and becomes part of the path: validate it
            if not _LOG_SUFFIX_RE.fullmatch(date):
                return _fail(400, '日期参数格式错误')
            basename = f'{basename}.{date}'
        log_file = f'{_LOG_DIR}/{basename}'

        if not os.path.isfile(log_file):
            return _fail(404, f'日志文件不存在: {log_file}')

        message = '获取成功'
        try:
            total_lines, recent_lines = _read_tail(log_file, lines, 'utf-8')
        except UnicodeDecodeError:
            # Not valid UTF-8: retry from the start with GBK
            try:
                total_lines, recent_lines = _read_tail(log_file, lines, 'gbk')
            except UnicodeDecodeError as decode_error:
                logger.error("[文件日志] 编码解析失败: %s", decode_error)
                return _fail(500, f'文件编码解析失败: {str(decode_error)}')
            message = '获取成功(使用GBK编码)'
        except FileNotFoundError:
            # The file was removed (e.g. rotated) after the existence check
            return _fail(404, f'日志文件不存在: {log_file}')

        file_stat = os.stat(log_file)
        return _ok({
            'log_type': log_type,
            'file_path': log_file,
            'file_size': file_stat.st_size,
            'file_mtime': _format_mtime(file_stat.st_mtime),
            'total_lines': total_lines,
            'returned_lines': len(recent_lines),
            'lines': recent_lines
        }, message)
    except Exception as e:
        logger.error("[文件日志] 处理请求时发生错误: %s", e, exc_info=True)
        return _fail(500, '服务器内部错误')


@log_bp.route('/logs/files', methods=['GET'])
@require_auth
def get_log_files():
    """List the log files available in the log directory.

    A file is listed when its name ends with ``.log`` or contains ``.log.``.
    Each entry reports the name, size in bytes and MiB, modification time and
    a type derived from the name (``error``, ``whoosh``, otherwise
    ``article``). Entries are ordered by modification time, newest first.

    Responds with 404 when the log directory does not exist and 500 on any
    unexpected failure.
    """
    try:
        if not os.path.isdir(_LOG_DIR):
            return _fail(404, '日志目录不存在')

        files = []
        with os.scandir(_LOG_DIR) as entries:
            for entry in entries:
                filename = entry.name
                if not (filename.endswith('.log') or '.log.' in filename):
                    continue
                try:
                    if not entry.is_file():
                        continue
                    file_stat = entry.stat()
                except OSError:
                    # The file vanished or became unreadable while listing
                    # (e.g. log rotation); skip it rather than fail the call
                    continue

                if 'error' in filename:
                    file_type = 'error'
                elif 'whoosh' in filename:
                    file_type = 'whoosh'
                else:
                    file_type = 'article'

                files.append({
                    'filename': filename,
                    'size': file_stat.st_size,
                    'size_mb': round(file_stat.st_size / 1024 / 1024, 2),
                    'modified': _format_mtime(file_stat.st_mtime),
                    'type': file_type
                })

        # The timestamp format sorts lexicographically in chronological order
        files.sort(key=lambda item: item['modified'], reverse=True)

        return _ok({
            'files': files,
            'total': len(files)
        })
    except Exception as e:
        logger.error("[日志文件列表] 处理请求时发生错误: %s", e, exc_info=True)
        return _fail(500, '服务器内部错误')