""" 数据库配置管理模块 统一管理数据库连接和SQL操作 """ import pymysql import logging logger = logging.getLogger(__name__) # 数据库配置 DB_CONFIG = { 'host': '8.149.233.36', 'user': 'ai_article_read', 'password': '7aK_H2yvokVumr84lLNDt8fDBp6P', 'database': 'ai_article', 'charset': 'utf8mb4' } class DatabaseManager: """数据库管理器:统一管理数据库连接和操作""" def __init__(self, config=None): """初始化数据库管理器 Args: config: 数据库配置字典,默认使用 DB_CONFIG """ self.config = config or DB_CONFIG def get_connection(self, autocommit=False): """获取数据库连接 Args: autocommit: 是否启用自动提交模式 Returns: pymysql连接对象 """ return pymysql.connect(**self.config, autocommit=autocommit) def execute_query(self, sql, params=None, fetch_one=False): """执行查询SQL(SELECT) Args: sql: SQL语句 params: SQL参数(tuple或list) fetch_one: True返回单条记录,False返回所有记录 Returns: 查询结果 """ conn = None cursor = None try: conn = self.get_connection() cursor = conn.cursor() logger.info(f'[SQL] {sql.strip()} | params: {params}') cursor.execute(sql, params or ()) if fetch_one: result = cursor.fetchone() else: result = cursor.fetchall() logger.debug(f'[SQL结果] 返回 {len(result) if not fetch_one and result else (1 if result else 0)} 条记录') return result except Exception as e: logger.error(f'执行查询失败:{e}', exc_info=True) raise finally: if cursor: cursor.close() if conn: conn.close() def execute_update(self, sql, params=None, autocommit=True): """执行更新SQL(INSERT/UPDATE/DELETE) Args: sql: SQL语句 params: SQL参数(tuple或list) autocommit: 是否自动提交 Returns: 影响的行数 """ conn = None cursor = None try: conn = self.get_connection(autocommit=autocommit) cursor = conn.cursor() logger.info(f'[SQL] {sql.strip()} | params: {params}') result = cursor.execute(sql, params or ()) if not autocommit: conn.commit() logger.info(f'[SQL执行] 影响 {result} 行') return result except Exception as e: if not autocommit and conn: conn.rollback() logger.error(f'执行更新失败:{e}', exc_info=True) raise finally: if cursor: cursor.close() if conn: conn.close() def execute_many(self, sql, params_list, autocommit=True): """批量执行SQL Args: sql: SQL语句 params_list: 参数列表,每个元素是一组参数 autocommit: 是否自动提交 Returns: 成功执行的行数 """ conn = None cursor = None try: conn = self.get_connection(autocommit=autocommit) cursor = conn.cursor() logger.info(f'[SQL批量] {sql.strip()} | 批次数: {len(params_list)}') success_count = 0 for params in params_list: try: result = cursor.execute(sql, params) if result > 0: success_count += 1 except Exception as e: logger.debug(f'批量执行跳过:params={params},错误:{e}') if not autocommit: conn.commit() logger.info(f'[SQL批量执行] 成功 {success_count}/{len(params_list)} 条') return success_count except Exception as e: if not autocommit and conn: conn.rollback() logger.error(f'批量执行失败:{e}', exc_info=True) raise finally: if cursor: cursor.close() if conn: conn.close() # 创建全局数据库管理器实例 db_manager = DatabaseManager()