# Files
# ai_mip/log_manager.py
# 2026-02-24 12:46:35 +08:00
#
# 255 lines
# 8.4 KiB
# Python
"""
日志管理模块
提供日志读取、过滤、搜索和导出功能
"""
import os
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from config import Config
class LogManager:
    """Log manager: read, filter, search and summarize application log files.

    Log lines are expected in the format
    ``YYYY-MM-DD HH:MM:SS | LEVEL | message``.  Lines that do not match the
    pattern are treated as continuations of the previous entry (multi-line
    messages such as tracebacks).
    """

    # Matches "<timestamp> | <level> | <message>" on a single line.
    # NOTE: re.DOTALL was removed — callers always pass one stripped line, so
    # it had no effect, but it would have let the trailing (.*) swallow
    # newlines if a multi-line string were ever passed in.
    LOG_PATTERN = re.compile(
        r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s*\|\s*(\w+)\s*\|\s*(.*)'
    )

    # Severity ordering, lowest to highest (kept for callers that compare levels).
    LEVEL_PRIORITY = {
        'DEBUG': 0,
        'INFO': 1,
        'WARNING': 2,
        'ERROR': 3,
        'CRITICAL': 4,
    }

    def __init__(self, log_dir: str = None):
        """Create a manager rooted at *log_dir* (defaults to ``Config.LOG_DIR``).

        The directory is created if missing; ``exist_ok=True`` avoids a
        race between the existence check and the mkdir.
        """
        self.log_dir = Path(log_dir or Config.LOG_DIR)
        self.log_dir.mkdir(parents=True, exist_ok=True)

    def get_log_files(self) -> List[Dict]:
        """Return metadata for every ``*.log`` file, newest first.

        Each item has: name, path, size (bytes), size_human, modified_time.
        """
        log_files = []
        for file_path in self.log_dir.glob('*.log'):
            stat = file_path.stat()
            log_files.append({
                'name': file_path.name,
                'path': str(file_path),
                'size': stat.st_size,
                'size_human': self._format_size(stat.st_size),
                'modified_time': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
            })
        # The timestamp format is lexicographically sortable, so a plain
        # string sort yields newest-first order.
        return sorted(log_files, key=lambda x: x['modified_time'], reverse=True)

    def _format_size(self, size: float) -> str:
        """Format a byte count as a human-readable string, e.g. ``'8.4 KB'``."""
        for unit in ('B', 'KB', 'MB', 'GB'):
            if size < 1024:
                return f"{size:.1f} {unit}"
            size /= 1024
        return f"{size:.1f} TB"

    def _parse_log_line(self, line: str) -> Optional[Dict]:
        """Parse one log line.

        Returns ``{'time', 'level', 'message'}`` (level upper-cased) or
        ``None`` when the line does not match :data:`LOG_PATTERN`.
        """
        match = self.LOG_PATTERN.match(line.strip())
        if not match:
            return None
        return {
            'time': match.group(1),
            'level': match.group(2).upper(),
            'message': match.group(3).strip(),
        }

    def _read_log_file_reverse(self, file_path: Path, limit: int = 500) -> List[str]:
        """Return the last *limit* lines of *file_path*, in file order.

        Despite the name, lines are NOT reversed: the whole file is read and
        only the tail is kept.  Returns ``[]`` on any I/O error (best-effort
        read; previously a blanket ``except Exception`` — narrowed to OSError).
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                all_lines = f.readlines()
        except OSError:
            return []
        return all_lines[-limit:]

    def _collect_entries(self, lines) -> List[Dict]:
        """Group raw lines into parsed entries.

        Continuation lines (non-matching, non-blank — typically traceback
        text) are folded into the preceding entry's message.  Shared by
        :meth:`get_latest_logs` and :meth:`search_logs`.
        """
        entries = []
        current = None
        for line in lines:
            parsed = self._parse_log_line(line)
            if parsed:
                if current:
                    entries.append(current)
                current = parsed
            elif current and line.strip():
                current['message'] += '\n' + line.strip()
        if current:
            entries.append(current)
        return entries

    def get_latest_logs(self, limit: int = 100, level: str = 'ALL') -> List[Dict]:
        """Return up to *limit* newest entries from the most recent log file.

        *level* filters to one severity; ``'ALL'`` (default) keeps everything.
        Entries come back oldest-to-newest (intended for a live tail view).
        """
        log_files = self.get_log_files()
        if not log_files:
            return []
        latest_file = Path(log_files[0]['path'])
        # Read extra raw lines so multi-line entries and level filtering
        # still leave enough results (heuristic, not a guarantee).
        lines = self._read_log_file_reverse(latest_file, limit * 2)
        logs = self._collect_entries(lines)
        if level and level.upper() != 'ALL':
            wanted = level.upper()
            logs = [entry for entry in logs if entry['level'] == wanted]
        return logs[-limit:]

    @staticmethod
    def _parse_time_bound(value: str, end_of_day: bool = False) -> Optional[datetime]:
        """Parse ``'YYYY-MM-DD HH:MM:SS'`` or ``'YYYY-MM-DD'``; None if invalid.

        With *end_of_day*, a date-only value maps to 23:59:59 so the bound is
        inclusive of the whole day (used for the search end bound).
        """
        if not value:
            return None
        try:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
        except ValueError:
            pass
        try:
            parsed = datetime.strptime(value, '%Y-%m-%d')
        except ValueError:
            return None
        if end_of_day:
            parsed = parsed.replace(hour=23, minute=59, second=59)
        return parsed

    def search_logs(
        self,
        keyword: str = None,
        level: str = 'ALL',
        start_time: str = None,
        end_time: str = None,
        page: int = 1,
        page_size: int = 100
    ) -> Tuple[List[Dict], int]:
        """Search parsed entries across the most recent log files.

        Filters: case-insensitive *keyword* substring match on the message,
        severity *level*, and an inclusive [*start_time*, *end_time*] range
        (timestamp or date strings).  Entries with unparseable timestamps are
        kept regardless of the time range, matching previous behavior.

        Returns ``(page of matches sorted newest first, total match count)``.
        """
        start_dt = self._parse_time_bound(start_time)
        end_dt = self._parse_time_bound(end_time, end_of_day=True)

        all_logs = []
        # Bound the work: only the 5 most recently modified files are scanned.
        for log_file in self.get_log_files()[:5]:
            try:
                with open(log_file['path'], 'r', encoding='utf-8', errors='ignore') as f:
                    all_logs.extend(self._collect_entries(f))
            except OSError:
                continue

        needle = keyword.lower() if keyword else None
        wanted = level.upper() if level else None
        filtered_logs = []
        for entry in all_logs:
            if wanted and wanted != 'ALL' and entry['level'] != wanted:
                continue
            if needle and needle not in entry['message'].lower():
                continue
            try:
                entry_dt = datetime.strptime(entry['time'], '%Y-%m-%d %H:%M:%S')
            except ValueError:
                entry_dt = None  # unparseable timestamp: skip the range check
            if entry_dt is not None:
                if start_dt and entry_dt < start_dt:
                    continue
                if end_dt and entry_dt > end_dt:
                    continue
            filtered_logs.append(entry)

        # Newest first (string sort is safe for this timestamp format).
        filtered_logs.sort(key=lambda x: x['time'], reverse=True)
        total = len(filtered_logs)
        start_idx = (page - 1) * page_size
        return filtered_logs[start_idx:start_idx + page_size], total

    def get_log_content(self, filename: str) -> str:
        """Return the full text of one log file inside the log directory.

        Returns ``""`` for missing or unreadable files.

        Security: *filename* presumably arrives from an HTTP request, so any
        value containing path components (``../``, absolute paths, separators)
        is rejected to prevent path traversal outside ``log_dir``.
        """
        if not filename or Path(filename).name != filename:
            return ""
        file_path = self.log_dir / filename
        if not file_path.exists():
            return ""
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read()
        except OSError:
            return ""

    def get_log_stats(self) -> Dict:
        """Summarize log files.

        Returns totals (file count, byte size) plus, for the newest file's
        last 1000 lines: a per-level count and the most recent ERROR and
        WARNING entries.  (Fixed: the old code kept the FIRST match in the
        window — i.e. the oldest — despite the ``latest_*`` names; the last
        match now wins.)
        """
        stats = {
            'total_files': 0,
            'total_size': 0,
            'total_size_human': '0 B',
            'level_counts': {lvl: 0 for lvl in self.LEVEL_PRIORITY},
            'latest_error': None,
            'latest_warning': None,
        }
        log_files = self.get_log_files()
        stats['total_files'] = len(log_files)
        stats['total_size'] = sum(f['size'] for f in log_files)
        stats['total_size_human'] = self._format_size(stats['total_size'])
        if log_files:
            latest_file = Path(log_files[0]['path'])
            for line in self._read_log_file_reverse(latest_file, 1000):
                parsed = self._parse_log_line(line)
                if not parsed:
                    continue
                lvl = parsed['level']
                if lvl in stats['level_counts']:
                    stats['level_counts'][lvl] += 1
                # Lines are in file order, so the last assignment is the newest.
                if lvl == 'ERROR':
                    stats['latest_error'] = parsed
                elif lvl == 'WARNING':
                    stats['latest_warning'] = parsed
        return stats