Files
ai_import_quary/import_keywords.py
2026-02-05 21:34:39 +08:00

427 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
从Excel文件导入关键词到baidu_keyword表
"""
import pandas as pd
import logging
import argparse
import os
import time
import glob
import shutil
from database_config import DatabaseManager
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def read_excel_keywords_with_department(excel_path, query_column='query', department_column='科室'):
"""
读取Excel文件中的关键词和部门信息
Args:
excel_path: Excel文件路径
query_column: query列名默认为'query'
department_column: 部门列名,默认为'科室'如果为None则不读取部门信息
Returns:
包含(keyword, department)元组的列表如果没有部门列则department为None
"""
try:
# 读取Excel文件
df = pd.read_excel(excel_path)
logger.info(f"成功读取Excel文件: {excel_path}")
logger.info(f"Excel文件包含 {len(df)} 行数据")
logger.info(f"Excel列名: {df.columns.tolist()}")
# 检查query列是否存在
if query_column not in df.columns:
logger.error(f"未找到query列: {query_column}")
return []
# 检查是否有部门列
has_department = department_column and department_column in df.columns
if department_column and not has_department:
logger.warning(f"未找到department列: {department_column},将不使用部门信息")
# 获取query数据
query_data = df[query_column].dropna()
query_list = query_data.tolist()
# 根据是否有部门列,组合数据
keyword_dept_pairs = []
if has_department:
# 有部门列,获取部门数据
department_data = df[department_column].dropna()
# 对齐数据长度,取最短长度
min_length = min(len(query_data), len(department_data))
query_list = query_data.iloc[:min_length].tolist()
department_list = department_data.iloc[:min_length].tolist()
for i in range(min_length):
keyword = str(query_list[i]).strip()
department = str(department_list[i]).strip()
if keyword and department: # 确保关键词和部门都不为空
keyword_dept_pairs.append((keyword, department))
else:
# 没有部门列,只提取关键词
logger.info("没有部门列,将只导入关键词,不指定科室")
for keyword in query_list:
keyword = str(keyword).strip()
if keyword: # 确保关键词不为空
keyword_dept_pairs.append((keyword, None))
# 去除重复项,保留第一个出现的组合
seen = set()
unique_keyword_dept_pairs = []
for keyword, dept in keyword_dept_pairs:
if (keyword, dept) not in seen:
seen.add((keyword, dept))
unique_keyword_dept_pairs.append((keyword, dept))
if has_department:
logger.info(f"提取到 {len(unique_keyword_dept_pairs)} 个唯一的关键词-部门组合")
else:
logger.info(f"提取到 {len(unique_keyword_dept_pairs)} 个唯一的关键词")
return unique_keyword_dept_pairs
except Exception as e:
logger.error(f"读取Excel文件失败: {e}", exc_info=True)
raise
def get_department_id(db_manager, department_name):
"""
根据科室名称从ai_departments表中获取对应的ID
Args:
db_manager: 数据库管理器实例
department_name: 科室名称
Returns:
科室ID如果未找到则抛出异常
"""
try:
# 查询科室ID - 使用正确的字段名
sql = "SELECT id FROM ai_departments WHERE department_name = %s"
result = db_manager.execute_query(sql, (department_name,), fetch_one=True)
if result:
return result[0] # 返回ID
else:
error_msg = f"未找到科室 '{department_name}' 的ID请先在ai_departments表中添加该科室"
logger.error(error_msg)
raise ValueError(error_msg)
except Exception as e:
logger.error(f"查询科室ID失败: {e}", exc_info=True)
raise
def get_author_info_by_department(db_manager, department_id):
"""
根据科室ID从ai_authors表中获取任一符合条件的作者信息
Args:
db_manager: 数据库管理器实例
department_id: 科室ID
Returns:
(author_id, author_name) 元组,如果未找到则返回 (None, None)
"""
try:
# 查询符合条件的作者信息
sql = "SELECT id, author_name FROM ai_authors WHERE department_id = %s AND status = 'active' AND daily_post_max > 0 LIMIT 1"
result = db_manager.execute_query(sql, (department_id,), fetch_one=True)
if result:
return result[0], result[1] # 返回 author_id, author_name
else:
logger.warning(f"未找到科室ID {department_id} 下符合条件的活跃作者")
return None, None
except Exception as e:
logger.error(f"查询作者信息失败: {e}", exc_info=True)
return None, None
def import_keywords_to_db(db_manager, keyword_dept_pairs, seed_id=9999, seed_name='手动提交', crawled=1, batch_size=100, sleep_seconds=0.1):
"""
将关键词批量导入到baidu_keyword表
Args:
db_manager: 数据库管理器实例
keyword_dept_pairs: 包含(keyword, department)元组的列表
seed_id: 种子ID默认9999
seed_name: 种子名称,默认'手动提交'
crawled: 是否已爬取默认1
batch_size: 日志批次大小,每多少条记录输出一次进度
sleep_seconds: 每条记录间隔睡眠时间默认0.1秒
Returns:
成功导入的数量
"""
if not keyword_dept_pairs:
logger.warning("没有关键词需要导入")
return 0
try:
logger.info(f"开始导入 {len(keyword_dept_pairs)} 个关键词-部门组合到数据库...")
logger.info("采用逐条查询+插入模式,避免重复")
# 准备SQL语句
check_sql = "SELECT COUNT(*) FROM baidu_keyword WHERE keyword = %s"
insert_sql = """
INSERT INTO baidu_keyword (keyword, seed_id, seed_name, crawled, parents_id, created_at, department, department_id, query_status, author_id, author_name)
VALUES (%s, %s, %s, %s, 0, NOW(), %s, %s, %s, %s, %s)
"""
success_count = 0
skip_count = 0
failed_count = 0
# 逐条处理
for idx, (keyword, department) in enumerate(keyword_dept_pairs, 1):
try:
if department:
logger.debug(f'[调试] 处理第 {idx}/{len(keyword_dept_pairs)} 条: {keyword}, 部门: {department}')
else:
logger.debug(f'[调试] 处理第 {idx}/{len(keyword_dept_pairs)} 条: {keyword}, 无部门信息')
# 1. 如果有部门信息获取科室ID和作者信息
if department:
dept_id = get_department_id(db_manager, department)
author_id, author_name = get_author_info_by_department(db_manager, dept_id)
else:
# 没有部门信息使用默认值空字符串而不是None避免数据库NOT NULL约束
department = ''
dept_id = 0
author_id = 0
author_name = ''
# 2. 查询关键词是否存在
result = db_manager.execute_query(check_sql, (keyword,), fetch_one=True)
exists = result[0] > 0 if result else False
if exists:
skip_count += 1
logger.debug(f'[调试] 关键词已存在,跳过: {keyword}')
else:
# 3. 不存在则插入
if department:
logger.debug(f'[调试] 准备插入: {keyword}, 部门: {department}, 部门ID: {dept_id}, 作者ID: {author_id}, 作者名: {author_name}, query_status: manual_review')
else:
logger.debug(f'[调试] 准备插入: {keyword}, 无部门信息, query_status: manual_review')
affected = db_manager.execute_update(
insert_sql,
(keyword, seed_id, seed_name, crawled, department, dept_id, 'manual_review', author_id, author_name),
autocommit=True
)
if affected > 0:
success_count += 1
if department:
logger.debug(f'[调试] 插入成功: {keyword}, 部门: {department}, 部门ID: {dept_id}, 作者ID: {author_id}, 作者名: {author_name}, query_status: manual_review')
else:
logger.debug(f'[调试] 插入成功: {keyword}, 无部门信息, query_status: manual_review')
# 5. 输出进度
if idx % batch_size == 0 or idx == len(keyword_dept_pairs):
progress = (idx / len(keyword_dept_pairs)) * 100
logger.info(f'[插入进度] {idx}/{len(keyword_dept_pairs)} ({progress:.1f}%) | 成功: {success_count} | 跳过: {skip_count} | 失败: {failed_count}')
# 6. 每次执行完sleep
time.sleep(sleep_seconds)
except ValueError as ve:
# 遇到科室不存在的错误,停止整个导入过程
logger.error(f'[错误] 第 {idx} 条记录遇到错误: {ve}')
raise ve
except Exception as e:
failed_count += 1
logger.warning(f'[调试] 处理失败 [{idx}/{len(keyword_dept_pairs)}]: keyword={keyword}, 部门={department},错误:{e}')
logger.info(f"导入完成!成功插入: {success_count} | 跳过已存在: {skip_count} | 失败: {failed_count}")
return success_count
except Exception as e:
logger.error(f"导入关键词失败: {e}", exc_info=True)
raise
def main():
"""主函数"""
# 检查query_upload文件夹是否存在
upload_folder = 'query_upload'
if not os.path.exists(upload_folder):
logger.error(f'未找到 {upload_folder} 文件夹')
return
# 查找Excel文件
excel_patterns = ['*.xlsx', '*.xls']
excel_files = []
for pattern in excel_patterns:
excel_files.extend(glob.glob(os.path.join(upload_folder, pattern)))
excel_files.extend(glob.glob(os.path.join(upload_folder, pattern.upper())))
if not excel_files:
logger.error(f'{upload_folder} 文件夹中未找到Excel文件 (.xlsx 或 .xls)')
return
logger.info(f'{upload_folder} 文件夹中找到 {len(excel_files)} 个Excel文件:')
for i, file_path in enumerate(excel_files, 1):
logger.info(f' {i}. {os.path.basename(file_path)}')
# 如果只有一个文件,直接使用;如果有多个文件,让用户选择
if len(excel_files) == 1:
excel_path = excel_files[0]
logger.info(f'自动选择文件: {os.path.basename(excel_path)}')
else:
print('\n找到多个Excel文件请选择要使用的文件:')
for i, file_path in enumerate(excel_files, 1):
print(f' {i}. {os.path.basename(file_path)}')
while True:
try:
choice = int(input(f'请选择文件 (1-{len(excel_files)}): '))
if 1 <= choice <= len(excel_files):
excel_path = excel_files[choice - 1]
break
else:
print(f'请输入1到{len(excel_files)}之间的数字')
except ValueError:
print('请输入有效的数字')
# 从database_config.py读取默认配置
from database_config import DB_CONFIG
# 解析命令行参数
parser = argparse.ArgumentParser(description='导入Excel关键词到baidu_keyword表')
parser.add_argument('--host', default=DB_CONFIG['host'], help='数据库主机')
parser.add_argument('--port', type=int, default=3306, help='数据库端口')
parser.add_argument('--user', default=DB_CONFIG['user'], help='数据库用户名')
parser.add_argument('--password', default=DB_CONFIG['password'], help='数据库密码')
parser.add_argument('--database', default=DB_CONFIG['database'], help='数据库名')
parser.add_argument('--batch-size', type=int, default=1, help='日志批次大小')
parser.add_argument('--sleep', type=float, default=0.1, help='每条记录间隔时间(秒)')
parser.add_argument('--query-column', default='query', help='Excel中的query列名默认为query')
parser.add_argument('--dept-column', default='科室', help='Excel中的部门列名默认为科室')
parser.add_argument('--seed-id', type=int, default=9999, help='种子ID')
parser.add_argument('--seed-name', default='手动提交', help='种子名称')
# 移除了命令行参数,改为交互式询问
args = parser.parse_args()
# Excel文件路径已经在上面确定了
# excel_path 已经被赋值
# 创建数据库连接配置
db_config = {
'host': args.host,
'port': args.port,
'user': args.user,
'password': args.password,
'database': args.database,
'charset': DB_CONFIG['charset'] # 使用database_config.py中的字符集配置
}
logger.info("=" * 60)
logger.info("开始导入关键词到baidu_keyword表")
logger.info(f"数据库配置: {args.user}@{args.host}:{args.port}/{args.database}")
logger.info(f"Excel文件: {os.path.basename(excel_path)}")
logger.info("=" * 60)
# 创建数据库管理器
db_manager = DatabaseManager(db_config)
try:
# 1. 读取Excel文件
keyword_dept_pairs = read_excel_keywords_with_department(excel_path, args.query_column, args.dept_column)
# 询问用户是要导入全部数据还是部分测试
print(f"\nExcel中共有 {len(keyword_dept_pairs)} 条数据")
while True:
choice = input("请选择导入方式: A) 全部导入 B) 测试模式(输入前N条数据): ").strip().upper()
if choice == 'A':
# 全部导入
break
elif choice == 'B':
try:
test_count = int(input(f"请输入要测试的条数 (1-{len(keyword_dept_pairs)}): "))
if 1 <= test_count <= len(keyword_dept_pairs):
keyword_dept_pairs = keyword_dept_pairs[:test_count]
print(f"已选择导入前 {test_count} 条数据进行测试")
break
else:
print(f"输入超出范围请输入1到{len(keyword_dept_pairs)}之间的数字")
except ValueError:
print("请输入有效的数字")
else:
print("请输入 A 或 B")
if not keyword_dept_pairs:
logger.warning("没有可导入的关键词,程序退出")
return
# 打印前10个关键词-部门组合作为预览
logger.info(f"\n关键词-部门预览前10个:")
for i, (keyword, department) in enumerate(keyword_dept_pairs[:10], 1):
logger.info(f" {i}. {keyword} (部门: {department})")
if len(keyword_dept_pairs) > 10:
logger.info(f" ... 还有 {len(keyword_dept_pairs) - 10} 个关键词-部门组合")
# 2. 确认导入
print("\n" + "=" * 60)
print(f"即将导入 {len(keyword_dept_pairs)} 个关键词-部门组合到 baidu_keyword 表")
print(f"配置: seed_id={args.seed_id}, seed_name='{args.seed_name}', crawled=1")
confirm = input("确认导入? (y/n): ").strip().lower()
if confirm != 'y':
logger.info("用户取消导入")
return
# 3. 执行导入
success_count = import_keywords_to_db(
db_manager=db_manager,
keyword_dept_pairs=keyword_dept_pairs,
seed_id=args.seed_id,
seed_name=args.seed_name,
crawled=1,
batch_size=args.batch_size,
sleep_seconds=args.sleep
)
logger.info("=" * 60)
logger.info(f"✓ 导入完成!共成功导入 {success_count} 个关键词")
logger.info("=" * 60)
# 运行完成后自动删除 query_upload 文件夹中的所有文件(仅在成功时)
try:
upload_folder = 'query_upload'
for filename in os.listdir(upload_folder):
file_path = os.path.join(upload_folder, filename)
if os.path.isfile(file_path):
os.remove(file_path)
logger.info(f'已删除文件: {filename}')
logger.info('已清理 query_upload 文件夹中的所有文件')
except Exception as e:
logger.error(f'清理 query_upload 文件夹时出错: {e}')
except Exception as e:
logger.error(f"✗ 导入过程出错: {e}", exc_info=True)
logger.info("=" * 60)
logger.info("✗ 导入失败")
logger.info("=" * 60)
# 导入失败时不删除源文件,以便排查问题
if __name__ == '__main__':
main()