commit 273b529d690deeaf00bd904500750c8118d07443 Author: liangguodong Date: Thu Feb 5 21:53:21 2026 +0800 feat: 定频轮询关键词导入系统 diff --git a/README.md b/README.md new file mode 100644 index 0000000..a9c89fb --- /dev/null +++ b/README.md @@ -0,0 +1,195 @@ +# AI关键词导入与统计工具集 + +## 项目简介 + +用于管理 `ai_article` 数据库中百度关键词数据的导入、更新和统计导出。 + +## 环境依赖 + +```bash +pip install pandas pymysql openpyxl +``` + +## 文件说明 + +| 文件 | 功能 | +|------|------| +| `database_config.py` | 数据库配置与连接管理 | +| `import_keywords.py` | 定频轮询导入关键词到baidu_keyword表 | +| `start_import_keywords.sh` | 服务管理脚本(启动/停止/重启/状态) | +| `update_keywords_from_excel.py` | 根据Excel更新已有关键词记录 | +| `export_author_stats.py` | 导出作者发文统计到CSV | + +## 目录结构 + +``` +ai_import_quary/ +├── database_config.py # 数据库配置 +├── import_keywords.py # 关键词导入脚本(定频轮询) +├── start_import_keywords.sh # 服务管理脚本 +├── update_keywords_from_excel.py # 关键词更新脚本 +├── export_author_stats.py # 统计导出脚本 +├── query_upload/ # Excel导入文件存放目录 +└── exports/ # CSV导出文件存放目录 +``` + +## 数据库表关系 + +``` +ai_departments (科室表) + │ id → department_id + ▼ +ai_authors (作者表) + │ id → author_id + ▼ +baidu_keyword (关键词表) + │ keyword + ▼ +ai_articles (文章表) +``` + +## 使用方法 + +### 1. 导入关键词 (import_keywords.py) + +**服务管理**(推荐): +```bash +./start_import_keywords.sh start # 启动服务 +./start_import_keywords.sh stop # 停止服务 +./start_import_keywords.sh restart # 重启服务 +./start_import_keywords.sh status # 查看状态 +./start_import_keywords.sh logs # 查看日志 +./start_import_keywords.sh logs-follow # 实时日志 +``` + +**直接运行**: +```bash +python import_keywords.py +``` + +**运行模式**:定频轮询(默认每60秒轮询一次) + +**工作流程**: +1. 脚本启动后持续监控 `query_upload/` 目录 +2. 发现Excel文件后自动处理 +3. 
处理成功后自动删除源文件 + +**Excel格式要求**: +- 必须包含 `query` 列(关键词,不能为空) +- 必须包含 `科室` 列(部门信息) + +**配置参数**(在脚本头部修改): +```python +POLL_INTERVAL = 60 # 轮询间隔(秒) +UPLOAD_FOLDER = 'query_upload' # 监控目录 +SEED_ID = 9999 # 固定值 +SEED_NAME = '手动提交' # 固定值 +CRAWLED = 1 # 固定值 +``` + +**流程图**: + +``` +┌─────────────────┐ +│ 定频轮询启动 │ +└────────┬────────┘ + ▼ +┌─────────────────┐ +│ 读取Excel │ +│ (query+科室) │ +└────────┬────────┘ + ▼ +┌─────────────────┐ +│ 查询科室 │ +│ ai_departments │ +│ → department_id│ +└────────┬────────┘ + ▼ +┌─────────────────┐ +│ 指定科室 │ +│ 获取author信息 │ +│ ai_authors │ +│ → author_id │ +│ → author_name │ +└────────┬────────┘ + ▼ +┌─────────────────┐ +│ 判重 │ +│ keyword不能重复 │ +└────────┬────────┘ + │ + ┌────┴────┐ + │ │ + 重复 不重复 + │ │ + ▼ ▼ +┌──────┐ ┌───────────────────────┐ +│ 跳过 │ │ INSERT baidu_keyword │ +└──────┘ │ ┌───────────────────┐ │ + │ │ keyword = query值 │ │ + │ │ crawled = 1 │ │ + │ │ seed_id = 9999 │ │ + │ │ seed_name = 手动提交│ │ + │ │ department = 科室 │ │ + │ │ department_id = x │ │ + │ │ author_id = 随机 │ │ + │ │ author_name = 随机│ │ + │ │ query_status = │ │ + │ │ manual_review │ │ + │ └───────────────────┘ │ + └───────────────────────┘ +``` + +### 2. 更新关键词 (update_keywords_from_excel.py) + +```bash +python update_keywords_from_excel.py +``` + +根据Excel更新已存在的关键词记录(department、department_id、author_id、author_name)。 + +### 3. 
导出统计 (export_author_stats.py) + +```bash +python export_author_stats.py +python export_author_stats.py --date 2026-01-28 +``` + +**参数**: +- `--date`: 目标日期,默认当天 +- `--output-dir`: 输出目录,默认 `./exports` + +**输出文件**: +- `author_review_stats_{date}.csv` - 发文审核统计(所有状态) +- `author_published_stats_{date}.csv` - 发文成功统计(published状态) + +## 数据库配置 + +默认配置在 `database_config.py`: + +```python +DB_CONFIG = { + 'host': '8.149.233.36', + 'user': 'ai_article_read', + 'password': '***', + 'database': 'ai_article', + 'charset': 'utf8mb4' +} +``` + +## 导入记录字段说明 + +`baidu_keyword` 表插入字段: + +| 字段 | 值 | 说明 | +|------|-----|------| +| keyword | Excel中query列 | 关键词 | +| seed_id | 9999 | 种子ID | +| seed_name | 手动提交 | 种子名称 | +| crawled | 1 | 已爬取标记 | +| parents_id | 0 | 父级ID | +| department | Excel中科室列 | 科室名称 | +| department_id | 查表获取 | 科室ID | +| query_status | manual_review | 查询状态 | +| author_id | 查表获取 | 作者ID | +| author_name | 查表获取 | 作者名称 | diff --git a/database_config.py b/database_config.py new file mode 100644 index 0000000..d5d7455 --- /dev/null +++ b/database_config.py @@ -0,0 +1,160 @@ +""" +数据库配置管理模块 +统一管理数据库连接和SQL操作 +""" +import pymysql +import logging + +logger = logging.getLogger(__name__) + +# 数据库配置 +DB_CONFIG = { + 'host': '8.149.233.36', + 'user': 'ai_article_read', + 'password': '7aK_H2yvokVumr84lLNDt8fDBp6P', + 'database': 'ai_article', + 'charset': 'utf8mb4' +} + + +class DatabaseManager: + """数据库管理器:统一管理数据库连接和操作""" + + def __init__(self, config=None): + """初始化数据库管理器 + + Args: + config: 数据库配置字典,默认使用 DB_CONFIG + """ + self.config = config or DB_CONFIG + + def get_connection(self, autocommit=False): + """获取数据库连接 + + Args: + autocommit: 是否启用自动提交模式 + + Returns: + pymysql连接对象 + """ + return pymysql.connect(**self.config, autocommit=autocommit) + + def execute_query(self, sql, params=None, fetch_one=False): + """执行查询SQL(SELECT) + + Args: + sql: SQL语句 + params: SQL参数(tuple或list) + fetch_one: True返回单条记录,False返回所有记录 + + Returns: + 查询结果 + """ + conn = None + cursor = None + try: + conn = 
self.get_connection() + cursor = conn.cursor() + + logger.info(f'[SQL] {sql.strip()} | params: {params}') + cursor.execute(sql, params or ()) + + if fetch_one: + result = cursor.fetchone() + else: + result = cursor.fetchall() + + logger.debug(f'[SQL结果] 返回 {len(result) if not fetch_one and result else (1 if result else 0)} 条记录') + return result + except Exception as e: + logger.error(f'执行查询失败:{e}', exc_info=True) + raise + finally: + if cursor: + cursor.close() + if conn: + conn.close() + + def execute_update(self, sql, params=None, autocommit=True): + """执行更新SQL(INSERT/UPDATE/DELETE) + + Args: + sql: SQL语句 + params: SQL参数(tuple或list) + autocommit: 是否自动提交 + + Returns: + 影响的行数 + """ + conn = None + cursor = None + try: + conn = self.get_connection(autocommit=autocommit) + cursor = conn.cursor() + + logger.info(f'[SQL] {sql.strip()} | params: {params}') + result = cursor.execute(sql, params or ()) + + if not autocommit: + conn.commit() + + logger.info(f'[SQL执行] 影响 {result} 行') + return result + except Exception as e: + if not autocommit and conn: + conn.rollback() + logger.error(f'执行更新失败:{e}', exc_info=True) + raise + finally: + if cursor: + cursor.close() + if conn: + conn.close() + + def execute_many(self, sql, params_list, autocommit=True): + """批量执行SQL + + Args: + sql: SQL语句 + params_list: 参数列表,每个元素是一组参数 + autocommit: 是否自动提交 + + Returns: + 成功执行的行数 + """ + conn = None + cursor = None + try: + conn = self.get_connection(autocommit=autocommit) + cursor = conn.cursor() + + logger.info(f'[SQL批量] {sql.strip()} | 批次数: {len(params_list)}') + + success_count = 0 + for params in params_list: + try: + result = cursor.execute(sql, params) + if result > 0: + success_count += 1 + except Exception as e: + logger.debug(f'批量执行跳过:params={params},错误:{e}') + + if not autocommit: + conn.commit() + + logger.info(f'[SQL批量执行] 成功 {success_count}/{len(params_list)} 条') + return success_count + except Exception as e: + if not autocommit and conn: + conn.rollback() + 
logger.error(f'批量执行失败:{e}', exc_info=True) + raise + finally: + if cursor: + cursor.close() + if conn: + conn.close() + + +# 创建全局数据库管理器实例 +db_manager = DatabaseManager() diff --git a/export_author_stats.py b/export_author_stats.py new file mode 100644 index 0000000..da69fb0 --- /dev/null +++ b/export_author_stats.py @@ -0,0 +1,191 @@ +""" +导出作者发文统计数据到CSV +""" +import pandas as pd +import logging +import argparse +from database_config import DatabaseManager +from datetime import datetime, date + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def export_author_review_stats(db_manager, target_date, output_file): + """ + 导出作者发文审核统计(包含所有审核状态) + + Args: + db_manager: 数据库管理器实例 + target_date: 目标日期,格式:'2026-01-21' + output_file: 输出CSV文件路径 + + Returns: + 导出的记录数 + """ + sql = """ + SELECT + a.`author_id`, + COALESCE(a.`author_name`, auth.`author_name`) as `author_name`, + COUNT(*) as `today_published_count` + FROM `ai_articles` a + LEFT JOIN `ai_authors` auth ON a.`author_id` = auth.`id` + WHERE + a.`channel` = 4 + AND a.`status` IN('published', 'published_review', 'pending_review', 'failed') + AND DATE(a.`updated_at`) = %s + GROUP BY a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`) + HAVING COUNT(*) > 0 + ORDER BY `today_published_count` DESC + """ + + try: + logger.info(f"正在查询发文审核统计数据(日期:{target_date})...") + result = db_manager.execute_query(sql, (target_date,)) + + if not result: + logger.warning("未查询到数据") + return 0 + + # 转换为DataFrame + df = pd.DataFrame(result, columns=['author_id', 'author_name', 'today_published_count']) + + # 导出到CSV + df.to_csv(output_file, index=False, encoding='utf-8-sig') + logger.info(f"成功导出 {len(df)} 条记录到: {output_file}") + + # 显示前5条预览 + logger.info(f"\n数据预览(前5条):\n{df.head()}") + + return len(df) + + except Exception as e: + logger.error(f"导出发文审核统计失败: {e}", exc_info=True) + raise + + +def export_author_published_stats(db_manager, 
target_date, output_file): + """ + 导出作者发文成功统计(仅published状态) + + Args: + db_manager: 数据库管理器实例 + target_date: 目标日期,格式:'2026-01-21' + output_file: 输出CSV文件路径 + + Returns: + 导出的记录数 + """ + sql = """ + SELECT + a.`author_id`, + COALESCE(a.`author_name`, auth.`author_name`) as `author_name`, + COUNT(*) as `today_published_count`, + COALESCE(auth.`daily_post_max`, 0) as `daily_post_max`, + (COALESCE(auth.`daily_post_max`, 0) - COUNT(*)) as `gap` + FROM `ai_articles` a + LEFT JOIN `ai_authors` auth ON a.`author_id` = auth.`id` + WHERE + a.`channel` = 4 + AND a.`status` = 'published' + AND DATE(a.`updated_at`) = %s + GROUP BY a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`), auth.`daily_post_max` + HAVING COUNT(*) > 0 + ORDER BY `gap` ASC + """ + + try: + logger.info(f"正在查询发文成功统计数据(日期:{target_date})...") + result = db_manager.execute_query(sql, (target_date,)) + + if not result: + logger.warning("未查询到数据") + return 0 + + # 转换为DataFrame + df = pd.DataFrame(result, columns=['author_id', 'author_name', 'today_published_count', 'daily_post_max', 'gap']) + + # 导出到CSV + df.to_csv(output_file, index=False, encoding='utf-8-sig') + logger.info(f"成功导出 {len(df)} 条记录到: {output_file}") + + # 显示前5条预览 + logger.info(f"\n数据预览(前5条):\n{df.head()}") + + return len(df) + + except Exception as e: + logger.error(f"导出发文成功统计失败: {e}", exc_info=True) + raise + + +def main(): + """主函数""" + # 解析命令行参数 + parser = argparse.ArgumentParser(description='导出作者发文统计数据到CSV') + parser.add_argument('--host', default='8.149.233.36', help='数据库主机') + parser.add_argument('--port', type=int, default=3306, help='数据库端口') + parser.add_argument('--user', default='ai_articles_read', help='数据库用户名') + parser.add_argument('--password', default='7aK_H2yvokVumr84lLNDt8fDBp6P', help='数据库密码') + parser.add_argument('--database', default='ai_article', help='数据库名') + parser.add_argument('--date', default=date.today().strftime('%Y-%m-%d'), help='目标日期,格式:YYYY-MM-DD') + parser.add_argument('--output-dir', default='./exports', 
help='输出目录') + + args = parser.parse_args() + + # 创建输出目录 + import os + os.makedirs(args.output_dir, exist_ok=True) + + # 创建数据库连接配置 + db_config = { + 'host': args.host, + 'port': args.port, + 'user': args.user, + 'password': args.password, + 'database': args.database, + 'charset': 'utf8mb4' + } + + logger.info("=" * 60) + logger.info("开始导出作者发文统计数据") + logger.info(f"数据库配置: {args.user}@{args.host}:{args.port}/{args.database}") + logger.info(f"目标日期: {args.date}") + logger.info(f"输出目录: {args.output_dir}") + logger.info("=" * 60) + + # 创建数据库管理器 + db_manager = DatabaseManager(db_config) + + try: + # 生成输出文件名 + review_file = f"{args.output_dir}/author_review_stats_{args.date}.csv" + published_file = f"{args.output_dir}/author_published_stats_{args.date}.csv" + + # 1. 导出发文审核统计 + logger.info("\n[任务1] 导出发文审核统计(所有状态)") + review_count = export_author_review_stats(db_manager, args.date, review_file) + + # 2. 导出发文成功统计 + logger.info("\n[任务2] 导出发文成功统计(published状态)") + published_count = export_author_published_stats(db_manager, args.date, published_file) + + logger.info("\n" + "=" * 60) + logger.info("✓ 导出完成!") + logger.info(f"发文审核统计: {review_count} 条 -> {review_file}") + logger.info(f"发文成功统计: {published_count} 条 -> {published_file}") + logger.info("=" * 60) + + except Exception as e: + logger.error(f"✗ 导出过程出错: {e}", exc_info=True) + logger.info("=" * 60) + logger.info("✗ 导出失败") + logger.info("=" * 60) + + +if __name__ == '__main__': + main() diff --git a/import_keywords.py b/import_keywords.py new file mode 100644 index 0000000..2f144b2 --- /dev/null +++ b/import_keywords.py @@ -0,0 +1,363 @@ +""" +定频轮询脚本:从Excel文件导入关键词到baidu_keyword表 +""" +import pandas as pd +import logging +import os +import time +import glob +from database_config import DatabaseManager, DB_CONFIG +from datetime import datetime + +# 配置 +POLL_INTERVAL = 60 # 轮询间隔(秒) +UPLOAD_FOLDER = 'query_upload' # Excel文件目录 +SEED_ID = 9999 # 固定值 +SEED_NAME = '手动提交' # 固定值 +CRAWLED = 1 # 固定值 +QUERY_COLUMN = 'query' # Excel中query列名 
+DEPT_COLUMN = '科室' # Excel中科室列名 + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def read_excel_keywords_with_department(excel_path, query_column='query', department_column='科室'): + """ + 读取Excel文件中的关键词和部门信息 + + Args: + excel_path: Excel文件路径 + query_column: query列名,默认为'query' + department_column: 部门列名,默认为'科室',如果为None则不读取部门信息 + + Returns: + 包含(keyword, department)元组的列表,如果没有部门列则department为None + """ + try: + # 读取Excel文件 + df = pd.read_excel(excel_path) + logger.info(f"成功读取Excel文件: {excel_path}") + logger.info(f"Excel文件包含 {len(df)} 行数据") + logger.info(f"Excel列名: {df.columns.tolist()}") + + # 检查query列是否存在 + if query_column not in df.columns: + logger.error(f"未找到query列: {query_column}") + return [] + + # 检查是否有部门列 + has_department = department_column and department_column in df.columns + if department_column and not has_department: + logger.warning(f"未找到department列: {department_column},将不使用部门信息") + + # 获取query数据 + query_data = df[query_column].dropna() + query_list = query_data.tolist() + + # 根据是否有部门列,组合数据 + keyword_dept_pairs = [] + if has_department: + # 有部门列,获取部门数据 + department_data = df[department_column].dropna() + # 对齐数据长度,取最短长度 + min_length = min(len(query_data), len(department_data)) + query_list = query_data.iloc[:min_length].tolist() + department_list = department_data.iloc[:min_length].tolist() + + for i in range(min_length): + keyword = str(query_list[i]).strip() + department = str(department_list[i]).strip() + if keyword and department: # 确保关键词和部门都不为空 + keyword_dept_pairs.append((keyword, department)) + else: + # 没有部门列,只提取关键词 + logger.info("没有部门列,将只导入关键词,不指定科室") + for keyword in query_list: + keyword = str(keyword).strip() + if keyword: # 确保关键词不为空 + keyword_dept_pairs.append((keyword, None)) + + # 去除重复项,保留第一个出现的组合 + seen = set() + unique_keyword_dept_pairs = [] + for keyword, dept in keyword_dept_pairs: + if (keyword, dept) not in seen: + 
seen.add((keyword, dept)) + unique_keyword_dept_pairs.append((keyword, dept)) + + if has_department: + logger.info(f"提取到 {len(unique_keyword_dept_pairs)} 个唯一的关键词-部门组合") + else: + logger.info(f"提取到 {len(unique_keyword_dept_pairs)} 个唯一的关键词") + + return unique_keyword_dept_pairs + + except Exception as e: + logger.error(f"读取Excel文件失败: {e}", exc_info=True) + raise + + +def get_department_id(db_manager, department_name): + """ + 根据科室名称从ai_departments表中获取对应的ID + + Args: + db_manager: 数据库管理器实例 + department_name: 科室名称 + + Returns: + 科室ID,如果未找到则抛出异常 + """ + try: + # 查询科室ID - 使用正确的字段名 + sql = "SELECT id FROM ai_departments WHERE department_name = %s" + result = db_manager.execute_query(sql, (department_name,), fetch_one=True) + + if result: + return result[0] # 返回ID + else: + error_msg = f"未找到科室 '{department_name}' 的ID,请先在ai_departments表中添加该科室" + logger.error(error_msg) + raise ValueError(error_msg) + + except Exception as e: + logger.error(f"查询科室ID失败: {e}", exc_info=True) + raise + + +def get_author_info_by_department(db_manager, department_id): + """ + 根据科室ID从ai_authors表中随机获取一个符合条件的作者信息 + + Args: + db_manager: 数据库管理器实例 + department_id: 科室ID + + Returns: + (author_id, author_name) 元组,如果未找到则返回 (0, '') + """ + try: + # 随机获取该科室下的一个活跃作者 + sql = "SELECT id, author_name FROM ai_authors WHERE department_id = %s AND status = 'active' AND daily_post_max > 0 ORDER BY RAND() LIMIT 1" + result = db_manager.execute_query(sql, (department_id,), fetch_one=True) + + if result: + return result[0], result[1] + else: + logger.warning(f"未找到科室ID {department_id} 下符合条件的活跃作者") + return 0, '' + + except Exception as e: + logger.error(f"查询作者信息失败: {e}") + return 0, '' + + +def import_keywords_to_db(db_manager, keyword_dept_pairs, seed_id=9999, seed_name='手动提交', crawled=1, batch_size=100, sleep_seconds=0.1): + """ + 将关键词批量导入到baidu_keyword表 + + Args: + db_manager: 数据库管理器实例 + keyword_dept_pairs: 包含(keyword, department)元组的列表 + seed_id: 种子ID,默认9999 + seed_name: 种子名称,默认'手动提交' + crawled: 是否已爬取,默认1 + 
batch_size: 日志批次大小,每多少条记录输出一次进度 + sleep_seconds: 每条记录间隔睡眠时间(秒),默认0.1秒 + + Returns: + 成功导入的数量 + """ + if not keyword_dept_pairs: + logger.warning("没有关键词需要导入") + return 0 + + try: + logger.info(f"开始导入 {len(keyword_dept_pairs)} 个关键词-部门组合到数据库...") + logger.info("采用逐条查询+插入模式,避免重复") + + # 准备SQL语句 + check_sql = "SELECT COUNT(*) FROM baidu_keyword WHERE keyword = %s" + insert_sql = """ + INSERT INTO baidu_keyword (keyword, seed_id, seed_name, crawled, parents_id, created_at, department, department_id, query_status, author_id, author_name) + VALUES (%s, %s, %s, %s, 0, NOW(), %s, %s, %s, %s, %s) + """ + + success_count = 0 + skip_count = 0 + failed_count = 0 + + # 逐条处理 + for idx, (keyword, department) in enumerate(keyword_dept_pairs, 1): + try: + if department: + logger.debug(f'[调试] 处理第 {idx}/{len(keyword_dept_pairs)} 条: {keyword}, 部门: {department}') + else: + logger.debug(f'[调试] 处理第 {idx}/{len(keyword_dept_pairs)} 条: {keyword}, 无部门信息') + + # 1. 如果有部门信息,获取科室ID和作者信息 + if department: + dept_id = get_department_id(db_manager, department) + author_id, author_name = get_author_info_by_department(db_manager, dept_id) + else: + # 没有部门信息,使用默认值(空字符串而不是None,避免数据库NOT NULL约束) + department = '' + dept_id = 0 + author_id = 0 + author_name = '' + + # 2. 查询关键词是否存在 + result = db_manager.execute_query(check_sql, (keyword,), fetch_one=True) + exists = result[0] > 0 if result else False + + if exists: + skip_count += 1 + logger.debug(f'[调试] 关键词已存在,跳过: {keyword}') + else: + # 3. 
不存在则插入 + if department: + logger.debug(f'[调试] 准备插入: {keyword}, 部门: {department}, 部门ID: {dept_id}, 作者ID: {author_id}, 作者名: {author_name}, query_status: manual_review') + else: + logger.debug(f'[调试] 准备插入: {keyword}, 无部门信息, query_status: manual_review') + + affected = db_manager.execute_update( + insert_sql, + (keyword, seed_id, seed_name, crawled, department, dept_id, 'manual_review', author_id, author_name), + autocommit=True + ) + + if affected > 0: + success_count += 1 + if department: + logger.debug(f'[调试] 插入成功: {keyword}, 部门: {department}, 部门ID: {dept_id}, 作者ID: {author_id}, 作者名: {author_name}, query_status: manual_review') + else: + logger.debug(f'[调试] 插入成功: {keyword}, 无部门信息, query_status: manual_review') + + # 5. 输出进度 + if idx % batch_size == 0 or idx == len(keyword_dept_pairs): + progress = (idx / len(keyword_dept_pairs)) * 100 + logger.info(f'[插入进度] {idx}/{len(keyword_dept_pairs)} ({progress:.1f}%) | 成功: {success_count} | 跳过: {skip_count} | 失败: {failed_count}') + + # 6. 每次执行完sleep + time.sleep(sleep_seconds) + + except ValueError as ve: + # 遇到科室不存在的错误,停止整个导入过程 + logger.error(f'[错误] 第 {idx} 条记录遇到错误: {ve}') + raise ve + except Exception as e: + failed_count += 1 + logger.warning(f'[调试] 处理失败 [{idx}/{len(keyword_dept_pairs)}]: keyword={keyword}, 部门={department},错误:{e}') + + logger.info(f"导入完成!成功插入: {success_count} | 跳过已存在: {skip_count} | 失败: {failed_count}") + return success_count + + except Exception as e: + logger.error(f"导入关键词失败: {e}", exc_info=True) + raise + + +def process_single_file(db_manager, excel_path): + """ + 处理单个Excel文件 + + Args: + db_manager: 数据库管理器实例 + excel_path: Excel文件路径 + + Returns: + 成功导入的数量 + """ + logger.info(f"开始处理文件: {os.path.basename(excel_path)}") + + # 读取Excel文件 + keyword_dept_pairs = read_excel_keywords_with_department(excel_path, QUERY_COLUMN, DEPT_COLUMN) + + if not keyword_dept_pairs: + logger.warning(f"文件 {os.path.basename(excel_path)} 中没有可导入的数据") + return 0 + + # 执行导入 + success_count = import_keywords_to_db( + 
db_manager=db_manager, + keyword_dept_pairs=keyword_dept_pairs, + seed_id=SEED_ID, + seed_name=SEED_NAME, + crawled=CRAWLED + ) + + return success_count + + +def poll_once(db_manager): + """ + 执行一次轮询 + + Returns: + 处理的文件数 + """ + # 确保目录存在 + if not os.path.exists(UPLOAD_FOLDER): + os.makedirs(UPLOAD_FOLDER) + logger.info(f"创建目录: {UPLOAD_FOLDER}") + return 0 + + # 查找Excel文件 + excel_files = [] + for pattern in ['*.xlsx', '*.xls']: + excel_files.extend(glob.glob(os.path.join(UPLOAD_FOLDER, pattern))) + + if not excel_files: + return 0 + + logger.info(f"发现 {len(excel_files)} 个Excel文件待处理") + + processed_count = 0 + for excel_path in excel_files: + try: + success_count = process_single_file(db_manager, excel_path) + + # 处理成功后删除文件 + os.remove(excel_path) + logger.info(f"✓ 文件处理完成并已删除: {os.path.basename(excel_path)} (导入 {success_count} 条)") + processed_count += 1 + + except Exception as e: + logger.error(f"✗ 处理文件失败: {os.path.basename(excel_path)}, 错误: {e}") + # 失败时不删除文件,保留以便排查 + + return processed_count + + +def main(): + """主函数 - 定频轮询模式""" + logger.info("=" * 60) + logger.info("定频轮询脚本启动") + logger.info(f"轮询间隔: {POLL_INTERVAL} 秒") + logger.info(f"监控目录: {UPLOAD_FOLDER}") + logger.info(f"固定配置: seed_id={SEED_ID}, seed_name='{SEED_NAME}', crawled={CRAWLED}") + logger.info("=" * 60) + + # 创建数据库管理器 + db_manager = DatabaseManager(DB_CONFIG) + + # 定频轮询 + while True: + try: + processed = poll_once(db_manager) + if processed > 0: + logger.info(f"本轮处理完成,共处理 {processed} 个文件") + except Exception as e: + logger.error(f"轮询出错: {e}") + + # 等待下一轮 + time.sleep(POLL_INTERVAL) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/start_import_keywords.sh b/start_import_keywords.sh new file mode 100644 index 0000000..aa6f576 --- /dev/null +++ b/start_import_keywords.sh @@ -0,0 +1,394 @@ +#!/bin/bash + +# ============================================ +# 关键词导入系统管理脚本 +# 支持进程数量控制 +# ============================================ + +# 配置区 +BASE_DIR="/home/work/ai_import_quary" 
+VENV_PYTHON="/home/work/keyword_crawl/venv/bin/python" + +# import_keywords 配置 +IMPORT_SCRIPT="${BASE_DIR}/import_keywords.py" +IMPORT_PID_FILE="${BASE_DIR}/import_keywords.pid" +IMPORT_LOG_FILE="${BASE_DIR}/import_keywords.log" +IMPORT_MAX_PROCESSES=1 # 限制最多1个进程 + +# 颜色定义 +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# 获取脚本正在运行的进程数量 +get_process_count() { + local script_name=$(basename "$1") + pgrep -f "$script_name" 2>/dev/null | wc -l +} + +# 获取所有相关进程的PID +get_all_pids() { + local script_name=$(basename "$1") + pgrep -f "$script_name" 2>/dev/null | tr '\n' ' ' +} + +# 启动服务(带进程数量控制) +start_single() { + local script=$1 + local pid_file=$2 + local log_file=$3 + local name=$4 + local max_processes=$5 + + local script_name=$(basename "$script") + local current_count=$(get_process_count "$script") + + # 检查是否超过最大进程数 + if [ $current_count -ge $max_processes ]; then + echo -e "${YELLOW}${name} 已达到最大进程数 (${current_count}/${max_processes}),跳过启动${NC}" + local first_pid=$(pgrep -f "$script_name" | head -n1) + if [ -n "$first_pid" ]; then + echo "$first_pid" > "$pid_file" + fi + return 0 + fi + + # 检查PID文件记录的进程 + if [ -f "$pid_file" ]; then + local pid=$(cat "$pid_file" 2>/dev/null) + if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then + echo -e "${YELLOW}${name} 已在运行(PID文件记录),PID: ${pid}${NC}" + return 0 + fi + fi + + echo -e "${BLUE}正在启动 ${name}(守护模式)...${NC}" + + # 确保日志目录存在 + mkdir -p "$(dirname "$log_file")" + + # 备份旧日志 + if [ -f "$log_file" ]; then + local backup_log="${log_file}.$(date +%Y%m%d_%H%M%S).bak" + cp "$log_file" "$backup_log" + echo -e "${BLUE}旧日志已备份到: ${backup_log}${NC}" + fi + + # 启动守护进程 + nohup "$VENV_PYTHON" "$script" >> "$log_file" 2>&1 & + local new_pid=$! 
+ + # 等待进程真正启动 + sleep 2 + + # 验证进程是否启动成功 + if kill -0 "$new_pid" 2>/dev/null; then + echo "$new_pid" > "$pid_file" + echo -e "${GREEN}${name} 已启动,PID: ${new_pid}${NC}" + echo -e "${BLUE}日志文件: ${log_file}${NC}" + return 0 + else + echo -e "${RED}${name} 启动失败,请检查日志${NC}" + tail -20 "$log_file" + rm -f "$pid_file" + return 1 + fi +} + +# 停止服务的所有实例 +stop_single_all() { + local script=$1 + local name=$2 + local pid_file=$3 + + local script_name=$(basename "$script") + local pids=$(get_all_pids "$script") + local count=$(get_process_count "$script") + + if [ $count -eq 0 ]; then + echo -e "${YELLOW}${name} 没有运行中的进程${NC}" + rm -f "$pid_file" + return 0 + fi + + echo -e "${BLUE}正在停止 ${name} (${count}个进程)...${NC}" + echo -e "进程PIDs: ${pids}" + + # 首先尝试优雅终止 + for pid in $pids; do + if kill -0 "$pid" 2>/dev/null; then + echo -e " 发送SIGTERM到 PID $pid..." + kill "$pid" + fi + done + + # 等待优雅退出 + local wait_time=10 + local remaining=$count + for i in $(seq 1 $wait_time); do + remaining=$(get_process_count "$script") + if [ $remaining -eq 0 ]; then + break + fi + echo -n "." 
+ sleep 1 + done + + echo "" + + # 检查是否还有进程残留 + remaining=$(get_process_count "$script") + if [ $remaining -gt 0 ]; then + echo -e "${YELLOW}还有 ${remaining} 个进程未退出,强制终止...${NC}" + pids=$(get_all_pids "$script") + for pid in $pids; do + if kill -0 "$pid" 2>/dev/null; then + kill -9 "$pid" 2>/dev/null + fi + done + sleep 2 + fi + + # 验证所有进程都已停止 + remaining=$(get_process_count "$script") + if [ $remaining -eq 0 ]; then + echo -e "${GREEN}${name} 所有进程已停止${NC}" + rm -f "$pid_file" + return 0 + else + echo -e "${RED}警告:仍有 ${remaining} 个进程无法终止${NC}" + return 1 + fi +} + +# 启动服务 +start() { + echo -e "${BLUE}========== 启动关键词导入系统 ==========${NC}" + echo -e "${YELLOW}进程限制:最多启动1个实例${NC}" + echo "" + + start_single "$IMPORT_SCRIPT" "$IMPORT_PID_FILE" "$IMPORT_LOG_FILE" "import_keywords" "$IMPORT_MAX_PROCESSES" + + echo -e "${BLUE}========================================${NC}" +} + +# 停止服务 +stop() { + echo -e "${BLUE}========== 停止关键词导入系统 ==========${NC}" + + stop_single_all "$IMPORT_SCRIPT" "import_keywords" "$IMPORT_PID_FILE" + + echo -e "${BLUE}========================================${NC}" +} + +# 强制停止 +force-stop() { + echo -e "${RED}========== 强制停止关键词导入进程 ==========${NC}" + + # 停止守护进程 + if [ -f "$IMPORT_PID_FILE" ]; then + local pid=$(cat "$IMPORT_PID_FILE" 2>/dev/null) + if [ -n "$pid" ]; then + kill -9 "$pid" 2>/dev/null + fi + fi + + # 停止所有相关进程 + pkill -9 -f "import_keywords.py" 2>/dev/null + + sleep 2 + + rm -f "$IMPORT_PID_FILE" + + local remaining=$(pgrep -f "import_keywords" | wc -l) + if [ $remaining -eq 0 ]; then + echo -e "${GREEN}✅ 所有进程已强制停止${NC}" + else + echo -e "${RED}❌ 仍有 ${remaining} 个进程存活${NC}" + pgrep -f "import_keywords" | xargs ps -fp 2>/dev/null + fi + + echo -e "${RED}==========================================${NC}" +} + +# 重启服务 +restart() { + echo -e "${BLUE}========== 重启关键词导入系统 ==========${NC}" + + stop + if [ $? 
-eq 0 ]; then + sleep 3 + start + else + echo -e "${RED}停止服务失败,请使用 force-restart${NC}" + return 1 + fi + + echo -e "${BLUE}========================================${NC}" +} + +# 强制重启 +force-restart() { + echo -e "${YELLOW}========== 强制重启关键词导入系统 ==========${NC}" + + force-stop + sleep 3 + start + + echo -e "${YELLOW}============================================${NC}" +} + +# 显示状态 +status() { + echo -e "${BLUE}========== 关键词导入系统状态 ==========${NC}" + echo -e "${BLUE}系统时间: $(date)${NC}" + echo -e "${BLUE}工作目录: ${BASE_DIR}${NC}" + echo "" + + local count=$(get_process_count "$IMPORT_SCRIPT") + echo -e "${YELLOW}📊 进程状态:${NC}" + echo -e " 进程数: ${count}" + + if [ $count -gt 0 ]; then + local pids=$(get_all_pids "$IMPORT_SCRIPT") + echo -e " 进程PIDs: ${pids}" + + # 显示CPU和内存使用 + for pid in $pids; do + local cpu=$(ps -p $pid -o %cpu --no-headers 2>/dev/null | tr -d ' ') + local mem=$(ps -p $pid -o %mem --no-headers 2>/dev/null | tr -d ' ') + local runtime=$(ps -p $pid -o etime --no-headers 2>/dev/null | tr -d ' ') + echo -e " PID ${pid}: CPU ${cpu}%, 内存 ${mem}%, 运行时间 ${runtime}" + done + else + echo -e "${YELLOW}没有运行中的进程${NC}" + fi + + echo "" + echo -e "${YELLOW}📁 PID文件状态:${NC}" + if [ -f "$IMPORT_PID_FILE" ]; then + local pid=$(cat "$IMPORT_PID_FILE" 2>/dev/null) + if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then + echo -e " ${GREEN}✓ import_keywords.pid: 有效 (PID: $pid)${NC}" + else + echo -e " ${RED}✗ import_keywords.pid: 无效或进程不存在${NC}" + fi + else + echo -e " ${YELLOW}○ import_keywords.pid: 不存在${NC}" + fi + + echo "" + echo -e "${YELLOW}📝 最近日志:${NC}" + if [ -f "$IMPORT_LOG_FILE" ]; then + echo -e "${BLUE}--- import_keywords.log (最后10行) ---${NC}" + tail -10 "$IMPORT_LOG_FILE" 2>/dev/null + else + echo -e "${YELLOW}日志文件不存在${NC}" + fi + + echo -e "${BLUE}========================================${NC}" +} + +# 查看日志 +logs() { + local lines=${1:-50} + echo -e "${BLUE}========== 查看日志 (最后 ${lines} 行) ==========${NC}" + + if [ -f "$IMPORT_LOG_FILE" ]; then + tail -$lines 
"$IMPORT_LOG_FILE" + else + echo -e "${YELLOW}日志文件不存在${NC}" + fi + + echo -e "${BLUE}============================================${NC}" +} + +# 实时查看日志 +logs-follow() { + echo -e "${BLUE}========== 实时查看日志 (Ctrl+C 退出) ==========${NC}" + tail -f "$IMPORT_LOG_FILE" +} + +# 显示帮助 +show_help() { + echo -e "${GREEN}关键词导入系统管理脚本${NC}" + echo "" + echo -e "${YELLOW}当前配置:${NC}" + echo -e " 工作目录: ${BASE_DIR}" + echo -e " 最大进程数: ${IMPORT_MAX_PROCESSES}" + echo -e " 监控目录: query_upload/" + echo -e " 轮询间隔: 60秒" + echo "" + echo -e "${BLUE}用法: $0 {命令}${NC}" + echo "" + echo -e "${GREEN}服务管理:${NC}" + echo -e " ${YELLOW}start${NC} 启动服务(定频轮询模式)" + echo -e " ${YELLOW}stop${NC} 停止服务" + echo -e " ${YELLOW}force-stop${NC} 强制停止所有进程" + echo -e " ${YELLOW}restart${NC} 重启服务" + echo -e " ${YELLOW}force-restart${NC} 强制重启" + echo "" + echo -e "${GREEN}状态查看:${NC}" + echo -e " ${YELLOW}status${NC} 显示进程状态" + echo -e " ${YELLOW}logs [N]${NC} 查看最后N行日志(默认50)" + echo -e " ${YELLOW}logs-follow${NC} 实时查看日志" + echo "" + echo -e "${GREEN}其他:${NC}" + echo -e " ${YELLOW}help${NC} 显示帮助" + echo "" + echo -e "${GREEN}工作流程:${NC}" + echo -e " 1. 定频轮询 query_upload/ 目录" + echo -e " 2. 发现Excel文件后自动处理" + echo -e " 3. 读取query列和科室列" + echo -e " 4. 查询科室ID (ai_departments)" + echo -e " 5. 随机获取作者信息 (ai_authors)" + echo -e " 6. 判重后插入 baidu_keyword 表" + echo -e " 7. 
处理完成后删除源文件" + echo "" + echo -e "${GREEN}示例:${NC}" + echo -e " $0 start # 启动定频轮询服务" + echo -e " $0 status # 查看运行状态" + echo -e " $0 logs-follow # 实时查看日志" + echo -e " $0 restart # 重启服务" +} + +# 主逻辑 +case "$1" in + start) + start + ;; + stop) + stop + ;; + force-stop) + force-stop + ;; + restart) + restart + ;; + force-restart) + force-restart + ;; + status) + status + ;; + logs) + logs $2 + ;; + logs-follow) + logs-follow + ;; + help|--help|-h) + show_help + ;; + *) + echo -e "${RED}错误:未知命令 '$1'${NC}" + echo "" + show_help + exit 1 + ;; +esac + +exit 0 diff --git a/update_keywords_from_excel.py b/update_keywords_from_excel.py new file mode 100644 index 0000000..a3090fa --- /dev/null +++ b/update_keywords_from_excel.py @@ -0,0 +1,320 @@ +""" +根据Excel文件更新baidu_keyword表中符合条件的记录 +""" +import pandas as pd +import logging +import os +import time +from database_config import DatabaseManager +from datetime import datetime + +# 配置日志 +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def read_excel_keywords_with_department(excel_path, query_column='query', department_column='科室'): + """ + 读取Excel文件中的关键词和部门信息 + + Args: + excel_path: Excel文件路径 + query_column: query列名,默认为'query' + department_column: 部门列名,默认为'科室' + + Returns: + 包含(keyword, department)元组的列表 + """ + try: + # 读取Excel文件 + df = pd.read_excel(excel_path) + logger.info(f"成功读取Excel文件: {excel_path}") + logger.info(f"Excel文件包含 {len(df)} 行数据") + logger.info(f"Excel列名: {df.columns.tolist()}") + + # 检查query列和department列是否存在 + if query_column not in df.columns: + logger.error(f"未找到query列: {query_column}") + return [] + + if department_column not in df.columns: + logger.error(f"未找到department列: {department_column}") + return [] + + # 获取query和department数据 + query_data = df[query_column].dropna() + department_data = df[department_column].dropna() + + # 对齐数据长度,取最短长度 + min_length = min(len(query_data), len(department_data)) + query_list = 
logger = logging.getLogger(__name__)


def get_department_id(db_manager, department_name):
    """Resolve a department (科室) name to its ai_departments.id.

    Args:
        db_manager: DatabaseManager-like object exposing execute_query().
        department_name: department display name to look up.

    Returns:
        The matching ai_departments.id.

    Raises:
        ValueError: when the department does not exist in ai_departments.
    """
    try:
        sql = "SELECT id FROM ai_departments WHERE department_name = %s"
        result = db_manager.execute_query(sql, (department_name,), fetch_one=True)

        if result:
            return result[0]
        error_msg = f"未找到科室 '{department_name}' 的ID,请先在ai_departments表中添加该科室"
        logger.error(error_msg)
        raise ValueError(error_msg)

    except Exception as e:
        # Matches original behavior: the ValueError above is logged a second
        # time here before being re-raised to the caller.
        logger.error(f"查询科室ID失败: {e}", exc_info=True)
        raise


def get_author_info_by_department(db_manager, department_id):
    """Fetch one eligible author for a department from ai_authors.

    Eligibility: status = 'active' and daily_post_max > 0; the first match
    returned by the database is used.

    Args:
        db_manager: DatabaseManager-like object exposing execute_query().
        department_id: ai_departments.id to filter authors by.

    Returns:
        (author_id, author_name), or (0, '') when no author qualifies or the
        query fails — callers rely on getting a tuple, never None.
    """
    try:
        sql = "SELECT id, author_name FROM ai_authors WHERE department_id = %s AND status = 'active' AND daily_post_max > 0 LIMIT 1"
        result = db_manager.execute_query(sql, (department_id,), fetch_one=True)

        if result:
            return result[0], result[1]
        logger.warning(f"未找到科室ID {department_id} 下符合条件的活跃作者")
        return 0, ''

    except Exception as e:
        logger.error(f"查询作者信息失败: {e}", exc_info=True)
        return 0, ''


def update_keywords_from_excel(db_manager, keyword_dept_pairs, batch_size=100, sleep_seconds=0.1):
    """Update department/author columns of matching baidu_keyword rows.

    Only rows inside a fixed manual-import window are touched:
    seed_id = 9999, created_at between 2026-01-28 12:00:00 and 19:53:00,
    and query_status = 'manual_review'.

    Args:
        db_manager: DatabaseManager-like object exposing execute_query() and
            execute_update().
        keyword_dept_pairs: list of (keyword, department) tuples.
        batch_size: emit a progress log line every N records.
        sleep_seconds: pause between records to throttle database load.

    Returns:
        Number of rows successfully updated.
    """
    if not keyword_dept_pairs:
        logger.warning("没有关键词需要更新")
        return 0

    try:
        total = len(keyword_dept_pairs)  # hoisted: referenced every iteration
        logger.info(f"开始更新 {total} 个关键词-部门组合到数据库...")
        logger.info("采用逐条查询+更新模式,只更新存在的关键词")

        check_sql = "SELECT COUNT(*) FROM baidu_keyword WHERE keyword = %s AND seed_id = 9999 AND created_at > '2026-01-28 12:00:00' AND created_at < '2026-01-28 19:53:00' AND query_status = 'manual_review'"
        update_sql = """
        UPDATE baidu_keyword
        SET department = %s, department_id = %s, author_id = %s, author_name = %s
        WHERE keyword = %s AND seed_id = 9999 AND created_at > '2026-01-28 12:00:00' AND created_at < '2026-01-28 19:53:00' AND query_status = 'manual_review'
        """

        success_count = 0
        skip_count = 0
        failed_count = 0

        for idx, (keyword, department) in enumerate(keyword_dept_pairs, 1):
            try:
                logger.debug(f'[调试] 处理第 {idx}/{total} 条: {keyword}, 部门: {department}')

                # 1. Only touch keywords that exist within the fixed window.
                result = db_manager.execute_query(check_sql, (keyword,), fetch_one=True)
                exists = result[0] > 0 if result else False
                if not exists:
                    skip_count += 1
                    logger.debug(f'[调试] 关键词不存在于指定条件中,跳过: {keyword}')
                    continue

                # 2. Department must exist (raises ValueError otherwise).
                dept_id = get_department_id(db_manager, department)

                # 3. Author lookup is best-effort; falls back to (0, '').
                author_id, author_name = get_author_info_by_department(db_manager, dept_id)

                # 4. Perform the update.
                logger.debug(f'[调试] 准备更新: {keyword}, 部门: {department}, 部门ID: {dept_id}, 作者ID: {author_id}, 作者名: {author_name}')
                affected = db_manager.execute_update(
                    update_sql,
                    (department, dept_id, author_id, author_name, keyword),
                    autocommit=True
                )

                if affected > 0:
                    success_count += 1
                    logger.debug(f'[调试] 更新成功: {keyword}, 部门: {department}, 部门ID: {dept_id}, 作者ID: {author_id}, 作者名: {author_name}')
                else:
                    # BUG FIX: a row that passed the existence check but whose
                    # UPDATE matched nothing was previously counted in no
                    # tally, so success+skip+failed never added up to total.
                    failed_count += 1
                    logger.warning(f'[调试] 更新影响0行: {keyword}')

                # 5. Periodic progress report (always on the final record).
                if idx % batch_size == 0 or idx == total:
                    progress = (idx / total) * 100
                    logger.info(f'[更新进度] {idx}/{total} ({progress:.1f}%) | 成功: {success_count} | 跳过: {skip_count} | 失败: {failed_count}')

                # 6. Throttle between records.
                time.sleep(sleep_seconds)

            except ValueError as ve:
                # Unknown department: record the failure and keep going.
                logger.error(f'[错误] 第 {idx} 条记录遇到错误: {ve}')
                failed_count += 1
                continue
            except Exception as e:
                failed_count += 1
                logger.warning(f'[调试] 处理失败 [{idx}/{total}]: keyword={keyword}, 部门={department},错误:{e}')

        logger.info(f"更新完成!成功更新: {success_count} | 跳过不存在: {skip_count} | 失败: {failed_count}")
        return success_count

    except Exception as e:
        logger.error(f"更新关键词失败: {e}", exc_info=True)
        raise


def main():
    """Entry point: read the Excel file, confirm interactively, run updates."""
    excel_path = '/home/work/ai_improt_quary/副本query表-0128第一批.xlsx'

    # NOTE(review): credentials are hard-coded in source — move them to
    # environment variables or a config file before sharing this script.
    # The account name suggests read-only access yet this script issues
    # UPDATEs; confirm the grants are intentional.
    db_config = {
        'host': '8.149.233.36',
        'port': 3306,
        'user': 'ai_article_read',
        'password': '7aK_H2yvokVumr84lLNDt8fDBp6P',
        'database': 'ai_article',
        'charset': 'utf8mb4'
    }

    logger.info("=" * 60)
    logger.info("开始根据Excel更新baidu_keyword表中符合条件的记录")
    # BUG FIX: the port was hard-coded as 3306 in this log line; use the
    # configured value so the log cannot disagree with the actual connection.
    logger.info(f"数据库配置: {db_config['user']}@{db_config['host']}:{db_config['port']}/{db_config['database']}")
    logger.info("=" * 60)

    db_manager = DatabaseManager(db_config)

    try:
        # 1. Read the Excel file.
        keyword_dept_pairs = read_excel_keywords_with_department(excel_path, 'query', '科室')

        if not keyword_dept_pairs:
            logger.warning("没有可更新的关键词,程序退出")
            return

        # Ask whether to update everything or only a test prefix.
        print(f"\nExcel中共有 {len(keyword_dept_pairs)} 条数据")
        while True:
            choice = input("请选择更新方式: A) 全部更新 B) 测试模式(输入前N条数据): ").strip().upper()
            if choice == 'A':
                break
            elif choice == 'B':
                try:
                    test_count = int(input(f"请输入要测试的条数 (1-{len(keyword_dept_pairs)}): "))
                    if 1 <= test_count <= len(keyword_dept_pairs):
                        keyword_dept_pairs = keyword_dept_pairs[:test_count]
                        print(f"已选择更新前 {test_count} 条数据进行测试")
                        break
                    else:
                        print(f"输入超出范围,请输入1到{len(keyword_dept_pairs)}之间的数字")
                except ValueError:
                    print("请输入有效的数字")
            else:
                print("请输入 A 或 B")

        if not keyword_dept_pairs:
            logger.warning("没有可更新的关键词,程序退出")
            return

        # Preview the first 10 pairs before asking for confirmation.
        logger.info(f"\n关键词-部门预览(前10个):")
        for i, (keyword, department) in enumerate(keyword_dept_pairs[:10], 1):
            logger.info(f"  {i}. {keyword} (部门: {department})")
        if len(keyword_dept_pairs) > 10:
            logger.info(f"  ... 还有 {len(keyword_dept_pairs) - 10} 个关键词-部门组合")

        # 2. Final confirmation.
        print("\n" + "=" * 60)
        print(f"即将更新 {len(keyword_dept_pairs)} 个关键词-部门组合到 baidu_keyword 表")
        print(f"条件: seed_id=9999 AND created_at BETWEEN '2026-01-28 12:00:00' AND '2026-01-28 19:53:00' AND query_status='manual_review'")
        confirm = input("确认更新? (y/n): ").strip().lower()
        if confirm != 'y':
            logger.info("用户取消更新")
            return

        # 3. Execute the update (one-by-one, throttled).
        success_count = update_keywords_from_excel(
            db_manager=db_manager,
            keyword_dept_pairs=keyword_dept_pairs,
            batch_size=1,
            sleep_seconds=0.1
        )

        logger.info("=" * 60)
        logger.info(f"✓ 更新完成!共成功更新 {success_count} 个关键词")
        logger.info("=" * 60)

    except Exception as e:
        logger.error(f"✗ 更新过程出错: {e}", exc_info=True)
        logger.info("=" * 60)
        logger.info("✗ 更新失败")
        logger.info("=" * 60)


if __name__ == '__main__':
    main()