"""
Query keyword import module.

Reads keywords from Excel files and bulk-imports them into the
baidu_keyword table.
"""
import os
import glob
import time
import pandas as pd
from datetime import datetime
from loguru import logger
from config import Config
from db_manager import QueryKeywordManager, QueryImportLogManager


class QueryKeywordImporter:
    """Import query keywords from Excel files into the baidu_keyword table."""

    # Candidate header names for the query column (see _detect_column).
    QUERY_COLUMNS = ['query', 'Query', '查询词', 'keyword', '关键词']
    # Candidate header names for the department column.
    DEPT_COLUMNS = ['科室', 'department', 'Department', '部门']
    # Number of keywords handed to batch_insert_keywords per call.
    BATCH_SIZE = 500
    # Import modes understood by import_file.
    SUPPORTED_MODES = ('query_only', 'full_import')
    # Prefix of the lock files Excel creates next to an open workbook
    # ("~$name.xlsx"); they match the scan glob but cannot be read.
    LOCK_FILE_PREFIX = '~$'

    def __init__(self):
        self.keyword_mgr = QueryKeywordManager()
        self.log_mgr = QueryImportLogManager()

    def _detect_column(self, df_columns, candidates):
        """Return the first name in ``candidates`` present in ``df_columns``, else None."""
        for col in candidates:
            if col in df_columns:
                return col
        return None

    @staticmethod
    def _clean_cell(value) -> str:
        """Return the stripped text of a cell, or '' for empty/NaN cells."""
        if pd.isna(value):
            return ''
        text = str(value).strip()
        # A literal "nan" string is treated as empty, as before.
        return '' if text == 'nan' else text

    def _resolve_column_positions(self, df, import_mode):
        """Return ``(dept_pos, query_pos)`` column positions for ``import_mode``.

        ``dept_pos`` is None in 'query_only' mode. Raises ValueError when the
        sheet has fewer columns than the mode requires.
        """
        columns = df.columns
        required = 1 if import_mode == 'query_only' else 3
        if len(columns) < required:
            raise ValueError(
                f"文件列数不足: 模式 {import_mode} 至少需要 {required} 列, 实际 {len(columns)} 列"
            )

        if import_mode == 'query_only':
            # Mode 1: a single query column (the first one).
            logger.info(f"[Query导入] 仅导入Query模式,使用列: {columns[0]}")
            return None, 0

        # Mode 2: department / keyword / query, selected by position.
        # NOTE(review): the keyword column (position 1) is only logged; it is
        # never passed to batch_insert_keywords. Confirm this is intended.
        logger.info(
            f"[Query导入] 完整导入模式,科室列: {columns[0]}, "
            f"关键字列: {columns[1]}, query列: {columns[2]}"
        )
        return 0, 2

    def _extract_keywords(self, df, query_pos, dept_pos):
        """Build the insert payload from the sheet.

        Returns ``(keyword_list, invalid_count)``. ``keyword_list`` holds
        ``{'keyword': str, 'department': str}`` dicts. ``invalid_count`` is
        the number of rows whose query cell is empty.
        """
        # Read whole columns instead of df.iterrows(). iterrows builds one
        # Series per row and upcasts mixed int/float rows to float, which
        # turned an integer query like 123 into the keyword '123.0'.
        queries = df.iloc[:, query_pos].tolist()
        if dept_pos is None:
            departments = [None] * len(queries)
        else:
            departments = df.iloc[:, dept_pos].tolist()

        keyword_list = []
        invalid_count = 0
        for raw_query, raw_dept in zip(queries, departments):
            keyword = self._clean_cell(raw_query)
            if not keyword:
                invalid_count += 1
                continue
            keyword_list.append({'keyword': keyword, 'department': self._clean_cell(raw_dept)})
        return keyword_list, invalid_count

    def _insert_in_batches(self, keyword_list, query_status, filename, stats):
        """Insert ``keyword_list`` in BATCH_SIZE chunks, adding counts into ``stats``."""
        total_valid = len(keyword_list)
        batch_count = (total_valid + self.BATCH_SIZE - 1) // self.BATCH_SIZE
        for batch_idx, start in enumerate(range(0, total_valid, self.BATCH_SIZE)):
            batch = keyword_list[start:start + self.BATCH_SIZE]
            batch_stats = self.keyword_mgr.batch_insert_keywords(batch, query_status=query_status)
            for key in ('success', 'skip', 'fail'):
                stats[key] += batch_stats[key]

            processed = start + len(batch)
            # total_valid > 0 whenever the loop body runs.
            progress = processed / total_valid * 100
            logger.info(
                f"[Query导入] [{filename}] 批次 {batch_idx+1}/{batch_count} | "
                f"进度: {processed}/{total_valid} ({progress:.1f}%) | "
                f"成功: {stats['success']} | 跳过: {stats['skip']} | 失败: {stats['fail']}"
            )

    def import_file(self, filepath: str, log_id: int = None, import_mode: str = 'query_only') -> dict:
        """
        Import one Excel file into the baidu_keyword table in batches.

        Args:
            filepath: Path of the Excel file.
            log_id: ID of an existing import-log record. When None, a new
                record is created through ``log_mgr.create_log``.
            import_mode: Import mode.
                - 'query_only': the first column holds queries; rows are
                  inserted with query_status='draft'.
                - 'full_import': columns are department / keyword / query
                  (by position); rows are inserted with
                  query_status='manual_review'.

        Returns:
            ``{'success': bool, 'stats': {...}, 'error': str}``. The 'error'
            key is present only on failure. ``stats`` holds total / success /
            skip / fail counts; rows with an empty query are counted as fail.
        """
        filename = os.path.basename(filepath)
        stats = {'total': 0, 'success': 0, 'skip': 0, 'fail': 0}

        # The insert status depends on the mode.
        query_status = 'draft' if import_mode == 'query_only' else 'manual_review'

        # Create a log record if the caller did not supply one.
        if log_id is None:
            log_id = self.log_mgr.create_log(filename, filepath)

        try:
            if log_id:
                self.log_mgr.update_status(log_id, 'running')

            logger.info(f"[Query导入] 开始导入文件: {filename}, 模式: {import_mode}, query_status: {query_status}")

            # Reject unknown modes. Previously they silently got manual_review
            # status and a three-column mapping, but no department data.
            if import_mode not in self.SUPPORTED_MODES:
                raise ValueError(f"不支持的导入模式: {import_mode}")

            # 1. Read the workbook.
            if not os.path.exists(filepath):
                raise FileNotFoundError(f"文件不存在: {filepath}")

            df = pd.read_excel(filepath)
            logger.info(f"[Query导入] 文件 {filename} 包含 {len(df)} 行, 列名: {df.columns.tolist()}")

            # 2. Resolve the column mapping for the mode.
            dept_pos, query_pos = self._resolve_column_positions(df, import_mode)

            # 3. Extract every valid keyword.
            keyword_list, invalid_count = self._extract_keywords(df, query_pos, dept_pos)
            stats['fail'] += invalid_count
            stats['total'] = len(df)
            total_valid = len(keyword_list)
            logger.info(f"[Query导入] 有效关键词: {total_valid} / {stats['total']}")

            # 4. Insert in batches.
            self._insert_in_batches(keyword_list, query_status, filename, stats)

            # 5. Mark the log record completed.
            if log_id:
                self.log_mgr.update_status(
                    log_id, 'completed',
                    total_count=stats['total'],
                    success_count=stats['success'],
                    skip_count=stats['skip'],
                    fail_count=stats['fail']
                )

            logger.info(
                f"[Query导入] 文件 {filename} 导入完成 | "
                f"总数: {stats['total']} | 成功: {stats['success']} | "
                f"跳过: {stats['skip']} | 失败: {stats['fail']}"
            )
            return {'success': True, 'stats': stats}

        except Exception as e:
            error_msg = str(e)
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception(f"[Query导入] 文件 {filename} 导入失败: {error_msg}")
            if log_id:
                self.log_mgr.update_status(
                    log_id, 'failed',
                    total_count=stats['total'],
                    success_count=stats['success'],
                    skip_count=stats['skip'],
                    fail_count=stats['fail'],
                    error_message=error_msg
                )
            return {'success': False, 'stats': stats, 'error': error_msg}

    def scan_and_import(self):
        """
        Scan the upload directory and import outstanding files.

        1. Process log records returned by ``log_mgr.get_pending_logs()``.
        2. Register and import Excel files in the directory that have no
           log record yet.

        If the upload directory is missing, it is created and the scan stops.
        """
        upload_dir = Config.QUERY_UPLOAD_DIR
        if not os.path.exists(upload_dir):
            os.makedirs(upload_dir, exist_ok=True)
            return

        # 1. Process pending records.
        pending_logs = self.log_mgr.get_pending_logs()
        if pending_logs:
            logger.info(f"[Query扫描] 发现 {len(pending_logs)} 个待处理导入任务")
            for log in pending_logs:
                filepath = log['filepath']
                if os.path.exists(filepath):
                    logger.info(f"[Query扫描] 处理待导入文件: {log['filename']}")
                    # NOTE(review): uses the default import_mode ('query_only');
                    # confirm pending records never need 'full_import'.
                    self.import_file(filepath, log_id=log['id'])
                else:
                    logger.warning(f"[Query扫描] 文件不存在,标记为失败: {filepath}")
                    self.log_mgr.update_status(log['id'], 'failed', error_message='文件不存在')

        # 2. Import files in the directory that are not registered yet.
        excel_files = []
        for pattern in ('*.xlsx', '*.xls'):
            excel_files.extend(glob.glob(os.path.join(upload_dir, pattern)))

        for filepath in sorted(excel_files):
            filepath = os.path.abspath(filepath)
            filename = os.path.basename(filepath)
            # Skip Excel lock files; they are not readable workbooks.
            if filename.startswith(self.LOCK_FILE_PREFIX):
                continue
            if self.log_mgr.is_file_logged(filepath):
                continue
            logger.info(f"[Query扫描] 发现未登记文件: {filename}")
            log_id = self.log_mgr.create_log(filename, filepath)
            self.import_file(filepath, log_id=log_id)