Files
ai_mip/query_keyword_importer.py

205 lines
8.3 KiB
Python
Raw Permalink Normal View History

2026-02-24 12:46:35 +08:00
"""
Query关键词导入模块
从Excel文件读取关键词批量导入到baidu_keyword表
"""
import os
import glob
import time
import pandas as pd
from datetime import datetime
from loguru import logger
from config import Config
from db_manager import QueryKeywordManager, QueryImportLogManager
class QueryKeywordImporter:
    """Importer that bulk-loads query keywords from Excel files into the baidu_keyword table."""

    # Recognized header names for the query column.
    QUERY_COLUMNS = ['query', 'Query', '查询词', 'keyword', '关键词']
    # Recognized header names for the department column.
    DEPT_COLUMNS = ['科室', 'department', 'Department', '部门']
    # Number of rows inserted per database batch.
    BATCH_SIZE = 500

    def __init__(self):
        self.keyword_mgr = QueryKeywordManager()
        self.log_mgr = QueryImportLogManager()

    def _detect_column(self, df_columns, candidates):
        """Return the first name in `candidates` present in `df_columns`, or None."""
        for col in candidates:
            if col in df_columns:
                return col
        return None

    def import_file(self, filepath: str, log_id: int = None, import_mode: str = 'query_only') -> dict:
        """
        Import a single Excel file into the baidu_keyword table (batched inserts).

        Args:
            filepath: path to the Excel file.
            log_id: import-log id, if the caller already created the record.
            import_mode: one of
                - 'query_only': single query column; rows get query_status='draft'
                - 'full_import': three positional columns (department / keyword / query);
                  rows get query_status='manual_review'

        Returns:
            {'success': bool, 'stats': {'total', 'success', 'skip', 'fail'}};
            on failure an 'error' key carries the message.
        """
        filename = os.path.basename(filepath)
        stats = {'total': 0, 'success': 0, 'skip': 0, 'fail': 0}
        # query_status written for every row, determined by the import mode.
        query_status = 'draft' if import_mode == 'query_only' else 'manual_review'
        # Create a log record unless the caller supplied one.
        if log_id is None:
            log_id = self.log_mgr.create_log(filename, filepath)
        try:
            # Mark the import as running.
            if log_id:
                self.log_mgr.update_status(log_id, 'running')
            # BUGFIX: these log messages previously printed the literal text
            # "(unknown)" instead of interpolating the actual filename.
            logger.info(f"[Query导入] 开始导入文件: {filename}, 模式: {import_mode}, query_status: {query_status}")
            # 1. Read the Excel file.
            if not os.path.exists(filepath):
                raise FileNotFoundError(f"文件不存在: {filepath}")
            df = pd.read_excel(filepath)
            logger.info(f"[Query导入] 文件 {filename} 包含 {len(df)} 行, 列名: {df.columns.tolist()}")
            # 2. Resolve column mapping by mode (positional, not by header name).
            if import_mode == 'query_only':
                # Mode 1: a single query column.
                query_col = df.columns[0]
                dept_col = None
                keyword_col = None
                logger.info(f"[Query导入] 仅导入Query模式使用列: {query_col}")
            else:
                # Mode 2: department / keyword / query, by position.
                dept_col = df.columns[0]
                # NOTE(review): keyword_col is read but never copied into the
                # insert payload below — confirm whether that is intentional.
                keyword_col = df.columns[1]
                query_col = df.columns[2]
                logger.info(f"[Query导入] 完整导入模式,科室列: {dept_col}, 关键字列: {keyword_col}, query列: {query_col}")
            # 3. Collect valid keywords; blank/NaN queries are counted as failures.
            keyword_list = []
            for idx, row in df.iterrows():
                query_val = str(row[query_col]).strip() if pd.notna(row[query_col]) else ''
                if not query_val or query_val == 'nan':
                    stats['fail'] += 1
                    continue
                item = {'keyword': query_val, 'department': ''}
                if import_mode == 'full_import':
                    # Attach the department value when present and non-NaN.
                    if dept_col and pd.notna(row[dept_col]):
                        dept_val = str(row[dept_col]).strip()
                        if dept_val != 'nan':
                            item['department'] = dept_val
                keyword_list.append(item)
            stats['total'] = len(df)
            total_valid = len(keyword_list)
            logger.info(f"[Query导入] 有效关键词: {total_valid} / {stats['total']}")
            # 4. Insert in batches of BATCH_SIZE, accumulating per-batch stats.
            batch_count = (total_valid + self.BATCH_SIZE - 1) // self.BATCH_SIZE
            processed = 0
            for batch_idx in range(batch_count):
                start = batch_idx * self.BATCH_SIZE
                end = min(start + self.BATCH_SIZE, total_valid)
                batch = keyword_list[start:end]
                batch_stats = self.keyword_mgr.batch_insert_keywords(batch, query_status=query_status)
                stats['success'] += batch_stats['success']
                stats['skip'] += batch_stats['skip']
                stats['fail'] += batch_stats['fail']
                processed += len(batch)
                progress = (processed / total_valid) * 100 if total_valid > 0 else 100
                logger.info(
                    f"[Query导入] [{filename}] 批次 {batch_idx+1}/{batch_count} | "
                    f"进度: {processed}/{total_valid} ({progress:.1f}%) | "
                    f"成功: {stats['success']} | 跳过: {stats['skip']} | 失败: {stats['fail']}"
                )
            # 5. Mark the import-log record completed with final counts.
            if log_id:
                self.log_mgr.update_status(
                    log_id, 'completed',
                    total_count=stats['total'],
                    success_count=stats['success'],
                    skip_count=stats['skip'],
                    fail_count=stats['fail']
                )
            logger.info(
                f"[Query导入] 文件 {filename} 导入完成 | "
                f"总数: {stats['total']} | 成功: {stats['success']} | "
                f"跳过: {stats['skip']} | 失败: {stats['fail']}"
            )
            return {'success': True, 'stats': stats}
        except Exception as e:
            # Top-level boundary: record the failure in the log and report upward
            # instead of raising, so a batch scan can continue with other files.
            error_msg = str(e)
            logger.error(f"[Query导入] 文件 {filename} 导入失败: {error_msg}")
            if log_id:
                self.log_mgr.update_status(
                    log_id, 'failed',
                    total_count=stats['total'],
                    success_count=stats['success'],
                    skip_count=stats['skip'],
                    fail_count=stats['fail'],
                    error_message=error_msg
                )
            return {'success': False, 'stats': stats, 'error': error_msg}

    def scan_and_import(self):
        """
        Scan the upload directory and import pending files.

        1. Process import-log records with status='pending'.
        2. Import any Excel files found on disk that have no log record yet.
        """
        upload_dir = Config.QUERY_UPLOAD_DIR
        if not os.path.exists(upload_dir):
            # First run: create the directory and bail — nothing to import yet.
            os.makedirs(upload_dir, exist_ok=True)
            return
        # 1. Re-run imports recorded as pending in the database.
        pending_logs = self.log_mgr.get_pending_logs()
        if pending_logs:
            logger.info(f"[Query扫描] 发现 {len(pending_logs)} 个待处理导入任务")
        for log in pending_logs:
            filepath = log['filepath']
            if os.path.exists(filepath):
                logger.info(f"[Query扫描] 处理待导入文件: {log['filename']}")
                self.import_file(filepath, log_id=log['id'])
            else:
                logger.warning(f"[Query扫描] 文件不存在,标记为失败: {filepath}")
                self.log_mgr.update_status(log['id'], 'failed', error_message='文件不存在')
        # 2. Pick up Excel files on disk that were never logged.
        excel_files = []
        for pattern in ['*.xlsx', '*.xls']:
            excel_files.extend(glob.glob(os.path.join(upload_dir, pattern)))
        for filepath in excel_files:
            filepath = os.path.abspath(filepath)
            if not self.log_mgr.is_file_logged(filepath):
                filename = os.path.basename(filepath)
                # BUGFIX: previously logged the literal "(unknown)" here.
                logger.info(f"[Query扫描] 发现未登记文件: {filename}")
                log_id = self.log_mgr.create_log(filename, filepath)
                self.import_file(filepath, log_id=log_id)