Files
ai_mip/query_keyword_importer.py
2026-02-24 12:46:35 +08:00

205 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Query关键词导入模块
从Excel文件读取关键词批量导入到baidu_keyword表
"""
import os
import glob
import time
import pandas as pd
from datetime import datetime
from loguru import logger
from config import Config
from db_manager import QueryKeywordManager, QueryImportLogManager
class QueryKeywordImporter:
    """Bulk-import query keywords from Excel files into the baidu_keyword table.

    Two entry points:
      - import_file():    import a single Excel file, tracking progress in an
                          import-log record.
      - scan_and_import(): process pending import-log records, then pick up any
                          unregistered Excel files in the upload directory.
    """

    # Recognized query column headers.
    # NOTE(review): import_file currently selects columns by POSITION, not by
    # name — these lists are only consumed by _detect_column, which has no
    # callers in this file. Kept for backward compatibility.
    QUERY_COLUMNS = ['query', 'Query', '查询词', 'keyword', '关键词']
    # Recognized department column headers (see NOTE above).
    DEPT_COLUMNS = ['科室', 'department', 'Department', '部门']
    # Rows per batch passed to batch_insert_keywords.
    BATCH_SIZE = 500

    def __init__(self):
        self.keyword_mgr = QueryKeywordManager()
        self.log_mgr = QueryImportLogManager()

    def _detect_column(self, df_columns, candidates):
        """Return the first name in `candidates` present in `df_columns`, else None."""
        for col in candidates:
            if col in df_columns:
                return col
        return None

    def import_file(self, filepath: str, log_id: int = None, import_mode: str = 'query_only') -> dict:
        """Import a single Excel file into baidu_keyword in batches.

        Args:
            filepath: path to the Excel file.
            log_id: optional pre-created import-log id; a new log record is
                created when None.
            import_mode:
                - 'query_only':  single query column, rows get query_status='draft'
                - 'full_import': three positional columns (department / keyword /
                  query), rows get query_status='manual_review'

        Returns:
            {'success': bool, 'stats': {'total', 'success', 'skip', 'fail'},
             'error': str (only on failure)}

        Never raises: any exception is caught, logged, recorded on the import
        log, and reported via the returned dict.
        """
        filename = os.path.basename(filepath)
        stats = {'total': 0, 'success': 0, 'skip': 0, 'fail': 0}

        # The import mode determines the query_status written for every row.
        query_status = 'draft' if import_mode == 'query_only' else 'manual_review'

        # Create a log record unless the caller already made one.
        if log_id is None:
            log_id = self.log_mgr.create_log(filename, filepath)

        try:
            # Mark the log record as running.
            if log_id:
                self.log_mgr.update_status(log_id, 'running')
            logger.info(f"[Query导入] 开始导入文件: {filename}, 模式: {import_mode}, query_status: {query_status}")

            # 1. Read the Excel file.
            if not os.path.exists(filepath):
                raise FileNotFoundError(f"文件不存在: {filepath}")
            df = pd.read_excel(filepath)
            logger.info(f"[Query导入] 文件 {filename} 包含 {len(df)} 行, 列名: {df.columns.tolist()}")

            # 2. Resolve the column mapping by mode. Columns are selected by
            # position, so fail early with a clear message if there are too few.
            if import_mode == 'query_only':
                # Mode 1: a single query column (the first column).
                if len(df.columns) < 1:
                    raise ValueError("文件不存在: 文件没有任何列" if False else f"文件格式错误: {filename} 没有任何列")
                query_col = df.columns[0]
                dept_col = None
                keyword_col = None
                logger.info(f"[Query导入] 仅导入Query模式使用列: {query_col}")
            else:
                # Mode 2: three positional columns — department / keyword / query.
                if len(df.columns) < 3:
                    raise ValueError(f"文件格式错误: {filename} 完整导入模式需要至少3列, 实际 {len(df.columns)} 列")
                dept_col = df.columns[0]
                # NOTE(review): keyword_col is read but never used below —
                # presumably the query value doubles as the keyword; confirm
                # against batch_insert_keywords.
                keyword_col = df.columns[1]
                query_col = df.columns[2]
                logger.info(f"[Query导入] 完整导入模式,科室列: {dept_col}, 关键字列: {keyword_col}, query列: {query_col}")

            # 3. Preprocess: collect valid keywords; blank/NaN queries count as failures.
            keyword_list = []
            for idx, row in df.iterrows():
                query_val = str(row[query_col]).strip() if pd.notna(row[query_col]) else ''
                if not query_val or query_val == 'nan':
                    stats['fail'] += 1
                    continue
                item = {'keyword': query_val, 'department': ''}
                if import_mode == 'full_import':
                    # Attach the department when present and non-NaN.
                    if dept_col and pd.notna(row[dept_col]):
                        dept_val = str(row[dept_col]).strip()
                        if dept_val != 'nan':
                            item['department'] = dept_val
                keyword_list.append(item)
            stats['total'] = len(df)
            total_valid = len(keyword_list)
            logger.info(f"[Query导入] 有效关键词: {total_valid} / {stats['total']}")

            # 4. Insert in batches of BATCH_SIZE, logging progress per batch.
            batch_count = (total_valid + self.BATCH_SIZE - 1) // self.BATCH_SIZE
            processed = 0
            for batch_idx in range(batch_count):
                start = batch_idx * self.BATCH_SIZE
                end = min(start + self.BATCH_SIZE, total_valid)
                batch = keyword_list[start:end]
                batch_stats = self.keyword_mgr.batch_insert_keywords(batch, query_status=query_status)
                stats['success'] += batch_stats['success']
                stats['skip'] += batch_stats['skip']
                stats['fail'] += batch_stats['fail']
                processed += len(batch)
                progress = (processed / total_valid) * 100 if total_valid > 0 else 100
                logger.info(
                    f"[Query导入] [{filename}] 批次 {batch_idx+1}/{batch_count} | "
                    f"进度: {processed}/{total_valid} ({progress:.1f}%) | "
                    f"成功: {stats['success']} | 跳过: {stats['skip']} | 失败: {stats['fail']}"
                )

            # 5. Mark the log record completed with final counters.
            if log_id:
                self.log_mgr.update_status(
                    log_id, 'completed',
                    total_count=stats['total'],
                    success_count=stats['success'],
                    skip_count=stats['skip'],
                    fail_count=stats['fail']
                )
            logger.info(
                f"[Query导入] 文件 {filename} 导入完成 | "
                f"总数: {stats['total']} | 成功: {stats['success']} | "
                f"跳过: {stats['skip']} | 失败: {stats['fail']}"
            )
            return {'success': True, 'stats': stats}

        except Exception as e:
            # Deliberate catch-all boundary: the failure is logged, persisted on
            # the import log, and surfaced through the return value.
            error_msg = str(e)
            logger.error(f"[Query导入] 文件 {filename} 导入失败: {error_msg}")
            if log_id:
                self.log_mgr.update_status(
                    log_id, 'failed',
                    total_count=stats['total'],
                    success_count=stats['success'],
                    skip_count=stats['skip'],
                    fail_count=stats['fail'],
                    error_message=error_msg
                )
            return {'success': False, 'stats': stats, 'error': error_msg}

    def scan_and_import(self):
        """Scan the upload directory and import anything outstanding.

        1. Re-run import-log records still in status 'pending' (missing files
           are marked failed).
        2. Import any *.xlsx / *.xls files in the directory that have no log
           record yet, registering each before import.
        """
        upload_dir = Config.QUERY_UPLOAD_DIR
        if not os.path.exists(upload_dir):
            # First run: create the directory and bail — nothing to import yet.
            os.makedirs(upload_dir, exist_ok=True)
            return

        # 1. Process pending import-log records.
        pending_logs = self.log_mgr.get_pending_logs()
        if pending_logs:
            logger.info(f"[Query扫描] 发现 {len(pending_logs)} 个待处理导入任务")
        for log in pending_logs:
            filepath = log['filepath']
            if os.path.exists(filepath):
                logger.info(f"[Query扫描] 处理待导入文件: {log['filename']}")
                self.import_file(filepath, log_id=log['id'])
            else:
                logger.warning(f"[Query扫描] 文件不存在,标记为失败: {filepath}")
                self.log_mgr.update_status(log['id'], 'failed', error_message='文件不存在')

        # 2. Pick up Excel files that have no log record yet.
        excel_files = []
        for pattern in ['*.xlsx', '*.xls']:
            excel_files.extend(glob.glob(os.path.join(upload_dir, pattern)))
        for filepath in excel_files:
            filepath = os.path.abspath(filepath)
            if not self.log_mgr.is_file_logged(filepath):
                filename = os.path.basename(filepath)
                logger.info(f"[Query扫描] 发现未登记文件: {filename}")
                log_id = self.log_mgr.create_log(filename, filepath)
                self.import_file(filepath, log_id=log_id)