# Listing metadata: 255 lines, 8.4 KiB, Python
"""
|
|
日志管理模块
|
|
提供日志读取、过滤、搜索和导出功能
|
|
"""
|
|
import logging
import os
import re
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional, Tuple

from config import Config


class LogManager:
    """Read, filter, search, and summarize the ``*.log`` files of one directory.

    Entries are expected to look like
    ``YYYY-MM-DD HH:MM:SS | LEVEL | message`` (see ``LOG_PATTERN``). Lines
    that do not match are treated as continuation lines of the entry that
    precedes them.
    """

    LOG_PATTERN = re.compile(
        r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s*\|\s*(\w+)\s*\|\s*(.*)',
        re.DOTALL
    )

    # Numeric rank per level name, ascending from DEBUG (0) to CRITICAL (4).
    LEVEL_PRIORITY = {
        'DEBUG': 0,
        'INFO': 1,
        'WARNING': 2,
        'ERROR': 3,
        'CRITICAL': 4
    }

    # search_logs() scans at most this many of the most recently modified files.
    MAX_SEARCH_FILES = 5

    # Timestamp layouts accepted for entries and for search bounds.
    TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    DATE_FORMAT = '%Y-%m-%d'

    _logger = logging.getLogger(__name__)

    def __init__(self, log_dir: Optional[str] = None):
        """Bind the manager to a log directory, creating it when missing.

        Args:
            log_dir: Directory holding the ``*.log`` files. ``Config.LOG_DIR``
                is used when this is omitted or empty.
        """
        self.log_dir = Path(log_dir or Config.LOG_DIR)
        if not self.log_dir.exists():
            self.log_dir.mkdir(parents=True, exist_ok=True)

    def get_log_files(self) -> List[Dict]:
        """List the ``*.log`` files in the log directory, newest first.

        Returns:
            One dict per file with the keys ``name``, ``path``, ``size``,
            ``size_human`` and ``modified_time``, ordered by modification
            time (most recent first).
        """
        found = []
        for file_path in self.log_dir.glob('*.log'):
            try:
                if not file_path.is_file():
                    # A directory named "*.log" cannot be read as a log file.
                    continue
                stat = file_path.stat()
            except OSError:
                # The file disappeared between listing and stat; skip it.
                continue
            modified = datetime.fromtimestamp(stat.st_mtime)
            found.append((stat.st_mtime, {
                'name': file_path.name,
                'path': str(file_path),
                'size': stat.st_size,
                'size_human': self._format_size(stat.st_size),
                'modified_time': modified.strftime(self.TIME_FORMAT)
            }))
        # Order by the raw mtime rather than the formatted string, so files
        # modified within the same second are still ordered correctly.
        found.sort(key=lambda item: item[0], reverse=True)
        return [info for _, info in found]

    def _format_size(self, size: int) -> str:
        """Format a byte count as a human-readable string, e.g. ``1.5 KB``.

        The value is divided by 1024 per unit step and rendered with one
        decimal place; anything of 1024 GB or more is reported in TB.
        """
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size < 1024:
                return f"{size:.1f} {unit}"
            size /= 1024
        return f"{size:.1f} TB"

    def _parse_log_line(self, line: str) -> Optional[Dict]:
        """Parse one log line into ``time`` / ``level`` / ``message``.

        Returns:
            A dict with the timestamp text, the upper-cased level and the
            stripped message, or ``None`` when the line does not match
            ``LOG_PATTERN``.
        """
        match = self.LOG_PATTERN.match(line.strip())
        if match is None:
            return None
        return {
            'time': match.group(1),
            'level': match.group(2).upper(),
            'message': match.group(3).strip()
        }

    def _iter_entries(self, lines):
        """Yield log entries built from an iterable of raw lines.

        A line matching ``LOG_PATTERN`` starts a new entry. Any other
        non-blank line is appended (stripped, after a newline) to the message
        of the entry currently being built; such lines are dropped when no
        entry has started yet.
        """
        current = None
        for line in lines:
            parsed = self._parse_log_line(line)
            if parsed is not None:
                if current is not None:
                    yield current
                current = parsed
            elif current is not None and line.strip():
                current['message'] += '\n' + line.strip()
        if current is not None:
            yield current

    def _read_log_file_reverse(self, file_path: Path, limit: int = 500) -> List[str]:
        """Return the last ``limit`` lines of a file, in file order.

        Despite the name, the lines are not reversed. Undecodable bytes are
        ignored. An unreadable file, or a non-positive ``limit``, yields an
        empty list.
        """
        if limit <= 0:
            return []
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                # A bounded deque keeps only the tail, so the whole file is
                # never held in memory at once.
                return list(deque(f, maxlen=limit))
        except OSError as exc:
            self._logger.warning("Could not read log file %s: %s", file_path, exc)
            return []

    def get_latest_logs(self, limit: int = 100, level: str = 'ALL') -> List[Dict]:
        """Return the newest entries of the most recent log file.

        Only the last ``limit * 2`` lines of that file are inspected, so
        fewer than ``limit`` entries may come back when a level filter is
        active or when entries span many lines.

        Args:
            limit: Maximum number of entries to return; values below 1
                yield an empty list.
            level: Level name to keep (case-insensitive), or ``'ALL'``.

        Returns:
            Entries in file order (oldest first), at most ``limit`` of them.
        """
        if limit <= 0:
            # logs[-0:] would be the whole list, so handle this explicitly.
            return []

        log_files = self.get_log_files()
        if not log_files:
            return []

        newest_file = Path(log_files[0]['path'])
        tail = self._read_log_file_reverse(newest_file, limit * 2)
        logs = list(self._iter_entries(tail))

        if level and level.upper() != 'ALL':
            wanted = level.upper()
            logs = [entry for entry in logs if entry['level'] == wanted]

        return logs[-limit:]

    def _parse_time_bound(self, value: Optional[str], end_of_day: bool = False) -> Optional[datetime]:
        """Parse a search bound given as a full timestamp or a bare date.

        Args:
            value: ``YYYY-MM-DD HH:MM:SS`` or ``YYYY-MM-DD``; may be empty.
            end_of_day: When a bare date is given, move it to 23:59:59 so
                the whole day is included.

        Returns:
            The parsed datetime, or ``None`` when ``value`` is empty or in
            neither format (the bound is then not applied).
        """
        if not value:
            return None
        try:
            return datetime.strptime(value, self.TIME_FORMAT)
        except ValueError:
            pass
        try:
            day = datetime.strptime(value, self.DATE_FORMAT)
        except ValueError:
            return None
        if end_of_day:
            return day.replace(hour=23, minute=59, second=59)
        return day

    def search_logs(
        self,
        keyword: str = None,
        level: str = 'ALL',
        start_time: str = None,
        end_time: str = None,
        page: int = 1,
        page_size: int = 100
    ) -> Tuple[List[Dict], int]:
        """Search the most recent log files and return one page of matches.

        Args:
            keyword: Case-insensitive substring that the message must contain.
            level: Level name to keep (case-insensitive), or ``'ALL'``.
            start_time: Inclusive lower bound, ``YYYY-MM-DD HH:MM:SS`` or
                ``YYYY-MM-DD``. An unparseable value is ignored.
            end_time: Inclusive upper bound in the same formats; a bare date
                covers that whole day. An unparseable value is ignored.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Entries per page; negative values are treated as 0.

        Returns:
            ``(entries, total)``: the requested page, newest first, and the
            number of matching entries across all pages.
        """
        start_dt = self._parse_time_bound(start_time)
        end_dt = self._parse_time_bound(end_time, end_of_day=True)

        all_logs = []
        for log_file in self.get_log_files()[:self.MAX_SEARCH_FILES]:
            file_path = Path(log_file['path'])
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    # extend() consumes the generator incrementally, so the
                    # entries read before a mid-file error are kept.
                    all_logs.extend(self._iter_entries(f))
            except OSError as exc:
                self._logger.warning("Could not read log file %s: %s", file_path, exc)
                continue

        wanted_level = level.upper() if level and level.upper() != 'ALL' else None
        needle = keyword.lower() if keyword else None
        filter_by_time = start_dt is not None or end_dt is not None

        filtered_logs = []
        for log in all_logs:
            if wanted_level and log['level'] != wanted_level:
                continue
            if needle and needle not in log['message'].lower():
                continue
            if filter_by_time:
                try:
                    log_dt = datetime.strptime(log['time'], self.TIME_FORMAT)
                except ValueError:
                    # Entries whose timestamp cannot be parsed (e.g. month
                    # 13) are kept rather than dropped by the time filter.
                    log_dt = None
                if log_dt is not None:
                    if start_dt is not None and log_dt < start_dt:
                        continue
                    if end_dt is not None and log_dt > end_dt:
                        continue
            filtered_logs.append(log)

        # Newest first, ordered by the timestamp text.
        filtered_logs.sort(key=lambda entry: entry['time'], reverse=True)

        total = len(filtered_logs)
        # Clamp paging inputs: a page below 1 would otherwise yield a
        # negative slice start and silently return the wrong entries.
        page = max(page, 1)
        page_size = max(page_size, 0)
        start_idx = (page - 1) * page_size
        return filtered_logs[start_idx:start_idx + page_size], total

    def get_log_content(self, filename: str) -> str:
        """Return the full text of one file inside the log directory.

        Args:
            filename: Name of the file, relative to the log directory.

        Returns:
            The file content, or ``""`` when the file is missing, is not a
            regular file, cannot be read, or lies outside the log directory.
        """
        # Normalize lexically (symlinks are not followed) and refuse
        # anything that escapes the log directory, such as "../x" or an
        # absolute path.
        base = Path(os.path.abspath(self.log_dir))
        target = Path(os.path.abspath(base / filename))
        try:
            target.relative_to(base)
        except ValueError:
            self._logger.warning("Rejected log path outside %s: %r", base, filename)
            return ""

        try:
            if not target.is_file():
                return ""
            with open(target, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read()
        except (OSError, ValueError) as exc:
            # ValueError covers names the OS layer rejects (embedded NUL).
            self._logger.warning("Could not read log file %s: %s", target, exc)
            return ""

    def get_log_stats(self) -> Dict:
        """Summarize the log directory and the tail of its newest file.

        Returns:
            A dict with ``total_files``, ``total_size``, ``total_size_human``,
            ``level_counts`` (per-level counts over the last 1000 lines of
            the most recently modified file), and ``latest_error`` /
            ``latest_warning`` (the ERROR / WARNING entry closest to the end
            of that file, or ``None``).
        """
        log_files = self.get_log_files()
        total_size = sum(f['size'] for f in log_files)
        level_counts = {
            'DEBUG': 0,
            'INFO': 0,
            'WARNING': 0,
            'ERROR': 0,
            'CRITICAL': 0
        }
        stats = {
            'total_files': len(log_files),
            'total_size': total_size,
            'total_size_human': self._format_size(total_size),
            'level_counts': level_counts,
            'latest_error': None,
            'latest_warning': None
        }

        if not log_files:
            return stats

        newest_file = Path(log_files[0]['path'])
        for line in self._read_log_file_reverse(newest_file, 1000):
            parsed = self._parse_log_line(line)
            if parsed is None:
                continue
            level = parsed['level']
            if level in level_counts:
                level_counts[level] += 1
            # Lines arrive in file order, so overwriting on every match
            # leaves the entry nearest the end of the file.
            if level == 'ERROR':
                stats['latest_error'] = parsed
            elif level == 'WARNING':
                stats['latest_warning'] = parsed

        return stats