""" 配置管理模块 提供配置的读取、更新和持久化功能 """ import os import re from pathlib import Path from typing import Dict, Any, Optional from config import Config class ConfigManager: """配置管理器""" # 可修改的配置项定义 CONFIGURABLE_ITEMS = { 'click_strategy': { 'MIN_CLICK_COUNT': {'type': 'int', 'min': 1, 'max': 100, 'label': '最小点击次数'}, 'MAX_CLICK_COUNT': {'type': 'int', 'min': 1, 'max': 100, 'label': '最大点击次数'}, 'CLICK_INTERVAL_MINUTES': {'type': 'int', 'min': 1, 'max': 1440, 'label': '点击间隔(分钟)'}, }, 'work_time': { 'WORK_START_HOUR': {'type': 'int', 'min': 0, 'max': 23, 'label': '工作开始时间'}, 'WORK_END_HOUR': {'type': 'int', 'min': 0, 'max': 23, 'label': '工作结束时间'}, }, 'task_config': { 'MIN_TASK_INTERVAL_MINUTES': {'type': 'int', 'min': 1, 'max': 60, 'label': '任务最小间隔(分钟)'}, 'MAX_TASK_INTERVAL_MINUTES': {'type': 'int', 'min': 1, 'max': 60, 'label': '任务最大间隔(分钟)'}, 'REPLY_WAIT_TIMEOUT': {'type': 'int', 'min': 5, 'max': 300, 'label': '回复等待超时(秒)'}, }, 'crawler_config': { 'CRAWLER_ENABLED': {'type': 'bool', 'label': '启用爬虫'}, 'CRAWLER_SCHEDULE_TIME': {'type': 'str', 'pattern': r'^\d{2}:\d{2}$', 'label': '爬虫执行时间'}, 'CRAWLER_BATCH_SIZE': {'type': 'int', 'min': 1, 'max': 100, 'label': '每次爬取数量'}, }, 'server_config': { 'SERVER_PORT': {'type': 'int', 'min': 1024, 'max': 65535, 'label': '服务端口'}, 'DEBUG': {'type': 'bool', 'label': '调试模式'}, } } def __init__(self): self.env = os.getenv('ENV', 'development') self.env_file = f'.env.{self.env}' if self.env in ['development', 'production'] else '.env' self.env_path = Path(self.env_file) def get_current_config(self) -> Dict[str, Any]: """获取当前配置""" config = { 'env': self.env, 'click_strategy': { 'min_click_count': getattr(Config, 'MIN_CLICK_COUNT', 1), 'max_click_count': getattr(Config, 'MAX_CLICK_COUNT', 3), 'click_interval_minutes': getattr(Config, 'CLICK_INTERVAL_MINUTES', 30), }, 'work_time': { 'start_hour': getattr(Config, 'WORK_START_HOUR', 9), 'end_hour': getattr(Config, 'WORK_END_HOUR', 21), }, 'task_config': { 'min_interval_minutes': getattr(Config, 'MIN_TASK_INTERVAL_MINUTES', 3), 'max_interval_minutes': getattr(Config, 'MAX_TASK_INTERVAL_MINUTES', 5), 'reply_wait_timeout': getattr(Config, 'REPLY_WAIT_TIMEOUT', 30), }, 'crawler_config': { 'enabled': getattr(Config, 'CRAWLER_ENABLED', True), 'schedule_time': getattr(Config, 'CRAWLER_SCHEDULE_TIME', '02:00'), 'batch_size': getattr(Config, 'CRAWLER_BATCH_SIZE', 10), }, 'server_config': { 'host': getattr(Config, 'SERVER_HOST', '0.0.0.0'), 'port': getattr(Config, 'SERVER_PORT', 5000), 'debug': getattr(Config, 'DEBUG', False), }, 'adspower_config': { 'api_url': getattr(Config, 'ADSPOWER_API_URL', 'http://local.adspower.net:50325'), 'user_id': getattr(Config, 'ADSPOWER_USER_ID', ''), }, 'paths': { 'data_dir': getattr(Config, 'DATA_DIR', './data'), 'log_dir': getattr(Config, 'LOG_DIR', './logs'), } } return config def validate_config(self, key: str, value: Any, config_def: Dict) -> tuple: """ 验证配置值 返回: (是否有效, 错误消息) """ config_type = config_def.get('type', 'str') try: if config_type == 'int': value = int(value) min_val = config_def.get('min') max_val = config_def.get('max') if min_val is not None and value < min_val: return False, f'{key} 不能小于 {min_val}' if max_val is not None and value > max_val: return False, f'{key} 不能大于 {max_val}' elif config_type == 'bool': if isinstance(value, str): value = value.lower() in ('true', '1', 'yes') else: value = bool(value) elif config_type == 'str': value = str(value) pattern = config_def.get('pattern') if pattern and not re.match(pattern, value): return False, f'{key} 格式不正确' return True, None except (ValueError, TypeError) as e: return False, f'{key} 值无效: {str(e)}' def update_config(self, updates: Dict[str, Any]) -> Dict[str, Any]: """ 更新配置 返回: {'success': bool, 'message': str, 'updated': list, 'requires_restart': bool} """ result = { 'success': True, 'message': '', 'updated': [], 'requires_restart': False, 'errors': [] } # 配置项映射 config_mapping = { 'min_click_count': 'MIN_CLICK_COUNT', 'max_click_count': 'MAX_CLICK_COUNT', 'click_interval_minutes': 'CLICK_INTERVAL_MINUTES', 'start_hour': 'WORK_START_HOUR', 'end_hour': 'WORK_END_HOUR', 'min_interval_minutes': 'MIN_TASK_INTERVAL_MINUTES', 'max_interval_minutes': 'MAX_TASK_INTERVAL_MINUTES', 'reply_wait_timeout': 'REPLY_WAIT_TIMEOUT', 'crawler_enabled': 'CRAWLER_ENABLED', 'schedule_time': 'CRAWLER_SCHEDULE_TIME', 'batch_size': 'CRAWLER_BATCH_SIZE', 'server_port': 'SERVER_PORT', 'debug': 'DEBUG', } # 需要重启的配置项 restart_required_keys = ['server_port', 'debug', 'crawler_enabled', 'schedule_time'] env_updates = {} for key, value in updates.items(): env_key = config_mapping.get(key) if not env_key: continue # 查找配置定义 config_def = None for category, items in self.CONFIGURABLE_ITEMS.items(): if env_key in items: config_def = items[env_key] break if not config_def: continue # 验证 is_valid, error = self.validate_config(key, value, config_def) if not is_valid: result['errors'].append(error) continue env_updates[env_key] = value result['updated'].append(key) if key in restart_required_keys: result['requires_restart'] = True if result['errors']: result['success'] = False result['message'] = '部分配置验证失败: ' + '; '.join(result['errors']) return result if not env_updates: result['message'] = '没有需要更新的配置' return result # 更新环境变量和Config类 for key, value in env_updates.items(): os.environ[key] = str(value) if hasattr(Config, key): setattr(Config, key, value) # 尝试更新.env文件 try: self._update_env_file(env_updates) except Exception as e: result['message'] = f'配置已更新到内存,但写入文件失败: {str(e)}' return result result['message'] = f'成功更新 {len(result["updated"])} 项配置' if result['requires_restart']: result['message'] += '(部分配置需要重启服务生效)' return result def _update_env_file(self, updates: Dict[str, Any]): """更新.env文件""" if not self.env_path.exists(): # 如果文件不存在,创建新文件 lines = [] else: with open(self.env_path, 'r', encoding='utf-8') as f: lines = f.readlines() # 更新已存在的配置 updated_keys = set() new_lines = [] for line in lines: stripped = line.strip() if stripped and not stripped.startswith('#') and '=' in stripped: key = stripped.split('=')[0].strip() if key in updates: value = updates[key] if isinstance(value, bool): value = 'true' if value else 'false' new_lines.append(f'{key}={value}\n') updated_keys.add(key) else: new_lines.append(line) else: new_lines.append(line) # 添加新的配置项 for key, value in updates.items(): if key not in updated_keys: if isinstance(value, bool): value = 'true' if value else 'false' new_lines.append(f'{key}={value}\n') # 写入文件 with open(self.env_path, 'w', encoding='utf-8') as f: f.writelines(new_lines) def get_config_schema(self) -> Dict: """获取配置项定义(用于前端表单生成)""" schema = {} for category, items in self.CONFIGURABLE_ITEMS.items(): schema[category] = {} for key, config_def in items.items(): schema[category][key] = { 'type': config_def['type'], 'label': config_def['label'], 'min': config_def.get('min'), 'max': config_def.get('max'), 'pattern': config_def.get('pattern'), } return schema