Files
ai_image_quary/import_keywords.py

363 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
定频轮询脚本从Excel文件导入关键词到baidu_keyword表
"""
import pandas as pd
import logging
import os
import time
import glob
from database_config import DatabaseManager, DB_CONFIG
from datetime import datetime
# Configuration
POLL_INTERVAL = 60 # Polling interval, in seconds
UPLOAD_FOLDER = 'query_upload' # Directory scanned for Excel files
SEED_ID = 9999 # Fixed value passed as seed_id for every imported keyword
SEED_NAME = '手动提交' # Fixed value passed as seed_name for every imported keyword
CRAWLED = 1 # Fixed value passed as crawled for every imported keyword
QUERY_COLUMN = 'query' # Name of the query column in the Excel sheet
DEPT_COLUMN = '科室' # Name of the department column in the Excel sheet
# Configure logging: INFO level, "timestamp - level - message" format
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def read_excel_keywords_with_department(excel_path, query_column='query', department_column='科室'):
    """
    Read keywords and their departments from an Excel file.

    Keyword and department are always taken from the same spreadsheet row.
    When a department column is used, rows in which either cell is blank
    are skipped as a whole, so a blank cell can never shift later keywords
    onto the wrong department.

    Args:
        excel_path: Path of the Excel file.
        query_column: Name of the keyword column, 'query' by default.
        department_column: Name of the department column, '科室' by default.
            Pass None (or another falsy value) to ignore departments.

    Returns:
        A list of unique (keyword, department) tuples in first-seen order.
        department is None when no department column is used. An empty
        list is returned when the query column does not exist.

    Raises:
        Exception: any error raised while reading or processing the file
            is logged with a traceback and re-raised.
    """
    try:
        df = pd.read_excel(excel_path)
        logger.info(f"成功读取Excel文件: {excel_path}")
        logger.info(f"Excel文件包含 {len(df)} 行数据")
        logger.info(f"Excel列名: {df.columns.tolist()}")

        # Without the query column there is nothing to import.
        if query_column not in df.columns:
            logger.error(f"未找到query列: {query_column}")
            return []

        # A department column that was requested but is absent only
        # downgrades the import to "keywords without department".
        has_department = bool(department_column) and department_column in df.columns
        if department_column and not has_department:
            logger.warning(f"未找到department列: {department_column},将不使用部门信息")

        keyword_dept_pairs = []
        if has_department:
            # Drop incomplete rows row-wise. Dropping NaN from each column
            # separately and pairing by position would misalign the data.
            complete_rows = df.dropna(subset=[query_column, department_column])
            for raw_keyword, raw_department in zip(
                complete_rows[query_column], complete_rows[department_column]
            ):
                keyword = str(raw_keyword).strip()
                department = str(raw_department).strip()
                # Whitespace-only cells count as empty.
                if keyword and department:
                    keyword_dept_pairs.append((keyword, department))
        else:
            logger.info("没有部门列,将只导入关键词,不指定科室")
            for raw_keyword in df[query_column].dropna():
                keyword = str(raw_keyword).strip()
                if keyword:
                    keyword_dept_pairs.append((keyword, None))

        # dict keys keep insertion order, so this keeps the first
        # occurrence of every (keyword, department) combination.
        unique_keyword_dept_pairs = list(dict.fromkeys(keyword_dept_pairs))

        if has_department:
            logger.info(f"提取到 {len(unique_keyword_dept_pairs)} 个唯一的关键词-部门组合")
        else:
            logger.info(f"提取到 {len(unique_keyword_dept_pairs)} 个唯一的关键词")
        return unique_keyword_dept_pairs
    except Exception as e:
        logger.error(f"读取Excel文件失败: {e}", exc_info=True)
        raise
def get_department_id(db_manager, department_name):
    """
    Look up the ID of a department by name in the ai_departments table.

    Args:
        db_manager: Database manager instance; its
            execute_query(sql, params, fetch_one=True) method is called.
        department_name: Department name to look up.

    Returns:
        The first column (id) of the matching row.

    Raises:
        ValueError: if no row matches department_name.
        Exception: any error raised by the query itself is logged with a
            traceback and re-raised.
    """
    sql = "SELECT id FROM ai_departments WHERE department_name = %s"

    # Only the query is guarded, so the "not found" ValueError below is
    # not caught here and logged a second time as a query failure.
    try:
        result = db_manager.execute_query(sql, (department_name,), fetch_one=True)
    except Exception as e:
        logger.error(f"查询科室ID失败: {e}", exc_info=True)
        raise

    if not result:
        error_msg = f"未找到科室 '{department_name}' 的ID请先在ai_departments表中添加该科室"
        logger.error(error_msg)
        raise ValueError(error_msg)

    return result[0]
def get_author_info_by_department(db_manager, department_id):
    """
    Pick one random eligible author of a department from ai_authors.

    An author is eligible when status is 'active' and daily_post_max is
    greater than 0.

    Args:
        db_manager: Database manager instance.
        department_id: Department ID to filter authors by.

    Returns:
        An (author_id, author_name) tuple, or (0, '') when no author
        matches or the lookup fails.
    """
    fallback = (0, '')
    author_sql = "SELECT id, author_name FROM ai_authors WHERE department_id = %s AND status = 'active' AND daily_post_max > 0 ORDER BY RAND() LIMIT 1"
    try:
        row = db_manager.execute_query(author_sql, (department_id,), fetch_one=True)
        if not row:
            logger.warning(f"未找到科室ID {department_id} 下符合条件的活跃作者")
            return fallback
        return row[0], row[1]
    except Exception as e:
        # Best effort: a failed lookup must not abort the caller.
        logger.error(f"查询作者信息失败: {e}")
        return fallback
def import_keywords_to_db(db_manager, keyword_dept_pairs, seed_id=9999, seed_name='手动提交', crawled=1, batch_size=100, sleep_seconds=0.1):
    """
    Insert keywords into the baidu_keyword table one record at a time.

    For each (keyword, department) pair: the department ID and a random
    author are resolved when a department is given, the table is checked
    for an existing row with the same keyword, and a new row with
    query_status 'manual_review' is inserted only when none exists.

    Args:
        db_manager: Database manager instance.
        keyword_dept_pairs: List of (keyword, department) tuples;
            department may be None or empty.
        seed_id: Seed ID stored with every row, 9999 by default.
        seed_name: Seed name stored with every row.
        crawled: Value stored in the crawled column, 1 by default.
        batch_size: A progress line is logged every batch_size records
            (and after the last one). Values <= 0 disable the periodic line.
        sleep_seconds: Pause after every processed record, 0.1 s by default.

    Returns:
        Number of rows actually inserted.

    Raises:
        ValueError: when a department cannot be resolved; the whole import
            stops at that record.
        Exception: any other error escaping the per-record handling is
            logged with a traceback and re-raised.
    """
    if not keyword_dept_pairs:
        logger.warning("没有关键词需要导入")
        return 0

    try:
        total = len(keyword_dept_pairs)
        logger.info(f"开始导入 {total} 个关键词-部门组合到数据库...")
        logger.info("采用逐条查询+插入模式,避免重复")

        check_sql = "SELECT COUNT(*) FROM baidu_keyword WHERE keyword = %s"
        insert_sql = """
            INSERT INTO baidu_keyword (keyword, seed_id, seed_name, crawled, parents_id, created_at, department, department_id, query_status, author_id, author_name)
            VALUES (%s, %s, %s, %s, 0, NOW(), %s, %s, %s, %s, %s)
        """
        query_status = 'manual_review'

        # The department ID depends only on the name, so it is resolved once
        # per distinct department. The author is picked at random per row
        # and is therefore never cached.
        dept_id_cache = {}

        success_count = 0
        skip_count = 0
        failed_count = 0

        for idx, (keyword, department) in enumerate(keyword_dept_pairs, 1):
            try:
                if department:
                    logger.debug(f'[调试] 处理第 {idx}/{total} 条: {keyword}, 部门: {department}')
                else:
                    logger.debug(f'[调试] 处理第 {idx}/{total} 条: {keyword}, 无部门信息')

                # 1. Resolve department ID and author when a department is given.
                if department:
                    if department not in dept_id_cache:
                        dept_id_cache[department] = get_department_id(db_manager, department)
                    dept_id = dept_id_cache[department]
                    author_id, author_name = get_author_info_by_department(db_manager, dept_id)
                    detail = f'部门: {department}, 部门ID: {dept_id}, 作者ID: {author_id}, 作者名: {author_name}'
                else:
                    # Empty string / 0 instead of None, to satisfy NOT NULL
                    # constraints on the table.
                    department = ''
                    dept_id = 0
                    author_id = 0
                    author_name = ''
                    detail = '无部门信息'

                # 2. Skip keywords that are already stored.
                result = db_manager.execute_query(check_sql, (keyword,), fetch_one=True)
                exists = result[0] > 0 if result else False

                if exists:
                    skip_count += 1
                    logger.debug(f'[调试] 关键词已存在,跳过: {keyword}')
                else:
                    # 3. Insert the new keyword.
                    logger.debug(f'[调试] 准备插入: {keyword}, {detail}, query_status: {query_status}')
                    affected = db_manager.execute_update(
                        insert_sql,
                        (keyword, seed_id, seed_name, crawled, department, dept_id, query_status, author_id, author_name),
                        autocommit=True
                    )
                    if affected > 0:
                        success_count += 1
                        logger.debug(f'[调试] 插入成功: {keyword}, {detail}, query_status: {query_status}')
                    else:
                        # An insert that changed nothing must still show up
                        # in the summary instead of silently disappearing.
                        failed_count += 1
                        logger.warning(f'[调试] 插入未生效 [{idx}/{total}]: keyword={keyword}')
            except ValueError as ve:
                # An unknown department stops the whole import.
                logger.error(f'[错误] 第 {idx} 条记录遇到错误: {ve}')
                raise
            except Exception as e:
                failed_count += 1
                logger.warning(f'[调试] 处理失败 [{idx}/{total}]: keyword={keyword}, 部门={department},错误:{e}')

            # Progress and throttling run for failed records too, so the final
            # progress line is always emitted and a run of failures cannot
            # hammer the database without pauses.
            if (batch_size > 0 and idx % batch_size == 0) or idx == total:
                progress = (idx / total) * 100
                logger.info(f'[插入进度] {idx}/{total} ({progress:.1f}%) | 成功: {success_count} | 跳过: {skip_count} | 失败: {failed_count}')

            time.sleep(sleep_seconds)

        logger.info(f"导入完成!成功插入: {success_count} | 跳过已存在: {skip_count} | 失败: {failed_count}")
        return success_count
    except Exception as e:
        logger.error(f"导入关键词失败: {e}", exc_info=True)
        raise
def process_single_file(db_manager, excel_path):
    """
    Import the keywords of a single Excel file.

    Args:
        db_manager: Database manager instance.
        excel_path: Path of the Excel file.

    Returns:
        Number of rows inserted; 0 when the file yields no importable data.
    """
    file_name = os.path.basename(excel_path)
    logger.info(f"开始处理文件: {file_name}")

    # Extract the (keyword, department) pairs using the configured columns.
    pairs = read_excel_keywords_with_department(excel_path, QUERY_COLUMN, DEPT_COLUMN)
    if pairs:
        # Hand the pairs to the importer with the fixed seed settings.
        return import_keywords_to_db(
            db_manager=db_manager,
            keyword_dept_pairs=pairs,
            seed_id=SEED_ID,
            seed_name=SEED_NAME,
            crawled=CRAWLED,
        )

    logger.warning(f"文件 {file_name} 中没有可导入的数据")
    return 0
def poll_once(db_manager):
    """
    Run one polling pass over the upload folder.

    Every *.xlsx / *.xls file found is imported and then deleted. A file
    whose processing raises is left in place for inspection.

    Args:
        db_manager: Database manager instance.

    Returns:
        Number of files processed successfully.
    """
    # A folder that had to be created cannot contain files yet.
    if not os.path.exists(UPLOAD_FOLDER):
        os.makedirs(UPLOAD_FOLDER)
        logger.info(f"创建目录: {UPLOAD_FOLDER}")
        return 0

    pending_files = [
        path
        for pattern in ('*.xlsx', '*.xls')
        for path in glob.glob(os.path.join(UPLOAD_FOLDER, pattern))
    ]
    if not pending_files:
        return 0

    logger.info(f"发现 {len(pending_files)} 个Excel文件待处理")

    handled = 0
    for path in pending_files:
        file_name = os.path.basename(path)
        try:
            imported = process_single_file(db_manager, path)
            # Remove the file only after it was processed without error.
            os.remove(path)
            logger.info(f"✓ 文件处理完成并已删除: {file_name} (导入 {imported} 条)")
            handled += 1
        except Exception as e:
            # Keep the file so the failure can be investigated.
            logger.error(f"✗ 处理文件失败: {file_name}, 错误: {e}")
    return handled
def main():
    """Entry point: poll the upload folder forever at a fixed interval."""
    separator = "=" * 60
    banner = (
        separator,
        "定频轮询脚本启动",
        f"轮询间隔: {POLL_INTERVAL}",
        f"监控目录: {UPLOAD_FOLDER}",
        f"固定配置: seed_id={SEED_ID}, seed_name='{SEED_NAME}', crawled={CRAWLED}",
        separator,
    )
    for line in banner:
        logger.info(line)

    # One database manager is shared by all polling passes.
    db_manager = DatabaseManager(DB_CONFIG)

    while True:
        try:
            processed = poll_once(db_manager)
            if processed > 0:
                logger.info(f"本轮处理完成,共处理 {processed} 个文件")
        except Exception as e:
            # A failed pass is logged; the loop carries on with the next one.
            logger.error(f"轮询出错: {e}")
        # Wait for the next pass.
        time.sleep(POLL_INTERVAL)
# Start the polling loop only when run as a script, not on import.
if __name__ == "__main__":
    main()