#!/usr/bin/env python # -*- coding: utf-8 -*- """ 数据库连接配置 """ import pymysql import logging import time from datetime import datetime from pymysql.cursors import DictCursor from contextlib import contextmanager from pymysql.err import OperationalError, InterfaceError, InternalError import threading # 数据库配置 DB_CONFIG = { 'host': '8.149.233.36', 'port': 3306, 'user': 'ai_wht_write', 'password': '7aK_H2yvokVumr84lLNDt8fDBp6P', 'database': 'ai_wht', 'charset': 'utf8mb4', 'collation': 'utf8mb4_unicode_ci', 'cursorclass': DictCursor, 'autocommit': True, 'connect_timeout': 10, 'read_timeout': 30, 'write_timeout': 30, 'max_allowed_packet': 16777216, # 16MB 'sql_mode': 'STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE,ERROR_FOR_DIVISION_BY_ZERO' } # 使用与主应用相同的日志记录器 logger = logging.getLogger('article_server') class DatabaseManager: """数据库连接管理器""" def __init__(self, config=None): self.config = config or DB_CONFIG # 使用线程本地连接,避免多线程复用同一连接导致的数据包序号错误 self._local = threading.local() self._max_retries = 3 self._retry_delay = 1 # 秒 def get_connection(self): """获取数据库连接""" try: # 线程本地连接对象 connection = getattr(self._local, 'connection', None) # 检查现有连接是否有效 if connection is not None: try: # 尝试 ping 并在需要时自动重连 connection.ping(reconnect=True) if connection.open: return connection except (OperationalError, InterfaceError, AttributeError): logger.warning("检测到无效连接,将重新创建") connection = None self._local.connection = None # 创建新连接(线程专有) self._local.connection = pymysql.connect(**self.config) # 确保连接使用正确的字符集 with self._local.connection.cursor() as cursor: cursor.execute("SET NAMES utf8mb4") logger.info("数据库连接成功,字符集设置完成") return self._local.connection except Exception as e: logger.error(f"数据库连接失败: {e}") # 确保线程本地连接被清理 try: self._local.connection = None except Exception: pass raise def close_connection(self): """关闭数据库连接""" connection = getattr(self._local, 'connection', None) if connection: try: if connection.open: connection.close() logger.info("数据库连接已关闭") except Exception as e: logger.warning(f"关闭数据库连接时出错: {e}") finally: self._local.connection = None def _execute_with_retry(self, operation, *args, **kwargs): """带重试机制的执行操作""" last_exception = None for attempt in range(self._max_retries): try: return operation(*args, **kwargs) except (OperationalError, InterfaceError, InternalError, AttributeError) as e: last_exception = e error_code = getattr(e, 'args', [None])[0] if getattr(e, 'args', None) else None # 检查是否是连接丢失错误 message = str(e) is_packet_seq_error = isinstance(e, InternalError) and 'Packet sequence number wrong' in message is_already_closed = isinstance(e, InterfaceError) and ('Already closed' in message or 'closed' in message.lower()) is_none_read = isinstance(e, AttributeError) and "'NoneType' object has no attribute 'read'" in message if error_code in [2006, 2013, 2055] or is_packet_seq_error or is_already_closed or is_none_read: logger.warning(f"数据库连接丢失 (尝试 {attempt + 1}/{self._max_retries}): {e}") # 关闭现有连接 self.close_connection() if attempt < self._max_retries - 1: time.sleep(self._retry_delay) continue else: # 其他数据库错误,不重试 raise except Exception as e: # 其他错误,不重试 raise # 所有重试都失败了 logger.error(f"数据库操作失败,已重试 {self._max_retries} 次: {last_exception}") raise last_exception @contextmanager def get_cursor(self): """获取数据库游标的上下文管理器""" connection = None cursor = None try: # 获取连接 connection = self._execute_with_retry(self.get_connection) cursor = connection.cursor(DictCursor) yield cursor # 提交事务 if connection and connection.open: try: connection.commit() except (OperationalError, InterfaceError) as e: logger.warning(f"提交事务失败: {e}") # 不抛出异常,因为可能是连接问题 except Exception as e: # 回滚事务 if connection and connection.open: try: connection.rollback() except (OperationalError, InterfaceError) as rollback_error: logger.warning(f"回滚事务失败: {rollback_error}") # 不抛出异常,因为可能是连接问题 logger.error(f"数据库操作失败: {e}") raise finally: # 关闭游标 if cursor: try: cursor.close() except Exception as e: logger.warning(f"关闭游标时出错: {e}") def execute_query(self, sql, params=None): """执行查询SQL""" def _query_operation(): try: with self.get_cursor() as cursor: # 记录SQL执行开始 logger.info(f"[SQL执行] 开始执行查询SQL") logger.info(f"[SQL语句] {sql}") if params: logger.info(f"[SQL参数] {params}") cursor.execute(sql, params) result = cursor.fetchall() # 记录SQL执行结果 logger.info(f"[SQL结果] 查询完成,返回 {len(result)} 条记录") if result and len(result) <= 3: # 只记录前3条结果,避免日志过长 logger.info(f"[SQL数据] 查询结果: {result}") return result except Exception as e: # 异常记录到错误日志 logger.error(f"[SQL执行失败] 查询SQL执行异常: {e}") logger.error(f"[SQL语句] {sql}") if params: logger.error(f"[SQL参数] {params}") raise return self._execute_with_retry(_query_operation) def execute_update(self, sql, params=None): """执行更新SQL""" def _update_operation(): try: with self.get_cursor() as cursor: # 记录SQL执行开始 logger.info(f"[SQL执行] 开始执行更新SQL") logger.info(f"[SQL语句] {sql}") if params: logger.info(f"[SQL参数] {params}") affected_rows = cursor.execute(sql, params) # 记录SQL执行结果 logger.info(f"[SQL结果] 更新完成,影响 {affected_rows} 行") return affected_rows except Exception as e: # 异常记录到错误日志 logger.error(f"[SQL执行失败] 更新SQL执行异常: {e}") logger.error(f"[SQL语句] {sql}") if params: logger.error(f"[SQL参数] {params}") raise return self._execute_with_retry(_update_operation) def execute_insert(self, sql, params=None): """执行插入SQL""" def _insert_operation(): try: with self.get_cursor() as cursor: # 记录SQL执行开始 logger.info(f"[SQL执行] 开始执行插入SQL") logger.info(f"[SQL语句] {sql}") if params: logger.info(f"[SQL参数] {params}") cursor.execute(sql, params) last_id = cursor.lastrowid # 记录SQL执行结果 logger.info(f"[SQL结果] 插入完成,新记录ID: {last_id}") return last_id except Exception as e: # 异常记录到错误日志 logger.error(f"[SQL执行失败] 插入SQL执行异常: {e}") logger.error(f"[SQL语句] {sql}") if params: logger.error(f"[SQL参数] {params}") raise return self._execute_with_retry(_insert_operation) # 全局数据库管理器实例 db_manager = DatabaseManager() def get_db_manager(): """获取数据库管理器实例""" return db_manager def format_datetime_fields(data, datetime_fields=['created_at', 'updated_at']): """ 格式化数据中的日期时间字段为 Y-m-d H:i:s 格式 Args: data: 单个字典或字典列表 datetime_fields: 需要格式化的字段名列表 Returns: 格式化后的数据 """ def format_single_record(record): if not isinstance(record, dict): return record for field in datetime_fields: if field in record and record[field] is not None: if isinstance(record[field], datetime): record[field] = record[field].strftime('%Y-%m-%d %H:%M:%S') return record if isinstance(data, list): return [format_single_record(record) for record in data] else: return format_single_record(data)