"""
Log management module.

Provides log reading, filtering, searching and export helpers.
"""

import os
import re
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

from config import Config


class LogManager:
    """Read, filter, search and summarise the ``*.log`` files of a directory."""

    # One log record header: "<YYYY-MM-DD HH:MM:SS> | <LEVEL> | <message>".
    LOG_PATTERN = re.compile(
        r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s*\|\s*(\w+)\s*\|\s*(.*)',
        re.DOTALL
    )

    LEVEL_PRIORITY = {
        'DEBUG': 0,
        'INFO': 1,
        'WARNING': 2,
        'ERROR': 3,
        'CRITICAL': 4
    }

    # Timestamp layouts accepted for time bounds and used for display.
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    _DATE_FORMAT = '%Y-%m-%d'

    # search_logs() only scans this many of the most recently modified files.
    _MAX_SEARCH_FILES = 5

    def __init__(self, log_dir: Optional[str] = None):
        """Bind the manager to a log directory, creating it if missing.

        Args:
            log_dir: Directory holding the log files. Falls back to
                ``Config.LOG_DIR`` when omitted or empty.
        """
        self.log_dir = Path(log_dir or Config.LOG_DIR)
        if not self.log_dir.exists():
            self.log_dir.mkdir(parents=True, exist_ok=True)

    def get_log_files(self) -> List[Dict]:
        """List the ``*.log`` files in the log directory, newest first.

        Returns:
            One dict per file with the keys ``name``, ``path``, ``size``,
            ``size_human`` and ``modified_time``, ordered by modification
            time in descending order.
        """
        entries = []
        for file_path in self.log_dir.glob('*.log'):
            try:
                stat = file_path.stat()
            except OSError:
                # The file vanished (e.g. rotation) between glob() and
                # stat(); skip it instead of failing the whole listing.
                continue
            modified = datetime.fromtimestamp(stat.st_mtime)
            entries.append((stat.st_mtime, {
                'name': file_path.name,
                'path': str(file_path),
                'size': stat.st_size,
                'size_human': self._format_size(stat.st_size),
                'modified_time': modified.strftime(self._TIME_FORMAT)
            }))

        # Sort on the raw mtime so files modified within the same second
        # are still ordered correctly.
        entries.sort(key=lambda item: item[0], reverse=True)
        return [info for _, info in entries]

    def _format_size(self, size: int) -> str:
        """Render a byte count as a human readable string such as ``1.5 KB``."""
        value = size
        for unit in ['B', 'KB', 'MB', 'GB']:
            if value < 1024:
                return f"{value:.1f} {unit}"
            value /= 1024
        return f"{value:.1f} TB"

    def _parse_log_line(self, line: str) -> Optional[Dict]:
        """Parse one record header line.

        Returns:
            A dict with ``time``, ``level`` (upper-cased) and ``message``,
            or ``None`` when the line does not match ``LOG_PATTERN``.
        """
        match = self.LOG_PATTERN.match(line.strip())
        if match is None:
            return None
        return {
            'time': match.group(1),
            'level': match.group(2).upper(),
            'message': match.group(3).strip()
        }

    def _group_entries(self, lines: Iterable[str]) -> List[Dict]:
        """Fold raw lines into records, merging continuation lines.

        A non-blank line that is not a record header (for example a
        traceback line) is appended to the message of the preceding
        record. Such lines appearing before the first header are dropped.
        """
        entries: List[Dict] = []
        current = None
        for line in lines:
            parsed = self._parse_log_line(line)
            if parsed is not None:
                if current is not None:
                    entries.append(current)
                current = parsed
            elif current is not None and line.strip():
                current['message'] += '\n' + line.strip()
        if current is not None:
            entries.append(current)
        return entries

    def _read_log_file_reverse(self, file_path: Path, limit: int = 500) -> List[str]:
        """Return the last ``limit`` lines of a file in their original order.

        Args:
            file_path: File to read.
            limit: Maximum number of trailing lines to keep. A value of
                zero or less yields an empty list.

        Returns:
            The trailing lines (newlines included), or an empty list when
            the file cannot be read.
        """
        if limit <= 0:
            return []
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                # A bounded deque keeps only the tail while streaming, so
                # large files are never held in memory in full.
                return list(deque(f, maxlen=limit))
        except OSError:
            # Best effort: an unreadable file is treated as empty.
            return []

    def get_latest_logs(self, limit: int = 100, level: str = 'ALL') -> List[Dict]:
        """Return the newest records of the most recently modified log file.

        Args:
            limit: Maximum number of records to return. Zero or less
                yields an empty list.
            level: Level to keep (case-insensitive); ``'ALL'`` or an empty
                value disables level filtering.

        Returns:
            Up to ``limit`` records in chronological file order.
        """
        if limit <= 0:
            return []

        log_files = self.get_log_files()
        if not log_files:
            return []

        latest_file = Path(log_files[0]['path'])
        # Read twice as many lines as requested to leave room for
        # multi-line records and for records removed by the level filter.
        lines = self._read_log_file_reverse(latest_file, limit * 2)
        logs = self._group_entries(lines)

        if level and level.upper() != 'ALL':
            wanted = level.upper()
            logs = [log for log in logs if log['level'] == wanted]

        return logs[-limit:]

    @classmethod
    def _parse_time_bound(
        cls,
        value: Optional[str],
        end_of_day: bool = False
    ) -> Optional[datetime]:
        """Parse a ``YYYY-MM-DD[ HH:MM:SS]`` bound.

        Args:
            value: Text to parse; empty or ``None`` means "no bound".
            end_of_day: When only a date is given, move the result to
                23:59:59 of that day instead of midnight.

        Returns:
            The parsed datetime, or ``None`` when the value is empty or
            matches neither format (the bound is then ignored).
        """
        if not value:
            return None
        try:
            return datetime.strptime(value, cls._TIME_FORMAT)
        except ValueError:
            pass
        try:
            bound = datetime.strptime(value, cls._DATE_FORMAT)
        except ValueError:
            return None
        if end_of_day:
            bound = bound.replace(hour=23, minute=59, second=59)
        return bound

    def search_logs(
        self,
        keyword: Optional[str] = None,
        level: str = 'ALL',
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        page: int = 1,
        page_size: int = 100
    ) -> Tuple[List[Dict], int]:
        """Search the most recent log files.

        Args:
            keyword: Case-insensitive substring that the message must
                contain; empty disables the keyword filter.
            level: Level to keep (case-insensitive); ``'ALL'`` or an empty
                value disables level filtering.
            start_time: Inclusive lower bound, ``YYYY-MM-DD HH:MM:SS`` or
                ``YYYY-MM-DD``. Unparseable values are ignored.
            end_time: Inclusive upper bound in the same formats; a bare
                date covers that whole day. Unparseable values are ignored.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Records per page; negative values are treated as 0.

        Returns:
            ``(records, total)`` where ``records`` is the requested page,
            sorted newest first, and ``total`` is the number of records
            matching the filters across all pages.
        """
        start_dt = self._parse_time_bound(start_time)
        end_dt = self._parse_time_bound(end_time, end_of_day=True)

        # Collect records from the most recently modified files only.
        all_logs: List[Dict] = []
        for log_file in self.get_log_files()[:self._MAX_SEARCH_FILES]:
            try:
                with open(log_file['path'], 'r', encoding='utf-8',
                          errors='ignore') as f:
                    all_logs.extend(self._group_entries(f))
            except OSError:
                # Best effort: skip files that cannot be read.
                continue

        wanted_level = None
        if level and level.upper() != 'ALL':
            wanted_level = level.upper()
        needle = keyword.lower() if keyword else None
        has_time_bound = start_dt is not None or end_dt is not None

        filtered_logs = []
        for log in all_logs:
            if wanted_level is not None and log['level'] != wanted_level:
                continue
            if needle is not None and needle not in log['message'].lower():
                continue
            if has_time_bound:
                try:
                    log_dt = datetime.strptime(log['time'], self._TIME_FORMAT)
                except ValueError:
                    # A timestamp that matched the regex but is not a real
                    # date (e.g. month 13) is kept rather than dropped.
                    log_dt = None
                if log_dt is not None:
                    if start_dt is not None and log_dt < start_dt:
                        continue
                    if end_dt is not None and log_dt > end_dt:
                        continue
            filtered_logs.append(log)

        # Newest first; the timestamp text sorts chronologically.
        filtered_logs.sort(key=lambda x: x['time'], reverse=True)

        total = len(filtered_logs)
        # Clamp paging inputs so that page <= 0 or a negative page size
        # cannot turn into negative slice indices.
        page = max(page, 1)
        page_size = max(page_size, 0)
        start_idx = (page - 1) * page_size
        paginated_logs = filtered_logs[start_idx:start_idx + page_size]

        return paginated_logs, total

    def get_log_content(self, filename: str) -> str:
        """Return the full text of one log file inside the log directory.

        Args:
            filename: Name of the file, relative to the log directory.

        Returns:
            The file content, or an empty string when the file does not
            exist, cannot be read, or resolves to a location outside the
            log directory (``../`` segments, absolute paths, or symlinks
            pointing elsewhere).
        """
        try:
            base_dir = self.log_dir.resolve()
            file_path = (base_dir / filename).resolve()
            # relative_to() raises ValueError when file_path is not located
            # under base_dir, which blocks path traversal.
            file_path.relative_to(base_dir)
            if not file_path.is_file():
                return ""
        except (OSError, ValueError, RuntimeError):
            return ""

        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read()
        except OSError:
            return ""

    def get_log_stats(self) -> Dict:
        """Summarise the log directory.

        Returns:
            A dict with ``total_files``, ``total_size``,
            ``total_size_human``, ``level_counts`` (per-level header counts
            over the last 1000 lines of the newest file) and
            ``latest_error`` / ``latest_warning`` (the most recent parsed
            record of that level in those lines, or ``None``).
        """
        stats = {
            'total_files': 0,
            'total_size': 0,
            'total_size_human': '0 B',
            'level_counts': {
                'DEBUG': 0,
                'INFO': 0,
                'WARNING': 0,
                'ERROR': 0,
                'CRITICAL': 0
            },
            'latest_error': None,
            'latest_warning': None
        }

        log_files = self.get_log_files()
        stats['total_files'] = len(log_files)
        stats['total_size'] = sum(f['size'] for f in log_files)
        stats['total_size_human'] = self._format_size(stats['total_size'])

        if not log_files:
            return stats

        latest_file = Path(log_files[0]['path'])
        lines = self._read_log_file_reverse(latest_file, 1000)
        level_counts = stats['level_counts']

        # Walk newest-to-oldest so the first ERROR/WARNING encountered is
        # the most recent one.
        for line in reversed(lines):
            parsed = self._parse_log_line(line)
            if parsed is None:
                continue
            level = parsed['level']
            if level in level_counts:
                level_counts[level] += 1
            if level == 'ERROR' and stats['latest_error'] is None:
                stats['latest_error'] = parsed
            elif level == 'WARNING' and stats['latest_warning'] is None:
                stats['latest_warning'] = parsed

        return stats