255 lines
8.6 KiB
Python
255 lines
8.6 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
数据库配置管理模块
|
|||
|
|
统一管理数据库连接和SQL操作
|
|||
|
|
"""
|
|||
|
|
import pymysql
|
|||
|
|
from typing import Optional, Dict, List, Tuple, Any
|
|||
|
|
from log_config import setup_database_logger
|
|||
|
|
|
|||
|
|
# 初始化数据库日志记录器
|
|||
|
|
logger = setup_database_logger()
|
|||
|
|
|
|||
|
|
# 数据库配置
|
|||
|
|
# ai_statistics_read 账号对 ai_article 数据库有 SELECT, INSERT, UPDATE 权限
|
|||
|
|
# 可用于数据查询和写入操作(ai_statistics, ai_statistics_day, ai_statistics_days 表)
|
|||
|
|
DB_CONFIG = {
|
|||
|
|
'host': '8.149.233.36',
|
|||
|
|
'port': 3306,
|
|||
|
|
'user': 'ai_statistics_read',
|
|||
|
|
'password': '7aK_H2yvokVumr84lLNDt8fDBp6P',
|
|||
|
|
'database': 'ai_article', # 修正:使用 ai_article 数据库
|
|||
|
|
'charset': 'utf8mb4'
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
class DatabaseManager:
|
|||
|
|
"""数据库管理器:统一管理数据库连接和操作"""
|
|||
|
|
|
|||
|
|
def __init__(self, config: Optional[Dict] = None):
|
|||
|
|
"""初始化数据库管理器
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
config: 数据库配置字典,默认使用 DB_CONFIG
|
|||
|
|
"""
|
|||
|
|
self.config = config or DB_CONFIG
|
|||
|
|
|
|||
|
|
def get_connection(self, autocommit: bool = False):
|
|||
|
|
"""获取数据库连接
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
autocommit: 是否启用自动提交模式
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
pymysql连接对象
|
|||
|
|
"""
|
|||
|
|
return pymysql.connect(**self.config, autocommit=autocommit)
|
|||
|
|
|
|||
|
|
def execute_query(self, sql: str, params: Optional[Tuple] = None,
|
|||
|
|
fetch_one: bool = False, dict_cursor: bool = True) -> Any:
|
|||
|
|
"""执行查询SQL(SELECT)
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
sql: SQL语句
|
|||
|
|
params: SQL参数(tuple或list)
|
|||
|
|
fetch_one: True返回单条记录,False返回所有记录
|
|||
|
|
dict_cursor: True返回字典格式,False返回元组格式
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
查询结果
|
|||
|
|
"""
|
|||
|
|
conn = None
|
|||
|
|
cursor = None
|
|||
|
|
try:
|
|||
|
|
conn = self.get_connection()
|
|||
|
|
cursor_class = pymysql.cursors.DictCursor if dict_cursor else pymysql.cursors.Cursor
|
|||
|
|
cursor = conn.cursor(cursor_class)
|
|||
|
|
|
|||
|
|
logger.info(f'[SQL] {sql.strip()} | params: {params}')
|
|||
|
|
cursor.execute(sql, params or ())
|
|||
|
|
|
|||
|
|
if fetch_one:
|
|||
|
|
result = cursor.fetchone()
|
|||
|
|
else:
|
|||
|
|
result = cursor.fetchall()
|
|||
|
|
|
|||
|
|
logger.debug(f'[SQL结果] 返回 {len(result) if not fetch_one and result else (1 if result else 0)} 条记录')
|
|||
|
|
return result
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f'执行查询失败:{e}', exc_info=True)
|
|||
|
|
raise
|
|||
|
|
finally:
|
|||
|
|
if cursor:
|
|||
|
|
cursor.close()
|
|||
|
|
if conn:
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
def execute_update(self, sql: str, params: Optional[Tuple] = None,
|
|||
|
|
autocommit: bool = True) -> int:
|
|||
|
|
"""执行更新SQL(INSERT/UPDATE/DELETE)
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
sql: SQL语句
|
|||
|
|
params: SQL参数(tuple或list)
|
|||
|
|
autocommit: 是否自动提交
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
影响的行数
|
|||
|
|
"""
|
|||
|
|
conn = None
|
|||
|
|
cursor = None
|
|||
|
|
try:
|
|||
|
|
conn = self.get_connection(autocommit=autocommit)
|
|||
|
|
cursor = conn.cursor()
|
|||
|
|
|
|||
|
|
logger.info(f'[SQL] {sql.strip()} | params: {params}')
|
|||
|
|
result = cursor.execute(sql, params or ())
|
|||
|
|
|
|||
|
|
# 总是手动提交,确保数据落地
|
|||
|
|
conn.commit()
|
|||
|
|
|
|||
|
|
logger.info(f'[SQL执行] 影响 {result} 行')
|
|||
|
|
return result
|
|||
|
|
except Exception as e:
|
|||
|
|
if conn:
|
|||
|
|
conn.rollback()
|
|||
|
|
logger.error(f'执行更新失败:{e}', exc_info=True)
|
|||
|
|
raise
|
|||
|
|
finally:
|
|||
|
|
if cursor:
|
|||
|
|
cursor.close()
|
|||
|
|
if conn:
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
def execute_many(self, sql: str, params_list: List[Tuple],
|
|||
|
|
autocommit: bool = True) -> int:
|
|||
|
|
"""批量执行SQL
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
sql: SQL语句
|
|||
|
|
params_list: 参数列表,每个元素是一组参数
|
|||
|
|
autocommit: 是否自动提交
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
成功执行的行数
|
|||
|
|
"""
|
|||
|
|
conn = None
|
|||
|
|
cursor = None
|
|||
|
|
try:
|
|||
|
|
conn = self.get_connection(autocommit=False) # 总是使用非Autocommit连接
|
|||
|
|
cursor = conn.cursor()
|
|||
|
|
|
|||
|
|
logger.info(f'[SQL批量] {sql.strip()} | 批次数: {len(params_list)}')
|
|||
|
|
|
|||
|
|
success_count = 0
|
|||
|
|
failed_count = 0
|
|||
|
|
unchanged_count = 0 # 数据未变化的记录数
|
|||
|
|
for idx, params in enumerate(params_list, 1):
|
|||
|
|
try:
|
|||
|
|
result = cursor.execute(sql, params)
|
|||
|
|
# INSERT...ON DUPLICATE KEY UPDATE 返回值:
|
|||
|
|
# 0 = 数据未变化(已存在且相同)
|
|||
|
|
# 1 = 新插入
|
|||
|
|
# 2 = 更新现有行
|
|||
|
|
if result > 0:
|
|||
|
|
success_count += 1
|
|||
|
|
logger.debug(f'批量执行[{idx}/{len(params_list)}]: 影响{result}行, params={params}')
|
|||
|
|
else:
|
|||
|
|
unchanged_count += 1
|
|||
|
|
logger.debug(f'批量执行[{idx}/{len(params_list)}]: 数据未变化(已存在且相同), params={params}')
|
|||
|
|
except Exception as e:
|
|||
|
|
failed_count += 1
|
|||
|
|
logger.error(f'批量执行失败[{idx}/{len(params_list)}]: params={params},错误:{e}', exc_info=True)
|
|||
|
|
|
|||
|
|
# 总是手动提交,确保数据落地
|
|||
|
|
conn.commit()
|
|||
|
|
|
|||
|
|
if failed_count > 0:
|
|||
|
|
logger.warning(f'[SQL批量执行] 成功 {success_count}/{len(params_list)} 条,失败 {failed_count} 条,未变化 {unchanged_count} 条')
|
|||
|
|
elif unchanged_count > 0:
|
|||
|
|
logger.info(f'[SQL批量执行] 成功 {success_count}/{len(params_list)} 条,未变化 {unchanged_count} 条(数据已存在且相同)')
|
|||
|
|
else:
|
|||
|
|
logger.info(f'[SQL批量执行] 成功 {success_count}/{len(params_list)} 条')
|
|||
|
|
return success_count
|
|||
|
|
except Exception as e:
|
|||
|
|
if conn:
|
|||
|
|
conn.rollback()
|
|||
|
|
logger.error(f'批量执行失败:{e}', exc_info=True)
|
|||
|
|
raise
|
|||
|
|
finally:
|
|||
|
|
if cursor:
|
|||
|
|
cursor.close()
|
|||
|
|
if conn:
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
def insert_or_update(self, table: str, data: Dict,
|
|||
|
|
unique_keys: List[str], autocommit: bool = True) -> int:
|
|||
|
|
"""插入或更新数据(ON DUPLICATE KEY UPDATE)
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
table: 表名
|
|||
|
|
data: 数据字典
|
|||
|
|
unique_keys: 唯一键字段列表
|
|||
|
|
autocommit: 是否自动提交
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
影响的行数
|
|||
|
|
"""
|
|||
|
|
if not data:
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
# 构建INSERT语句
|
|||
|
|
columns = ', '.join(data.keys())
|
|||
|
|
placeholders = ', '.join(['%s'] * len(data))
|
|||
|
|
|
|||
|
|
# 构建UPDATE部分(排除唯一键)
|
|||
|
|
update_parts = []
|
|||
|
|
for key in data.keys():
|
|||
|
|
if key not in unique_keys:
|
|||
|
|
update_parts.append(f"{key} = VALUES({key})")
|
|||
|
|
|
|||
|
|
sql = f"""
|
|||
|
|
INSERT INTO {table} ({columns})
|
|||
|
|
VALUES ({placeholders})
|
|||
|
|
ON DUPLICATE KEY UPDATE {', '.join(update_parts)}
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
params = tuple(data.values())
|
|||
|
|
return self.execute_update(sql, params, autocommit=autocommit)
|
|||
|
|
|
|||
|
|
def test_connection(self) -> bool:
|
|||
|
|
"""测试数据库连接
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
连接是否成功
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
conn = self.get_connection()
|
|||
|
|
cursor = conn.cursor()
|
|||
|
|
cursor.execute("SELECT 1")
|
|||
|
|
result = cursor.fetchone()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
if result:
|
|||
|
|
logger.info(f"[OK] 数据库连接成功: {self.config['host']}:{self.config.get('port', 3306)}/{self.config['database']}")
|
|||
|
|
return True
|
|||
|
|
return False
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"[X] 数据库连接失败: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 创建全局数据库管理器实例
|
|||
|
|
db_manager = DatabaseManager()
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == '__main__':
|
|||
|
|
# 测试连接
|
|||
|
|
print("\n测试数据库连接...")
|
|||
|
|
if db_manager.test_connection():
|
|||
|
|
print("✓ 连接成功")
|
|||
|
|
else:
|
|||
|
|
print("✗ 连接失败")
|