"""
定频轮询脚本:从Excel文件导入关键词到baidu_keyword表

Fixed-interval polling script: imports keywords from Excel files into the
baidu_keyword table.
"""
import glob
import logging
import os
import time
from datetime import datetime

import pandas as pd

from database_config import DatabaseManager, DB_CONFIG
# ---------------------------------------------------------------------------
# Configuration (配置)
# ---------------------------------------------------------------------------
POLL_INTERVAL: int = 60              # Seconds to wait between polling rounds.
UPLOAD_FOLDER: str = 'query_upload'  # Directory scanned for uploaded Excel files.
SEED_ID: int = 9999                  # Fixed seed_id written to every inserted row.
SEED_NAME: str = '手动提交'           # Fixed seed_name written to every inserted row.
CRAWLED: int = 1                     # Fixed crawled flag written to every inserted row.
QUERY_COLUMN: str = 'query'          # Header of the keyword column in the Excel sheet.
DEPT_COLUMN: str = '科室'             # Header of the department column in the Excel sheet.

# Logging: INFO and above, one timestamped line per record.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


def read_excel_keywords_with_department(excel_path, query_column='query', department_column='科室'):
    """Read (keyword, department) pairs from an Excel file.

    Keywords and departments are paired row by row. Cell values are converted
    with ``str()`` and stripped. A row is skipped when its keyword is missing or
    blank. When a department column is in use, a row is also skipped when its
    department is missing or blank. Duplicate pairs are removed, keeping the
    first occurrence.

    Args:
        excel_path: Path of the Excel file to read.
        query_column: Header of the keyword column, default 'query'.
        department_column: Header of the department column, default '科室'.
            If None/empty, or if the sheet has no such column, department
            information is not used.

    Returns:
        List of ``(keyword, department)`` tuples. ``department`` is None when
        no department column is used. Returns an empty list when the keyword
        column is missing.

    Raises:
        Any exception raised while reading or processing the file is logged
        and re-raised.
    """
    try:
        df = pd.read_excel(excel_path)
        logger.info(f"成功读取Excel文件: {excel_path}")
        logger.info(f"Excel文件包含 {len(df)} 行数据")
        logger.info(f"Excel列名: {df.columns.tolist()}")

        # Without the keyword column there is nothing to import.
        if query_column not in df.columns:
            logger.error(f"未找到query列: {query_column}")
            return []

        has_department = bool(department_column) and department_column in df.columns
        if department_column and not has_department:
            logger.warning(f"未找到department列: {department_column},将不使用部门信息")

        keyword_dept_pairs = []
        if has_department:
            # Drop rows where EITHER cell is missing, so each keyword stays paired
            # with the department from its own row. Dropping NaN per column
            # independently and truncating to the shorter length would shift
            # departments onto the wrong keywords whenever only one of the two
            # cells in a row is empty.
            rows = df[[query_column, department_column]].dropna()
            for raw_keyword, raw_department in rows.itertuples(index=False, name=None):
                keyword = str(raw_keyword).strip()
                department = str(raw_department).strip()
                if keyword and department:  # both must be non-blank after stripping
                    keyword_dept_pairs.append((keyword, department))
        else:
            logger.info("没有部门列,将只导入关键词,不指定科室")
            for raw_keyword in df[query_column].dropna():
                keyword = str(raw_keyword).strip()
                if keyword:  # skip blank keywords
                    keyword_dept_pairs.append((keyword, None))

        # dict keys keep insertion order: de-duplicate while keeping first occurrences.
        unique_keyword_dept_pairs = list(dict.fromkeys(keyword_dept_pairs))

        if has_department:
            logger.info(f"提取到 {len(unique_keyword_dept_pairs)} 个唯一的关键词-部门组合")
        else:
            logger.info(f"提取到 {len(unique_keyword_dept_pairs)} 个唯一的关键词")

        return unique_keyword_dept_pairs

    except Exception as e:
        logger.error(f"读取Excel文件失败: {e}", exc_info=True)
        raise


def get_department_id(db_manager, department_name):
    """Look up a department's ID in ai_departments by its name.

    Args:
        db_manager: Database manager; called as
            ``execute_query(sql, params, fetch_one=True)``.
        department_name: Value matched against ai_departments.department_name.

    Returns:
        The first column (``id``) of the matching row.

    Raises:
        ValueError: If no department with that name exists.
        Exception: Any error from the database query is logged with its
            traceback and re-raised unchanged.
    """
    sql = "SELECT id FROM ai_departments WHERE department_name = %s"

    # Only the DB call is guarded. The "not found" ValueError below must not be
    # caught here, or it would be logged a second time as a query failure.
    try:
        result = db_manager.execute_query(sql, (department_name,), fetch_one=True)
    except Exception as e:
        logger.error(f"查询科室ID失败: {e}", exc_info=True)
        raise

    if not result:
        error_msg = f"未找到科室 '{department_name}' 的ID,请先在ai_departments表中添加该科室"
        logger.error(error_msg)
        raise ValueError(error_msg)

    return result[0]


def get_author_info_by_department(db_manager, department_id):
    """Pick one random eligible author of a department from ai_authors.

    An author is eligible when ``status = 'active'`` and ``daily_post_max > 0``.
    The lookup is best-effort: query errors are logged and reported as
    "no author" instead of being raised.

    Args:
        db_manager: Database manager; called as
            ``execute_query(sql, params, fetch_one=True)``.
        department_id: ID of the department whose authors are considered.

    Returns:
        ``(author_id, author_name)`` taken from the first two columns of the
        chosen row, or ``(0, '')`` when no eligible author exists or the
        lookup fails.
    """
    fallback = (0, '')
    query = (
        "SELECT id, author_name FROM ai_authors "
        "WHERE department_id = %s AND status = 'active' AND daily_post_max > 0 "
        "ORDER BY RAND() LIMIT 1"
    )

    try:
        row = db_manager.execute_query(query, (department_id,), fetch_one=True)
        if not row:
            logger.warning(f"未找到科室ID {department_id} 下符合条件的活跃作者")
            return fallback
        return row[0], row[1]
    except Exception as e:
        logger.error(f"查询作者信息失败: {e}")
        return fallback


def import_keywords_to_db(db_manager, keyword_dept_pairs, seed_id=9999, seed_name='手动提交', crawled=1, batch_size=100, sleep_seconds=0.1):
    """Insert keywords into baidu_keyword one at a time, skipping existing ones.

    Each keyword is first checked against baidu_keyword. Keywords that already
    exist are skipped without any department or author lookups. For a new
    keyword:

    * With a department: its ID is resolved via ``get_department_id`` (cached
      per call) and a random eligible author via
      ``get_author_info_by_department``.
    * Without a department: it is inserted with department '',
      department_id 0, author_id 0 and author_name ''.

    Every inserted row gets parents_id 0, created_at NOW() and query_status
    'manual_review'.

    Args:
        db_manager: Database manager providing ``execute_query`` and
            ``execute_update``.
        keyword_dept_pairs: List of ``(keyword, department)`` tuples;
            ``department`` may be None or empty.
        seed_id: Value for the seed_id column, default 9999.
        seed_name: Value for the seed_name column, default '手动提交'.
        crawled: Value for the crawled column, default 1.
        batch_size: Log a progress line every this many records and after the
            last one. Values <= 0 log progress only after the last record.
        sleep_seconds: Seconds to sleep after each record, default 0.1.

    Returns:
        Number of rows inserted.

    Raises:
        ValueError: If a new keyword's department is not found; the whole
            import stops at that record.
    """
    if not keyword_dept_pairs:
        logger.warning("没有关键词需要导入")
        return 0

    total = len(keyword_dept_pairs)
    check_sql = "SELECT COUNT(*) FROM baidu_keyword WHERE keyword = %s"
    insert_sql = """
        INSERT INTO baidu_keyword (keyword, seed_id, seed_name, crawled, parents_id, created_at, department, department_id, query_status, author_id, author_name)
        VALUES (%s, %s, %s, %s, 0, NOW(), %s, %s, %s, %s, %s)
    """
    # Department name -> ID, so each distinct department is queried once per import.
    dept_id_cache = {}

    success_count = 0
    skip_count = 0
    failed_count = 0

    try:
        logger.info(f"开始导入 {total} 个关键词-部门组合到数据库...")
        logger.info("采用逐条查询+插入模式,避免重复")

        for idx, (keyword, department) in enumerate(keyword_dept_pairs, 1):
            try:
                if department:
                    logger.debug(f'[调试] 处理第 {idx}/{total} 条: {keyword}, 部门: {department}')
                else:
                    logger.debug(f'[调试] 处理第 {idx}/{total} 条: {keyword}, 无部门信息')

                # 1. Skip keywords that already exist. Checking first avoids
                #    department/author queries for rows that will not be inserted.
                #    It also stops an unknown department on an already-imported
                #    keyword from aborting the whole import.
                result = db_manager.execute_query(check_sql, (keyword,), fetch_one=True)
                if result and result[0] > 0:
                    skip_count += 1
                    logger.debug(f'[调试] 关键词已存在,跳过: {keyword}')
                else:
                    # 2. Resolve department ID (may raise ValueError) and a random author.
                    dept_name, dept_id, author_id, author_name = _resolve_department_and_author(
                        db_manager, department, dept_id_cache
                    )
                    if dept_name:
                        logger.debug(f'[调试] 准备插入: {keyword}, 部门: {dept_name}, 部门ID: {dept_id}, 作者ID: {author_id}, 作者名: {author_name}, query_status: manual_review')
                    else:
                        logger.debug(f'[调试] 准备插入: {keyword}, 无部门信息, query_status: manual_review')

                    # 3. Insert the new keyword.
                    affected = db_manager.execute_update(
                        insert_sql,
                        (keyword, seed_id, seed_name, crawled, dept_name, dept_id, 'manual_review', author_id, author_name),
                        autocommit=True,
                    )
                    if affected > 0:
                        success_count += 1
                        if dept_name:
                            logger.debug(f'[调试] 插入成功: {keyword}, 部门: {dept_name}, 部门ID: {dept_id}, 作者ID: {author_id}, 作者名: {author_name}, query_status: manual_review')
                        else:
                            logger.debug(f'[调试] 插入成功: {keyword}, 无部门信息, query_status: manual_review')

            except ValueError as ve:
                # Unknown department: stop the whole import so it can be added first.
                logger.error(f'[错误] 第 {idx} 条记录遇到错误: {ve}')
                raise
            except Exception as e:
                failed_count += 1
                logger.warning(f'[调试] 处理失败 [{idx}/{total}]: keyword={keyword}, 部门={department},错误:{e}')

            # 4. Progress is reported whether or not this record failed.
            if (batch_size > 0 and idx % batch_size == 0) or idx == total:
                progress = (idx / total) * 100
                logger.info(f'[插入进度] {idx}/{total} ({progress:.1f}%) | 成功: {success_count} | 跳过: {skip_count} | 失败: {failed_count}')

            # 5. Throttle database load between records.
            time.sleep(sleep_seconds)

        logger.info(f"导入完成!成功插入: {success_count} | 跳过已存在: {skip_count} | 失败: {failed_count}")
        return success_count

    except Exception as e:
        logger.error(f"导入关键词失败: {e}", exc_info=True)
        raise


def _resolve_department_and_author(db_manager, department, dept_id_cache):
    """Return ``(department, department_id, author_id, author_name)`` for an insert row.

    Department IDs are memoized in ``dept_id_cache``. ``get_department_id``
    raises ValueError for an unknown department.
    """
    if not department:
        # Empty string / zero defaults instead of None, to avoid database NOT NULL
        # constraint violations (per the original author's note).
        return '', 0, 0, ''
    if department not in dept_id_cache:
        dept_id_cache[department] = get_department_id(db_manager, department)
    dept_id = dept_id_cache[department]
    author_id, author_name = get_author_info_by_department(db_manager, dept_id)
    return department, dept_id, author_id, author_name


def process_single_file(db_manager, excel_path):
    """Import one Excel file into baidu_keyword.

    Reads the sheet using the QUERY_COLUMN / DEPT_COLUMN headers. It then
    imports the pairs with the fixed SEED_ID / SEED_NAME / CRAWLED values.

    NOTE(review): read_excel_keywords_with_department also returns an empty
    list when the query column is missing. This function then returns 0, and
    poll_once deletes the file. Confirm that discarding such files is intended.

    Args:
        db_manager: Database manager passed through to the import.
        excel_path: Path of the Excel file.

    Returns:
        Number of rows inserted; 0 when the file yields nothing to import.
    """
    file_name = os.path.basename(excel_path)
    logger.info(f"开始处理文件: {file_name}")

    pairs = read_excel_keywords_with_department(excel_path, QUERY_COLUMN, DEPT_COLUMN)
    if not pairs:
        logger.warning(f"文件 {file_name} 中没有可导入的数据")
        return 0

    return import_keywords_to_db(
        db_manager=db_manager,
        keyword_dept_pairs=pairs,
        seed_id=SEED_ID,
        seed_name=SEED_NAME,
        crawled=CRAWLED,
    )


def poll_once(db_manager):
    """Run one polling round over UPLOAD_FOLDER.

    If UPLOAD_FOLDER does not exist, it is created and the round ends. Otherwise
    every ``*.xlsx`` / ``*.xls`` file in it is imported, in sorted path order.
    Office lock files (names starting with ``~$``, which Excel creates while a
    workbook is open) are skipped: they are not readable workbooks and would
    otherwise fail again on every round.

    A file is deleted after it has been processed successfully. On any error it
    is kept so it can be inspected, and it is picked up again on the next round.

    Args:
        db_manager: Database manager passed to process_single_file.

    Returns:
        Number of files processed and deleted in this round.
    """
    if not os.path.exists(UPLOAD_FOLDER):
        # exist_ok guards against another process creating it in the meantime.
        os.makedirs(UPLOAD_FOLDER, exist_ok=True)
        logger.info(f"创建目录: {UPLOAD_FOLDER}")
        return 0

    excel_files = sorted(
        path
        for pattern in ('*.xlsx', '*.xls')
        for path in glob.glob(os.path.join(UPLOAD_FOLDER, pattern))
        if not os.path.basename(path).startswith('~$')
    )
    if not excel_files:
        return 0

    logger.info(f"发现 {len(excel_files)} 个Excel文件待处理")

    processed_count = 0
    for excel_path in excel_files:
        file_name = os.path.basename(excel_path)
        try:
            success_count = process_single_file(db_manager, excel_path)
            # Delete only after a successful import.
            os.remove(excel_path)
            logger.info(f"✓ 文件处理完成并已删除: {file_name} (导入 {success_count} 条)")
            processed_count += 1
        except Exception as e:
            # Keep the file on failure so it can be inspected and retried.
            logger.error(f"✗ 处理文件失败: {file_name}, 错误: {e}")

    return processed_count


def main():
    """Entry point: poll UPLOAD_FOLDER every POLL_INTERVAL seconds; never returns.

    Logs the configuration and creates one DatabaseManager from DB_CONFIG. It
    then calls poll_once in an endless loop. Errors escaping a round are logged
    with their traceback, and the loop continues with the next round.
    """
    logger.info("=" * 60)
    logger.info("定频轮询脚本启动")
    logger.info(f"轮询间隔: {POLL_INTERVAL} 秒")
    logger.info(f"监控目录: {UPLOAD_FOLDER}")
    logger.info(f"固定配置: seed_id={SEED_ID}, seed_name='{SEED_NAME}', crawled={CRAWLED}")
    logger.info("=" * 60)

    db_manager = DatabaseManager(DB_CONFIG)

    while True:
        try:
            processed = poll_once(db_manager)
            if processed > 0:
                logger.info(f"本轮处理完成,共处理 {processed} 个文件")
        except Exception as e:
            # Keep the poller alive, but record the traceback for diagnosis.
            logger.exception(f"轮询出错: {e}")

        # Wait before the next round.
        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    # Start the poller only when run as a script, not when imported.
    main()