Files
ai_wht_B/ver_25122115/database_config.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

288 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
数据库连接配置
"""
import pymysql
import logging
import time
from datetime import datetime
from pymysql.cursors import DictCursor
from contextlib import contextmanager
from pymysql.err import OperationalError, InterfaceError, InternalError
import threading
# 数据库配置
DB_CONFIG = {
'host': '8.149.233.36',
'port': 3306,
'user': 'ai_wht_write',
'password': '7aK_H2yvokVumr84lLNDt8fDBp6P',
'database': 'ai_wht',
'charset': 'utf8mb4',
'collation': 'utf8mb4_unicode_ci',
'cursorclass': DictCursor,
'autocommit': True,
'connect_timeout': 10,
'read_timeout': 30,
'write_timeout': 30,
'max_allowed_packet': 16777216, # 16MB
'sql_mode': 'STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE,ERROR_FOR_DIVISION_BY_ZERO'
}
# 使用与主应用相同的日志记录器
logger = logging.getLogger('article_server')
class DatabaseManager:
"""数据库连接管理器"""
def __init__(self, config=None):
self.config = config or DB_CONFIG
# 使用线程本地连接,避免多线程复用同一连接导致的数据包序号错误
self._local = threading.local()
self._max_retries = 3
self._retry_delay = 1 # 秒
def get_connection(self):
"""获取数据库连接"""
try:
# 线程本地连接对象
connection = getattr(self._local, 'connection', None)
# 检查现有连接是否有效
if connection is not None:
try:
# 尝试 ping 并在需要时自动重连
connection.ping(reconnect=True)
if connection.open:
return connection
except (OperationalError, InterfaceError, AttributeError):
logger.warning("检测到无效连接,将重新创建")
connection = None
self._local.connection = None
# 创建新连接(线程专有)
self._local.connection = pymysql.connect(**self.config)
# 确保连接使用正确的字符集
with self._local.connection.cursor() as cursor:
cursor.execute("SET NAMES utf8mb4")
logger.info("数据库连接成功,字符集设置完成")
return self._local.connection
except Exception as e:
logger.error(f"数据库连接失败: {e}")
# 确保线程本地连接被清理
try:
self._local.connection = None
except Exception:
pass
raise
def close_connection(self):
"""关闭数据库连接"""
connection = getattr(self._local, 'connection', None)
if connection:
try:
if connection.open:
connection.close()
logger.info("数据库连接已关闭")
except Exception as e:
logger.warning(f"关闭数据库连接时出错: {e}")
finally:
self._local.connection = None
def _execute_with_retry(self, operation, *args, **kwargs):
"""带重试机制的执行操作"""
last_exception = None
for attempt in range(self._max_retries):
try:
return operation(*args, **kwargs)
except (OperationalError, InterfaceError, InternalError, AttributeError) as e:
last_exception = e
error_code = getattr(e, 'args', [None])[0] if getattr(e, 'args', None) else None
# 检查是否是连接丢失错误
message = str(e)
is_packet_seq_error = isinstance(e, InternalError) and 'Packet sequence number wrong' in message
is_already_closed = isinstance(e, InterfaceError) and ('Already closed' in message or 'closed' in message.lower())
is_none_read = isinstance(e, AttributeError) and "'NoneType' object has no attribute 'read'" in message
if error_code in [2006, 2013, 2055] or is_packet_seq_error or is_already_closed or is_none_read:
logger.warning(f"数据库连接丢失 (尝试 {attempt + 1}/{self._max_retries}): {e}")
# 关闭现有连接
self.close_connection()
if attempt < self._max_retries - 1:
time.sleep(self._retry_delay)
continue
else:
# 其他数据库错误,不重试
raise
except Exception as e:
# 其他错误,不重试
raise
# 所有重试都失败了
logger.error(f"数据库操作失败,已重试 {self._max_retries} 次: {last_exception}")
raise last_exception
@contextmanager
def get_cursor(self):
"""获取数据库游标的上下文管理器"""
connection = None
cursor = None
try:
# 获取连接
connection = self._execute_with_retry(self.get_connection)
cursor = connection.cursor(DictCursor)
yield cursor
# 提交事务
if connection and connection.open:
try:
connection.commit()
except (OperationalError, InterfaceError) as e:
logger.warning(f"提交事务失败: {e}")
# 不抛出异常,因为可能是连接问题
except Exception as e:
# 回滚事务
if connection and connection.open:
try:
connection.rollback()
except (OperationalError, InterfaceError) as rollback_error:
logger.warning(f"回滚事务失败: {rollback_error}")
# 不抛出异常,因为可能是连接问题
logger.error(f"数据库操作失败: {e}")
raise
finally:
# 关闭游标
if cursor:
try:
cursor.close()
except Exception as e:
logger.warning(f"关闭游标时出错: {e}")
def execute_query(self, sql, params=None):
"""执行查询SQL"""
def _query_operation():
try:
with self.get_cursor() as cursor:
# 记录SQL执行开始
logger.info(f"[SQL执行] 开始执行查询SQL")
logger.info(f"[SQL语句] {sql}")
if params:
logger.info(f"[SQL参数] {params}")
cursor.execute(sql, params)
result = cursor.fetchall()
# 记录SQL执行结果
logger.info(f"[SQL结果] 查询完成,返回 {len(result)} 条记录")
if result and len(result) <= 3: # 只记录前3条结果避免日志过长
logger.info(f"[SQL数据] 查询结果: {result}")
return result
except Exception as e:
# 异常记录到错误日志
logger.error(f"[SQL执行失败] 查询SQL执行异常: {e}")
logger.error(f"[SQL语句] {sql}")
if params:
logger.error(f"[SQL参数] {params}")
raise
return self._execute_with_retry(_query_operation)
def execute_update(self, sql, params=None):
"""执行更新SQL"""
def _update_operation():
try:
with self.get_cursor() as cursor:
# 记录SQL执行开始
logger.info(f"[SQL执行] 开始执行更新SQL")
logger.info(f"[SQL语句] {sql}")
if params:
logger.info(f"[SQL参数] {params}")
affected_rows = cursor.execute(sql, params)
# 记录SQL执行结果
logger.info(f"[SQL结果] 更新完成,影响 {affected_rows}")
return affected_rows
except Exception as e:
# 异常记录到错误日志
logger.error(f"[SQL执行失败] 更新SQL执行异常: {e}")
logger.error(f"[SQL语句] {sql}")
if params:
logger.error(f"[SQL参数] {params}")
raise
return self._execute_with_retry(_update_operation)
def execute_insert(self, sql, params=None):
"""执行插入SQL"""
def _insert_operation():
try:
with self.get_cursor() as cursor:
# 记录SQL执行开始
logger.info(f"[SQL执行] 开始执行插入SQL")
logger.info(f"[SQL语句] {sql}")
if params:
logger.info(f"[SQL参数] {params}")
cursor.execute(sql, params)
last_id = cursor.lastrowid
# 记录SQL执行结果
logger.info(f"[SQL结果] 插入完成新记录ID: {last_id}")
return last_id
except Exception as e:
# 异常记录到错误日志
logger.error(f"[SQL执行失败] 插入SQL执行异常: {e}")
logger.error(f"[SQL语句] {sql}")
if params:
logger.error(f"[SQL参数] {params}")
raise
return self._execute_with_retry(_insert_operation)
# 全局数据库管理器实例
db_manager = DatabaseManager()
def get_db_manager():
"""获取数据库管理器实例"""
return db_manager
def format_datetime_fields(data, datetime_fields=['created_at', 'updated_at']):
"""
格式化数据中的日期时间字段为 Y-m-d H:i:s 格式
Args:
data: 单个字典或字典列表
datetime_fields: 需要格式化的字段名列表
Returns:
格式化后的数据
"""
def format_single_record(record):
if not isinstance(record, dict):
return record
for field in datetime_fields:
if field in record and record[field] is not None:
if isinstance(record[field], datetime):
record[field] = record[field].strftime('%Y-%m-%d %H:%M:%S')
return record
if isinstance(data, list):
return [format_single_record(record) for record in data]
else:
return format_single_record(data)