Files
ai_mip/log_manager.py

255 lines
8.4 KiB
Python
Raw Permalink Normal View History

2026-02-24 12:46:35 +08:00
"""
日志管理模块
提供日志读取过滤搜索和导出功能
"""
import os
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from config import Config
class LogManager:
"""日志管理器"""
LOG_PATTERN = re.compile(
r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s*\|\s*(\w+)\s*\|\s*(.*)',
re.DOTALL
)
LEVEL_PRIORITY = {
'DEBUG': 0,
'INFO': 1,
'WARNING': 2,
'ERROR': 3,
'CRITICAL': 4
}
def __init__(self, log_dir: str = None):
self.log_dir = Path(log_dir or Config.LOG_DIR)
if not self.log_dir.exists():
self.log_dir.mkdir(parents=True, exist_ok=True)
def get_log_files(self) -> List[Dict]:
"""获取所有日志文件列表"""
log_files = []
for file_path in self.log_dir.glob('*.log'):
stat = file_path.stat()
log_files.append({
'name': file_path.name,
'path': str(file_path),
'size': stat.st_size,
'size_human': self._format_size(stat.st_size),
'modified_time': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
})
return sorted(log_files, key=lambda x: x['modified_time'], reverse=True)
def _format_size(self, size: int) -> str:
"""格式化文件大小"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024:
return f"{size:.1f} {unit}"
size /= 1024
return f"{size:.1f} TB"
def _parse_log_line(self, line: str) -> Optional[Dict]:
"""解析单行日志"""
match = self.LOG_PATTERN.match(line.strip())
if match:
return {
'time': match.group(1),
'level': match.group(2).upper(),
'message': match.group(3).strip()
}
return None
def _read_log_file_reverse(self, file_path: Path, limit: int = 500) -> List[str]:
"""从文件末尾反向读取日志行"""
lines = []
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
# 读取所有行
all_lines = f.readlines()
# 取最后limit行
lines = all_lines[-limit:] if len(all_lines) > limit else all_lines
except Exception:
pass
return lines
def get_latest_logs(self, limit: int = 100, level: str = 'ALL') -> List[Dict]:
"""获取最新的日志条目(用于实时流)"""
logs = []
log_files = self.get_log_files()
if not log_files:
return logs
# 读取最新的日志文件
latest_file = Path(log_files[0]['path'])
lines = self._read_log_file_reverse(latest_file, limit * 2)
current_log = None
for line in lines:
parsed = self._parse_log_line(line)
if parsed:
if current_log:
logs.append(current_log)
current_log = parsed
elif current_log and line.strip():
# 多行日志,追加到消息中
current_log['message'] += '\n' + line.strip()
if current_log:
logs.append(current_log)
# 按级别过滤
if level and level.upper() != 'ALL':
logs = [log for log in logs if log['level'] == level.upper()]
# 返回最新的limit条
return logs[-limit:]
def search_logs(
self,
keyword: str = None,
level: str = 'ALL',
start_time: str = None,
end_time: str = None,
page: int = 1,
page_size: int = 100
) -> Tuple[List[Dict], int]:
"""
搜索日志
返回: (日志列表, 总数)
"""
all_logs = []
log_files = self.get_log_files()
# 时间范围过滤
start_dt = None
end_dt = None
if start_time:
try:
start_dt = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
except ValueError:
try:
start_dt = datetime.strptime(start_time, '%Y-%m-%d')
except ValueError:
pass
if end_time:
try:
end_dt = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S')
except ValueError:
try:
end_dt = datetime.strptime(end_time, '%Y-%m-%d')
end_dt = end_dt.replace(hour=23, minute=59, second=59)
except ValueError:
pass
# 读取日志文件
for log_file in log_files[:5]: # 最多读取最近5个日志文件
file_path = Path(log_file['path'])
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
current_log = None
for line in f:
parsed = self._parse_log_line(line)
if parsed:
if current_log:
all_logs.append(current_log)
current_log = parsed
elif current_log and line.strip():
current_log['message'] += '\n' + line.strip()
if current_log:
all_logs.append(current_log)
except Exception:
continue
# 过滤
filtered_logs = []
for log in all_logs:
# 级别过滤
if level and level.upper() != 'ALL' and log['level'] != level.upper():
continue
# 关键词过滤
if keyword and keyword.lower() not in log['message'].lower():
continue
# 时间范围过滤
try:
log_dt = datetime.strptime(log['time'], '%Y-%m-%d %H:%M:%S')
if start_dt and log_dt < start_dt:
continue
if end_dt and log_dt > end_dt:
continue
except ValueError:
pass
filtered_logs.append(log)
# 按时间倒序排序
filtered_logs.sort(key=lambda x: x['time'], reverse=True)
# 分页
total = len(filtered_logs)
start_idx = (page - 1) * page_size
end_idx = start_idx + page_size
paginated_logs = filtered_logs[start_idx:end_idx]
return paginated_logs, total
def get_log_content(self, filename: str) -> str:
"""获取指定日志文件的内容"""
file_path = self.log_dir / filename
if not file_path.exists():
return ""
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
return f.read()
except Exception:
return ""
def get_log_stats(self) -> Dict:
"""获取日志统计信息"""
stats = {
'total_files': 0,
'total_size': 0,
'total_size_human': '0 B',
'level_counts': {
'DEBUG': 0,
'INFO': 0,
'WARNING': 0,
'ERROR': 0,
'CRITICAL': 0
},
'latest_error': None,
'latest_warning': None
}
log_files = self.get_log_files()
stats['total_files'] = len(log_files)
stats['total_size'] = sum(f['size'] for f in log_files)
stats['total_size_human'] = self._format_size(stats['total_size'])
# 统计最近日志文件的级别分布
if log_files:
latest_file = Path(log_files[0]['path'])
lines = self._read_log_file_reverse(latest_file, 1000)
for line in lines:
parsed = self._parse_log_line(line)
if parsed:
level = parsed['level']
if level in stats['level_counts']:
stats['level_counts'][level] += 1
if level == 'ERROR' and not stats['latest_error']:
stats['latest_error'] = parsed
elif level == 'WARNING' and not stats['latest_warning']:
stats['latest_warning'] = parsed
return stats