#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 统一日志配置模块 提供按日期自动切割日志文件的功能 """ import os import logging import sys from logging.handlers import TimedRotatingFileHandler from datetime import datetime def setup_logger(name, log_file, error_log_file=None, level=logging.INFO, backup_count=30, error_backup_count=90, console_output=True, force_reinit=False): """ 设置日志记录器,支持按日期自动切割 Args: name: 日志记录器名称 log_file: 主日志文件路径 error_log_file: 错误日志文件路径(可选) level: 日志级别 backup_count: 主日志文件保留天数 error_backup_count: 错误日志文件保留天数 console_output: 是否输出到控制台 force_reinit: 是否强制重新初始化(删除现有handlers) Returns: logging.Logger: 配置好的日志记录器 """ # 创建logs目录 log_dir = os.path.dirname(log_file) if log_dir and not os.path.exists(log_dir): os.makedirs(log_dir) # 获取或创建logger logger = logging.getLogger(name) logger.setLevel(level) # 检查是否需要重新初始化 need_reinit = force_reinit or not logger.handlers # 如果强制重新初始化或没有handlers,则清除现有handlers if force_reinit and logger.handlers: print(f"强制重新初始化日志记录器: {name}") for handler in logger.handlers[:]: # 使用切片创建副本 logger.removeHandler(handler) need_reinit = True # 如果没有handlers,则添加新的handlers if need_reinit: # 创建日志格式 formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 1. 主日志文件处理器 - 按日期切割 file_handler = TimedRotatingFileHandler( filename=log_file, when='midnight', # 每天午夜切割 interval=1, # 每1天切割一次 backupCount=backup_count, # 保留天数 encoding='utf-8' ) file_handler.setLevel(level) file_handler.setFormatter(formatter) # 设置切割后的文件名格式:filename.log.2025-07-21 file_handler.suffix = "%Y-%m-%d" # 自定义文件名生成函数,确保格式正确 def namer(default_name): # 确保文件名格式为 filename.log.2025-07-21 return default_name file_handler.namer = namer # 添加主日志处理器 logger.addHandler(file_handler) # 2. 错误日志文件处理器(如果指定) if error_log_file: error_file_handler = TimedRotatingFileHandler( filename=error_log_file, when='midnight', interval=1, backupCount=error_backup_count, # 错误日志保留更长时间 encoding='utf-8' ) error_file_handler.setLevel(logging.ERROR) error_file_handler.setFormatter(formatter) error_file_handler.suffix = "%Y-%m-%d" error_file_handler.namer = namer logger.addHandler(error_file_handler) # 3. 控制台处理器(如果启用) if console_output: console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(level) console_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S' ) console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) # 设置第三方库的日志级别 logging.getLogger('requests').setLevel(logging.WARNING) logging.getLogger('urllib3').setLevel(logging.WARNING) logging.getLogger('pymysql').setLevel(logging.WARNING) # 记录日志系统启动信息 logger.info(f"日志系统已启动 - 记录器: {name}") logger.info(f"主日志文件: {log_file}") if error_log_file: logger.info(f"错误日志文件: {error_log_file}") logger.info(f"日志保留策略: 每天午夜分割,主日志保留{backup_count}天") if error_log_file: logger.info(f"错误日志保留策略: 每天午夜分割,保留{error_backup_count}天") return logger def setup_database_logger(force_reinit=False): """设置数据库操作的日志记录器""" return setup_logger( name='database', log_file='logs/database.log', error_log_file='logs/database_error.log', level=logging.INFO, backup_count=30, error_backup_count=90, console_output=True, force_reinit=force_reinit ) def setup_bjh_analytics_logger(force_reinit=False): """设置百家号数据分析的日志记录器""" return setup_logger( name='bjh_analytics', log_file='logs/bjh_analytics.log', error_log_file='logs/bjh_analytics_error.log', level=logging.INFO, backup_count=30, error_backup_count=90, console_output=True, force_reinit=force_reinit ) def setup_bjh_daemon_logger(force_reinit=False): """设置百家号守护进程的日志记录器""" return setup_logger( name='bjh_daemon', log_file='logs/bjh_daemon.log', error_log_file='logs/bjh_daemon_error.log', level=logging.INFO, backup_count=30, error_backup_count=90, console_output=True, force_reinit=force_reinit ) def setup_cookie_sync_logger(force_reinit=False): """设置Cookie同步的日志记录器""" return setup_logger( name='cookie_sync', log_file='logs/cookie_sync.log', error_log_file='logs/cookie_sync_error.log', level=logging.INFO, backup_count=30, error_backup_count=90, console_output=True, force_reinit=force_reinit ) def setup_data_to_db_logger(force_reinit=False): """设置数据抓取并导出CSV的日志记录器""" return setup_logger( name='bjh_data_to_db', log_file='logs/bjh_data_to_db.log', error_log_file='logs/bjh_data_to_db_error.log', level=logging.INFO, backup_count=30, error_backup_count=90, console_output=False, # 不输出到控制台,避免与print重复 force_reinit=force_reinit ) def cleanup_old_logs(log_dir='logs', days_to_keep=30): """ 清理旧的日志文件 Args: log_dir: 日志目录 days_to_keep: 保留天数 """ import glob from datetime import datetime, timedelta if not os.path.exists(log_dir): return cutoff_date = datetime.now() - timedelta(days=days_to_keep) # 查找所有日志文件 log_patterns = [ os.path.join(log_dir, '*.log.*'), # 切割后的日志文件 os.path.join(log_dir, '*.log') # 当前日志文件 ] deleted_count = 0 for pattern in log_patterns: for log_file in glob.glob(pattern): try: # 获取文件修改时间 file_mtime = datetime.fromtimestamp(os.path.getmtime(log_file)) if file_mtime < cutoff_date: os.remove(log_file) print(f"已删除旧日志文件: {log_file}") deleted_count += 1 except Exception as e: print(f"删除日志文件失败 {log_file}: {e}") if deleted_count > 0: print(f"共清理 {deleted_count} 个旧日志文件") else: print("没有需要清理的旧日志文件") def get_log_file_info(log_dir='logs'): """ 获取日志文件信息 Args: log_dir: 日志目录 Returns: dict: 日志文件信息 """ if not os.path.exists(log_dir): return {} log_info = {} for filename in os.listdir(log_dir): if filename.endswith('.log') or '.log.' in filename: file_path = os.path.join(log_dir, filename) try: size = os.path.getsize(file_path) mtime = datetime.fromtimestamp(os.path.getmtime(file_path)) log_info[filename] = { 'size': size, 'size_mb': round(size / (1024 * 1024), 2), 'modified': mtime.strftime('%Y-%m-%d %H:%M:%S'), 'path': file_path } except Exception as e: log_info[filename] = {'error': str(e)} return log_info if __name__ == "__main__": # 测试日志配置 print("="*70) print("测试日志配置...") print("="*70) # 测试各个日志记录器 logger1 = setup_database_logger() logger1.info("数据库日志测试") logger1.error("数据库错误日志测试") logger2 = setup_bjh_analytics_logger() logger2.info("百家号分析日志测试") logger3 = setup_bjh_daemon_logger() logger3.info("守护进程日志测试") logger4 = setup_cookie_sync_logger() logger4.info("Cookie同步日志测试") # 显示日志文件信息 print("\n" + "="*70) print("当前日志文件信息:") print("="*70) log_info = get_log_file_info() if log_info: for filename, info in sorted(log_info.items()): if 'error' not in info: print(f"{filename:40s} | {info['size_mb']:>8.2f}MB | {info['modified']}") else: print(f"{filename:40s} | 错误: {info['error']}") else: print("暂无日志文件") print("\n" + "="*70) print("日志配置测试完成!") print("="*70)