Files
baijiahao_data_crawl/database_config.py
“shengyudong” 322ac74336 2025-12-25 upload
2025-12-25 11:16:59 +08:00

255 lines
8.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库配置管理模块
统一管理数据库连接和SQL操作
"""
import pymysql
from typing import Optional, Dict, List, Tuple, Any
from log_config import setup_database_logger
# Initialise the module-level logger used for all database log output.
logger = setup_database_logger()
# Database configuration.
# Per the original note, the ai_statistics_read account has SELECT, INSERT and
# UPDATE privileges on the ai_article database, so it can be used for both
# queries and writes (tables ai_statistics, ai_statistics_day,
# ai_statistics_days).
# NOTE(review): the fallback values below, including the password, are still
# committed in source so existing deployments keep working unchanged. Rotate
# the credential and supply it through the environment instead.
def load_db_config(env=None):
    """Build the pymysql connection settings.

    Each setting can be overridden through an ``AI_ARTICLE_DB_*`` variable;
    any variable that is absent falls back to the value that used to be
    hard-coded here.

    Args:
        env: Mapping to read overrides from. Defaults to ``os.environ``.

    Returns:
        Dict with the keys ``host``, ``port``, ``user``, ``password``,
        ``database`` and ``charset``.

    Raises:
        ValueError: If ``AI_ARTICLE_DB_PORT`` is set to a non-integer value.
    """
    import os  # function-scope import: nothing else in this block needs it

    source = os.environ if env is None else env
    return {
        'host': source.get('AI_ARTICLE_DB_HOST', '8.149.233.36'),
        # Environment values are strings; the driver expects an integer port.
        'port': int(source.get('AI_ARTICLE_DB_PORT', 3306)),
        'user': source.get('AI_ARTICLE_DB_USER', 'ai_statistics_read'),
        'password': source.get('AI_ARTICLE_DB_PASSWORD', '7aK_H2yvokVumr84lLNDt8fDBp6P'),
        'database': source.get('AI_ARTICLE_DB_NAME', 'ai_article'),  # corrected: use the ai_article database
        'charset': source.get('AI_ARTICLE_DB_CHARSET', 'utf8mb4'),
    }


DB_CONFIG = load_db_config()
class DatabaseManager:
    """Database manager: one place for connection handling and SQL execution.

    Every public operation opens its own connection from ``self.config``,
    runs its statement(s), and closes the connection before returning.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Initialise the manager.

        Args:
            config: Keyword arguments for ``pymysql.connect``. Falls back to
                the module-level ``DB_CONFIG`` when omitted or empty.
        """
        self.config = config or DB_CONFIG

    def get_connection(self, autocommit: bool = False):
        """Open a new database connection.

        Args:
            autocommit: Whether to enable autocommit mode on the connection.

        Returns:
            A pymysql connection object; the caller is responsible for
            closing it.
        """
        return pymysql.connect(**self.config, autocommit=autocommit)

    @staticmethod
    def _release(cursor, conn) -> None:
        """Close *cursor* and then *conn*, skipping whichever is ``None``.

        The connection is closed even when closing the cursor raises.
        """
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if conn is not None:
                conn.close()

    @staticmethod
    def _rollback_quietly(conn) -> None:
        """Roll back *conn* without letting a rollback failure propagate.

        This runs inside ``except`` handlers; if the rollback itself raised
        (for example on a dropped connection) it would replace the error that
        actually caused the failure.
        """
        if conn is None:
            return
        try:
            conn.rollback()
        except Exception as rollback_error:
            logger.warning(f'回滚失败:{rollback_error}')

    def execute_query(self, sql: str, params: Optional[Tuple] = None,
                      fetch_one: bool = False, dict_cursor: bool = True) -> Any:
        """Run a query statement (SELECT) and return its rows.

        Args:
            sql: SQL statement, using ``%s`` placeholders.
            params: Statement parameters (tuple or list); ``None`` means none.
            fetch_one: ``True`` returns a single row, ``False`` returns all.
            dict_cursor: ``True`` uses ``DictCursor`` (rows as dicts),
                ``False`` uses the plain ``Cursor`` (rows as tuples).

        Returns:
            The result of ``fetchone()`` or ``fetchall()`` on the cursor.

        Raises:
            Exception: Any error from connecting or executing is logged and
                re-raised unchanged.
        """
        conn = None
        cursor = None
        try:
            conn = self.get_connection()
            cursor_class = pymysql.cursors.DictCursor if dict_cursor else pymysql.cursors.Cursor
            cursor = conn.cursor(cursor_class)
            logger.info(f'[SQL] {sql.strip()} | params: {params}')
            cursor.execute(sql, params or ())
            if fetch_one:
                result = cursor.fetchone()
                row_count = 1 if result else 0
            else:
                result = cursor.fetchall()
                row_count = len(result) if result else 0
            logger.debug(f'[SQL结果] 返回 {row_count} 条记录')
            return result
        except Exception as e:
            logger.error(f'执行查询失败:{e}', exc_info=True)
            raise
        finally:
            self._release(cursor, conn)

    def execute_update(self, sql: str, params: Optional[Tuple] = None,
                       autocommit: bool = True) -> int:
        """Run a write statement (INSERT/UPDATE/DELETE).

        Args:
            sql: SQL statement, using ``%s`` placeholders.
            params: Statement parameters (tuple or list); ``None`` means none.
            autocommit: Passed to the connection. The statement is committed
                explicitly either way, so this does not defer the commit.

        Returns:
            The value returned by ``cursor.execute`` (affected row count).

        Raises:
            Exception: Any error is logged and re-raised after a best-effort
                rollback.
        """
        conn = None
        cursor = None
        try:
            conn = self.get_connection(autocommit=autocommit)
            cursor = conn.cursor()
            logger.info(f'[SQL] {sql.strip()} | params: {params}')
            result = cursor.execute(sql, params or ())
            # Always commit explicitly so the write is persisted.
            conn.commit()
            logger.info(f'[SQL执行] 影响 {result}')
            return result
        except Exception as e:
            self._rollback_quietly(conn)
            logger.error(f'执行更新失败:{e}', exc_info=True)
            raise
        finally:
            self._release(cursor, conn)

    def execute_many(self, sql: str, params_list: List[Tuple],
                     autocommit: bool = True) -> int:
        """Run the same statement once per parameter set, in one transaction.

        A failure on an individual parameter set is logged and skipped; the
        remaining sets still run and everything that succeeded is committed
        together at the end.

        Args:
            sql: SQL statement, using ``%s`` placeholders.
            params_list: One parameter tuple per execution.
            autocommit: Accepted for interface compatibility only; the
                connection is always opened with autocommit disabled.

        Returns:
            Number of executions that affected at least one row. Executions
            that affected zero rows or raised are not counted.

        Raises:
            Exception: Errors outside the per-row loop (connecting,
                committing) are logged and re-raised after a best-effort
                rollback.
        """
        if not params_list:
            # Nothing to run: skip opening a connection for an empty batch.
            logger.info(f'[SQL批量] {sql.strip()} | 批次数: 0')
            return 0

        conn = None
        cursor = None
        try:
            conn = self.get_connection(autocommit=False)  # always a non-autocommit connection
            cursor = conn.cursor()
            total = len(params_list)
            logger.info(f'[SQL批量] {sql.strip()} | 批次数: {total}')
            success_count = 0
            failed_count = 0
            unchanged_count = 0  # executions that changed no data
            for idx, params in enumerate(params_list, 1):
                try:
                    result = cursor.execute(sql, params)
                    # Return value for INSERT ... ON DUPLICATE KEY UPDATE:
                    #   0 = row exists and is identical (no change)
                    #   1 = new row inserted
                    #   2 = existing row updated
                    if result > 0:
                        success_count += 1
                        logger.debug(f'批量执行[{idx}/{total}]: 影响{result}行, params={params}')
                    else:
                        unchanged_count += 1
                        logger.debug(f'批量执行[{idx}/{total}]: 数据未变化(已存在且相同), params={params}')
                except Exception as e:
                    # Deliberate best-effort: one bad row must not abort the batch.
                    failed_count += 1
                    logger.error(f'批量执行失败[{idx}/{total}]: params={params},错误:{e}', exc_info=True)
            # Always commit explicitly so the writes are persisted.
            conn.commit()
            if failed_count > 0:
                logger.warning(f'[SQL批量执行] 成功 {success_count}/{total} 条,失败 {failed_count} 条,未变化 {unchanged_count}')
            elif unchanged_count > 0:
                logger.info(f'[SQL批量执行] 成功 {success_count}/{total} 条,未变化 {unchanged_count} 条(数据已存在且相同)')
            else:
                logger.info(f'[SQL批量执行] 成功 {success_count}/{total}')
            return success_count
        except Exception as e:
            self._rollback_quietly(conn)
            logger.error(f'批量执行失败:{e}', exc_info=True)
            raise
        finally:
            self._release(cursor, conn)

    def insert_or_update(self, table: str, data: Dict,
                         unique_keys: List[str], autocommit: bool = True) -> int:
        """Insert a row, or update it on a unique-key conflict.

        Builds an ``INSERT ... ON DUPLICATE KEY UPDATE`` statement from
        *data* and runs it through ``execute_update``.

        Args:
            table: Table name. Interpolated into the SQL text as-is, so it
                must come from trusted code, not user input.
            data: Mapping of column name to value. Column names are also
                interpolated as-is; values are passed as parameters.
            unique_keys: Columns forming the unique key; these are left out
                of the UPDATE clause.
            autocommit: Forwarded to ``execute_update``.

        Returns:
            0 when *data* is empty, otherwise the result of
            ``execute_update``.
        """
        if not data:
            return 0

        column_names = list(data)
        columns = ', '.join(column_names)
        placeholders = ', '.join(['%s'] * len(column_names))

        # UPDATE clause covers every column that is not part of the unique key.
        update_parts = [
            f"{key} = VALUES({key})"
            for key in column_names
            if key not in unique_keys
        ]
        if not update_parts:
            # Every supplied column is a unique key, so nothing needs
            # updating. An empty UPDATE list is a SQL syntax error; assign a
            # column to itself so a duplicate row is a harmless no-op.
            anchor = column_names[0]
            update_parts = [f"{anchor} = {anchor}"]

        sql = f"""
        INSERT INTO {table} ({columns})
        VALUES ({placeholders})
        ON DUPLICATE KEY UPDATE {', '.join(update_parts)}
        """
        params = tuple(data.values())
        return self.execute_update(sql, params, autocommit=autocommit)

    def test_connection(self) -> bool:
        """Check that the database is reachable by running ``SELECT 1``.

        Returns:
            ``True`` when the probe query returns a row; ``False`` when it
            returns nothing or any step raises (the error is logged, not
            propagated).
        """
        try:
            conn = self.get_connection()
            # Nested try/finally so the cursor and connection are closed even
            # when the probe query itself fails.
            try:
                cursor = conn.cursor()
                try:
                    cursor.execute("SELECT 1")
                    result = cursor.fetchone()
                finally:
                    cursor.close()
            finally:
                conn.close()
            if result:
                logger.info(f"[OK] 数据库连接成功: {self.config['host']}:{self.config.get('port', 3306)}/{self.config['database']}")
                return True
            return False
        except Exception as e:
            logger.error(f"[X] 数据库连接失败: {e}")
            return False
# Shared module-level manager instance, built from the default configuration.
db_manager = DatabaseManager()

if __name__ == '__main__':
    # Run as a script: probe the connection and report the outcome.
    print("\n测试数据库连接...")
    connected = db_manager.test_connection()
    print("✓ 连接成功" if connected else "✗ 连接失败")