205 lines
8.3 KiB
Python
205 lines
8.3 KiB
Python
|
|
"""
|
|||
|
|
Query关键词导入模块
|
|||
|
|
从Excel文件读取关键词,批量导入到baidu_keyword表
|
|||
|
|
"""
|
|||
|
|
import os
|
|||
|
|
import glob
|
|||
|
|
import time
|
|||
|
|
import pandas as pd
|
|||
|
|
from datetime import datetime
|
|||
|
|
from loguru import logger
|
|||
|
|
from config import Config
|
|||
|
|
from db_manager import QueryKeywordManager, QueryImportLogManager
|
|||
|
|
|
|||
|
|
|
|||
|
|
class QueryKeywordImporter:
    """Query keyword importer.

    Reads keywords from Excel files and bulk-inserts them into the
    baidu_keyword table, tracking progress in an import-log table.
    """

    # Candidate header names for the query column.
    # NOTE(review): currently unused — import_file selects columns by
    # position, not by name; kept for potential header-based detection
    # via _detect_column.
    QUERY_COLUMNS = ['query', 'Query', '查询词', 'keyword', '关键词']

    # Candidate header names for the department column (also unused, see above).
    DEPT_COLUMNS = ['科室', 'department', 'Department', '部门']

    # Rows per bulk-insert call.
    BATCH_SIZE = 500

    def __init__(self):
        # Persistence managers for keywords and import-log records.
        self.keyword_mgr = QueryKeywordManager()
        self.log_mgr = QueryImportLogManager()

    def _detect_column(self, df_columns, candidates):
        """Return the first name in *candidates* present in *df_columns*, else None."""
        for candidate in candidates:
            if candidate in df_columns:
                return candidate
        return None

    def import_file(self, filepath: str, log_id: int = None, import_mode: str = 'query_only') -> dict:
        """Import one Excel file into baidu_keyword (batched).

        Args:
            filepath: Path to the Excel file.
            log_id: Existing import-log row id; a new log row is created
                when None.
            import_mode: One of:
                - 'query_only': single query column, query_status='draft'
                - 'full_import': three positional columns
                  (department / keyword / query), query_status='manual_review'

        Returns:
            {'success': bool, 'stats': {'total', 'success', 'skip', 'fail'},
             'error': str (only on failure)}
        """
        filename = os.path.basename(filepath)
        stats = {'total': 0, 'success': 0, 'skip': 0, 'fail': 0}

        # Target status for inserted rows depends on the import mode.
        query_status = 'draft' if import_mode == 'query_only' else 'manual_review'

        # Create a log record if the caller did not supply one.
        if log_id is None:
            log_id = self.log_mgr.create_log(filename, filepath)

        try:
            # Mark the import as running.
            if log_id:
                self.log_mgr.update_status(log_id, 'running')

            # BUGFIX: these log messages previously emitted the literal
            # text "(unknown)" instead of interpolating the filename.
            logger.info(f"[Query导入] 开始导入文件: {filename}, 模式: {import_mode}, query_status: {query_status}")

            # 1. Read the Excel file.
            if not os.path.exists(filepath):
                raise FileNotFoundError(f"文件不存在: {filepath}")

            df = pd.read_excel(filepath)
            logger.info(f"[Query导入] 文件 {filename} 包含 {len(df)} 行, 列名: {df.columns.tolist()}")

            # 2. Resolve column mapping by position, depending on mode.
            if import_mode == 'query_only':
                # Mode 1: single query column (first column, whatever its header).
                query_col = df.columns[0]
                dept_col = None
                keyword_col = None
                logger.info(f"[Query导入] 仅导入Query模式,使用列: {query_col}")
            else:
                # Mode 2: three positional columns — department / keyword / query.
                dept_col = df.columns[0]
                keyword_col = df.columns[1]
                query_col = df.columns[2]
                # NOTE(review): keyword_col is detected and logged but its
                # values are never stored below — confirm whether the
                # middle column should be persisted.
                logger.info(f"[Query导入] 完整导入模式,科室列: {dept_col}, 关键字列: {keyword_col}, query列: {query_col}")

            # 3. Pre-process rows: collect every valid keyword.
            keyword_list = []
            for idx, row in df.iterrows():
                query_val = str(row[query_col]).strip() if pd.notna(row[query_col]) else ''
                # Empty cells and pandas NaN-as-string rows count as failures.
                if not query_val or query_val == 'nan':
                    stats['fail'] += 1
                    continue

                item = {'keyword': query_val, 'department': ''}

                if import_mode == 'full_import':
                    # Attach the department when present and non-NaN.
                    if dept_col and pd.notna(row[dept_col]):
                        dept_val = str(row[dept_col]).strip()
                        if dept_val != 'nan':
                            item['department'] = dept_val

                keyword_list.append(item)

            stats['total'] = len(df)
            total_valid = len(keyword_list)
            logger.info(f"[Query导入] 有效关键词: {total_valid} / {stats['total']}")

            # 4. Insert in batches of BATCH_SIZE.
            batch_count = (total_valid + self.BATCH_SIZE - 1) // self.BATCH_SIZE
            processed = 0

            for batch_idx in range(batch_count):
                start = batch_idx * self.BATCH_SIZE
                end = min(start + self.BATCH_SIZE, total_valid)
                batch = keyword_list[start:end]

                batch_stats = self.keyword_mgr.batch_insert_keywords(batch, query_status=query_status)
                stats['success'] += batch_stats['success']
                stats['skip'] += batch_stats['skip']
                stats['fail'] += batch_stats['fail']
                processed += len(batch)

                progress = (processed / total_valid) * 100 if total_valid > 0 else 100
                logger.info(
                    f"[Query导入] [{filename}] 批次 {batch_idx+1}/{batch_count} | "
                    f"进度: {processed}/{total_valid} ({progress:.1f}%) | "
                    f"成功: {stats['success']} | 跳过: {stats['skip']} | 失败: {stats['fail']}"
                )

            # 5. Finalize the log record with the aggregate counters.
            if log_id:
                self.log_mgr.update_status(
                    log_id, 'completed',
                    total_count=stats['total'],
                    success_count=stats['success'],
                    skip_count=stats['skip'],
                    fail_count=stats['fail']
                )

            logger.info(
                f"[Query导入] 文件 {filename} 导入完成 | "
                f"总数: {stats['total']} | 成功: {stats['success']} | "
                f"跳过: {stats['skip']} | 失败: {stats['fail']}"
            )

            return {'success': True, 'stats': stats}

        except Exception as e:
            # Record the failure on the log row and surface it to the caller.
            error_msg = str(e)
            logger.error(f"[Query导入] 文件 {filename} 导入失败: {error_msg}")

            if log_id:
                self.log_mgr.update_status(
                    log_id, 'failed',
                    total_count=stats['total'],
                    success_count=stats['success'],
                    skip_count=stats['skip'],
                    fail_count=stats['fail'],
                    error_message=error_msg
                )

            return {'success': False, 'stats': stats, 'error': error_msg}

    def scan_and_import(self):
        """Scan the upload directory and import pending files.

        1. Process import-log rows with status='pending'.
        2. Scan the directory for Excel files not yet logged and import them.
        """
        upload_dir = Config.QUERY_UPLOAD_DIR

        # Nothing to do on first run: create the directory and bail out.
        if not os.path.exists(upload_dir):
            os.makedirs(upload_dir, exist_ok=True)
            return

        # 1. Handle pending import-log rows.
        pending_logs = self.log_mgr.get_pending_logs()
        if pending_logs:
            logger.info(f"[Query扫描] 发现 {len(pending_logs)} 个待处理导入任务")
            for log in pending_logs:
                filepath = log['filepath']
                if os.path.exists(filepath):
                    logger.info(f"[Query扫描] 处理待导入文件: {log['filename']}")
                    self.import_file(filepath, log_id=log['id'])
                else:
                    # File vanished between upload and import: mark failed.
                    logger.warning(f"[Query扫描] 文件不存在,标记为失败: {filepath}")
                    self.log_mgr.update_status(log['id'], 'failed', error_message='文件不存在')

        # 2. Scan for Excel files with no log record yet.
        excel_files = []
        for pattern in ['*.xlsx', '*.xls']:
            excel_files.extend(glob.glob(os.path.join(upload_dir, pattern)))

        for filepath in excel_files:
            filepath = os.path.abspath(filepath)
            if not self.log_mgr.is_file_logged(filepath):
                filename = os.path.basename(filepath)
                # BUGFIX: previously logged the literal "(unknown)" here.
                logger.info(f"[Query扫描] 发现未登记文件: {filename}")
                log_id = self.log_mgr.create_log(filename, filepath)
                self.import_file(filepath, log_id=log_id)