Files
ai_image_quary/database_config.py

161 lines
4.7 KiB
Python
Raw Permalink Normal View History

"""
数据库配置管理模块
统一管理数据库连接和SQL操作
"""
import pymysql
import logging
logger = logging.getLogger(__name__)
# 数据库配置
DB_CONFIG = {
'host': '8.149.233.36',
'user': 'ai_article_read',
'password': '7aK_H2yvokVumr84lLNDt8fDBp6P',
'database': 'ai_article',
'charset': 'utf8mb4'
}
class DatabaseManager:
"""数据库管理器:统一管理数据库连接和操作"""
def __init__(self, config=None):
"""初始化数据库管理器
Args:
config: 数据库配置字典默认使用 DB_CONFIG
"""
self.config = config or DB_CONFIG
def get_connection(self, autocommit=False):
"""获取数据库连接
Args:
autocommit: 是否启用自动提交模式
Returns:
pymysql连接对象
"""
return pymysql.connect(**self.config, autocommit=autocommit)
def execute_query(self, sql, params=None, fetch_one=False):
"""执行查询SQLSELECT
Args:
sql: SQL语句
params: SQL参数tuple或list
fetch_one: True返回单条记录False返回所有记录
Returns:
查询结果
"""
conn = None
cursor = None
try:
conn = self.get_connection()
cursor = conn.cursor()
logger.info(f'[SQL] {sql.strip()} | params: {params}')
cursor.execute(sql, params or ())
if fetch_one:
result = cursor.fetchone()
else:
result = cursor.fetchall()
logger.debug(f'[SQL结果] 返回 {len(result) if not fetch_one and result else (1 if result else 0)} 条记录')
return result
except Exception as e:
logger.error(f'执行查询失败:{e}', exc_info=True)
raise
finally:
if cursor:
cursor.close()
if conn:
conn.close()
def execute_update(self, sql, params=None, autocommit=True):
"""执行更新SQLINSERT/UPDATE/DELETE
Args:
sql: SQL语句
params: SQL参数tuple或list
autocommit: 是否自动提交
Returns:
影响的行数
"""
conn = None
cursor = None
try:
conn = self.get_connection(autocommit=autocommit)
cursor = conn.cursor()
logger.info(f'[SQL] {sql.strip()} | params: {params}')
result = cursor.execute(sql, params or ())
if not autocommit:
conn.commit()
logger.info(f'[SQL执行] 影响 {result}')
return result
except Exception as e:
if not autocommit and conn:
conn.rollback()
logger.error(f'执行更新失败:{e}', exc_info=True)
raise
finally:
if cursor:
cursor.close()
if conn:
conn.close()
def execute_many(self, sql, params_list, autocommit=True):
"""批量执行SQL
Args:
sql: SQL语句
params_list: 参数列表每个元素是一组参数
autocommit: 是否自动提交
Returns:
成功执行的行数
"""
conn = None
cursor = None
try:
conn = self.get_connection(autocommit=autocommit)
cursor = conn.cursor()
logger.info(f'[SQL批量] {sql.strip()} | 批次数: {len(params_list)}')
success_count = 0
for params in params_list:
try:
result = cursor.execute(sql, params)
if result > 0:
success_count += 1
except Exception as e:
logger.debug(f'批量执行跳过params={params},错误:{e}')
if not autocommit:
conn.commit()
logger.info(f'[SQL批量执行] 成功 {success_count}/{len(params_list)}')
return success_count
except Exception as e:
if not autocommit and conn:
conn.rollback()
logger.error(f'批量执行失败:{e}', exc_info=True)
raise
finally:
if cursor:
cursor.close()
if conn:
conn.close()
# 创建全局数据库管理器实例
db_manager = DatabaseManager()