commit 8a92380b56f9e736757cd4c4cc28d9194a899ab0 Author: shengyudong@yunqueai.net <“shengyudong@yunqueai.net”> Date: Thu Feb 5 21:34:39 2026 +0800 Initial commit: AI关键词导入工具 diff --git a/README.md b/README.md new file mode 100644 index 0000000..3139ec5 --- /dev/null +++ b/README.md @@ -0,0 +1,150 @@ +# AI 关键词导入工具 + +## 项目简介 + +本项目是一个从 Excel 文件批量导入关键词到 MySQL 数据库的工具,支持自动关联科室信息和作者信息。 + +## 功能特性 + +- 从 Excel 文件读取关键词和科室信息 +- 自动去重,避免重复导入 +- 支持批量导入或测试模式(导入指定条数) +- 自动查询科室 ID 和对应的作者信息 +- 导入完成后自动清理已处理的 Excel 文件 +- 详细的日志输出,便于追踪导入过程 + +## 环境要求 + +- Python 3.7+ +- MySQL 数据库 + +## 安装依赖 + +```bash +pip install -r requirements.txt +``` + +## 使用方法 + +### 1. 准备 Excel 文件 + +在项目根目录下创建 `query_upload` 文件夹,并将 Excel 文件放入该文件夹中。 + +Excel 文件需要包含以下列: +- `query` 列:关键词列(必需) +- `科室` 列:科室信息列(可选,如不存在则不关联科室) + +### 2. 配置数据库 + +编辑 `database_config.py` 文件,修改数据库连接配置: + +```python +DB_CONFIG = { + 'host': 'your_host', + 'user': 'your_user', + 'password': 'your_password', + 'database': 'your_database', + 'charset': 'utf8mb4' +} +``` + +### 3. 运行导入工具 + +```bash +python import_keywords.py +``` + +### 4. 命令行参数(可选) + +```bash +python import_keywords.py --host 127.0.0.1 --port 3306 --user root --password your_password --database ai_article +``` + +可用参数: +- `--host`: 数据库主机(默认使用配置文件中的值) +- `--port`: 数据库端口(默认 3306) +- `--user`: 数据库用户名(默认使用配置文件中的值) +- `--password`: 数据库密码(默认使用配置文件中的值) +- `--database`: 数据库名(默认使用配置文件中的值) +- `--batch-size`: 日志批次大小(默认 1) +- `--sleep`: 每条记录间隔时间秒数(默认 0.1) +- `--query-column`: Excel 中的关键词列名(默认 'query') +- `--dept-column`: Excel 中的科室列名(默认 '科室') +- `--seed-id`: 种子 ID(默认 9999) +- `--seed-name`: 种子名称(默认 '手动提交') + +## 数据库表结构 + +### baidu_keyword 表(目标表) + +导入的数据会插入到 `baidu_keyword` 表,字段包括: +- `keyword`: 关键词 +- `seed_id`: 种子 ID +- `seed_name`: 种子名称 +- `crawled`: 是否已爬取 +- `department`: 科室名称 +- `department_id`: 科室 ID +- `query_status`: 查询状态(固定为 'manual_review') +- `author_id`: 作者 ID +- `author_name`: 作者名称 + +### ai_departments 表(科室表) + +需要预先存在,用于查询科室 ID: +- `id`: 科室 ID +- `department_name`: 科室名称 + +### ai_authors 表(作者表) + +需要预先存在,用于查询作者信息: +- `id`: 作者 ID +- `author_name`: 作者名称 +- `department_id`: 所属科室 ID +- `status`: 状态(需为 'active') +- `daily_post_max`: 每日最大发布数(需大于 0) + +## 工作流程 + +1. 检测 `query_upload` 文件夹中的 Excel 文件 +2. 如果有多个文件,提示用户选择 +3. 读取 Excel 文件中的关键词和科室信息 +4. 提示用户选择全部导入或测试模式 +5. 显示前 10 条数据预览 +6. 确认后开始导入: + - 查询关键词是否已存在 + - 如果存在则跳过 + - 如果不存在,根据科室名称查询科室 ID + - 根据科室 ID 查询符合条件的作者信息 + - 插入数据到 `baidu_keyword` 表 +7. 导入成功后自动清理 `query_upload` 文件夹中的 Excel 文件 + +## 注意事项 + +1. 确保 `ai_departments` 表中已存在 Excel 中提到的所有科室 +2. 确保每个科室在 `ai_authors` 表中至少有一个状态为 'active' 且 `daily_post_max > 0` 的作者 +3. 如果科室不存在,导入过程会中断并报错 +4. 导入成功后,原始 Excel 文件会被自动删除 +5. 如果导入失败,Excel 文件会保留以便排查问题 + +## 项目结构 + +``` +ai_import_quary/ +├── database_config.py # 数据库配置和管理模块 +├── import_keywords.py # 关键词导入主程序 +├── requirements.txt # Python 依赖包 +├── README.md # 项目说明文档 +└── query_upload/ # Excel 文件存放目录(需手动创建) +``` + +## 日志说明 + +程序运行过程中会输出详细的日志信息: +- `INFO` 级别:一般信息和进度提示 +- `DEBUG` 级别:详细的调试信息 +- `WARNING` 级别:警告信息 +- `ERROR` 级别:错误信息 + +## 许可证 + +本项目仅供内部使用。 diff --git a/database_config.py b/database_config.py new file mode 100644 index 0000000..d5d7455 --- /dev/null +++ b/database_config.py @@ -0,0 +1,160 @@ +""" +数据库配置管理模块 +统一管理数据库连接和SQL操作 +""" +import pymysql +import logging + +logger = logging.getLogger(__name__) + +# 数据库配置 +DB_CONFIG = { + 'host': '8.149.233.36', + 'user': 'ai_article_read', + 'password': '7aK_H2yvokVumr84lLNDt8fDBp6P', + 'database': 'ai_article', + 'charset': 'utf8mb4' +} + + +class DatabaseManager: + """数据库管理器:统一管理数据库连接和操作""" + + def __init__(self, config=None): + """初始化数据库管理器 + + Args: + config: 数据库配置字典,默认使用 DB_CONFIG + """ + self.config = config or DB_CONFIG + + def get_connection(self, autocommit=False): + """获取数据库连接 + + Args: + autocommit: 是否启用自动提交模式 + + Returns: + pymysql连接对象 + """ + return pymysql.connect(**self.config, autocommit=autocommit) + + def execute_query(self, sql, params=None, fetch_one=False): + """执行查询SQL(SELECT) + + Args: + sql: SQL语句 + params: SQL参数(tuple或list) + fetch_one: True返回单条记录,False返回所有记录 + + Returns: + 查询结果 + """ + conn = None + cursor = None + try: + conn = self.get_connection() + cursor = conn.cursor() + + logger.info(f'[SQL] {sql.strip()} | params: {params}') + cursor.execute(sql, params or ()) + + if fetch_one: + result = cursor.fetchone() + else: + result = cursor.fetchall() + + logger.debug(f'[SQL结果] 返回 {len(result) if not fetch_one and result else (1 if result else 0)} 条记录') + return result + except Exception as e: + logger.error(f'执行查询失败:{e}', exc_info=True) + raise + finally: + if cursor: + cursor.close() + if conn: + conn.close() + + def execute_update(self, sql, params=None, autocommit=True): + """执行更新SQL(INSERT/UPDATE/DELETE) + + Args: + sql: SQL语句 + params: SQL参数(tuple或list) + autocommit: 是否自动提交 + + Returns: + 影响的行数 + """ + conn = None + cursor = None + try: + conn = self.get_connection(autocommit=autocommit) + cursor = conn.cursor() + + logger.info(f'[SQL] {sql.strip()} | params: {params}') + result = cursor.execute(sql, params or ()) + + if not autocommit: + conn.commit() + + logger.info(f'[SQL执行] 影响 {result} 行') + return result + except Exception as e: + if not autocommit and conn: + conn.rollback() + logger.error(f'执行更新失败:{e}', exc_info=True) + raise + finally: + if cursor: + cursor.close() + if conn: + conn.close() + + def execute_many(self, sql, params_list, autocommit=True): + """批量执行SQL + + Args: + sql: SQL语句 + params_list: 参数列表,每个元素是一组参数 + autocommit: 是否自动提交 + + Returns: + 成功执行的行数 + """ + conn = None + cursor = None + try: + conn = self.get_connection(autocommit=autocommit) + cursor = conn.cursor() + + logger.info(f'[SQL批量] {sql.strip()} | 批次数: {len(params_list)}') + + success_count = 0 + for params in params_list: + try: + result = cursor.execute(sql, params) + if result > 0: + success_count += 1 + except Exception as e: + logger.debug(f'批量执行跳过:params={params},错误:{e}') + + if not autocommit: + conn.commit() + + logger.info(f'[SQL批量执行] 成功 {success_count}/{len(params_list)} 条') + return success_count + except Exception as e: + if not autocommit and conn: + conn.rollback() + logger.error(f'批量执行失败:{e}', exc_info=True) + raise + finally: + if cursor: + cursor.close() + if conn: + conn.close() + + +# 创建全局数据库管理器实例 +db_manager = DatabaseManager() diff --git a/import_keywords.py b/import_keywords.py new file mode 100644 index 0000000..1a0ca87 --- /dev/null +++ b/import_keywords.py @@ -0,0 +1,427 @@ +""" +从Excel文件导入关键词到baidu_keyword表 +""" +import pandas as pd +import logging +import argparse +import os +import time +import glob +import shutil +from database_config import DatabaseManager +from datetime import datetime + +# 配置日志 +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def read_excel_keywords_with_department(excel_path, query_column='query', department_column='科室'): + """ + 读取Excel文件中的关键词和部门信息 + + Args: + excel_path: Excel文件路径 + query_column: query列名,默认为'query' + department_column: 部门列名,默认为'科室',如果为None则不读取部门信息 + + Returns: + 包含(keyword, department)元组的列表,如果没有部门列则department为None + """ + try: + # 读取Excel文件 + df = pd.read_excel(excel_path) + logger.info(f"成功读取Excel文件: {excel_path}") + logger.info(f"Excel文件包含 {len(df)} 行数据") + logger.info(f"Excel列名: {df.columns.tolist()}") + + # 检查query列是否存在 + if query_column not in df.columns: + logger.error(f"未找到query列: {query_column}") + return [] + + # 检查是否有部门列 + has_department = department_column and department_column in df.columns + if department_column and not has_department: + logger.warning(f"未找到department列: {department_column},将不使用部门信息") + + # 获取query数据 + query_data = df[query_column].dropna() + query_list = query_data.tolist() + + # 根据是否有部门列,组合数据 + keyword_dept_pairs = [] + if has_department: + # 有部门列,获取部门数据 + department_data = df[department_column].dropna() + # 对齐数据长度,取最短长度 + min_length = min(len(query_data), len(department_data)) + query_list = query_data.iloc[:min_length].tolist() + department_list = department_data.iloc[:min_length].tolist() + + for i in range(min_length): + keyword = str(query_list[i]).strip() + department = str(department_list[i]).strip() + if keyword and department: # 确保关键词和部门都不为空 + keyword_dept_pairs.append((keyword, department)) + else: + # 没有部门列,只提取关键词 + logger.info("没有部门列,将只导入关键词,不指定科室") + for keyword in query_list: + keyword = str(keyword).strip() + if keyword: # 确保关键词不为空 + keyword_dept_pairs.append((keyword, None)) + + # 去除重复项,保留第一个出现的组合 + seen = set() + unique_keyword_dept_pairs = [] + for keyword, dept in keyword_dept_pairs: + if (keyword, dept) not in seen: + seen.add((keyword, dept)) + unique_keyword_dept_pairs.append((keyword, dept)) + + if has_department: + logger.info(f"提取到 {len(unique_keyword_dept_pairs)} 个唯一的关键词-部门组合") + else: + logger.info(f"提取到 {len(unique_keyword_dept_pairs)} 个唯一的关键词") + + return unique_keyword_dept_pairs + + except Exception as e: + logger.error(f"读取Excel文件失败: {e}", exc_info=True) + raise + + +def get_department_id(db_manager, department_name): + """ + 根据科室名称从ai_departments表中获取对应的ID + + Args: + db_manager: 数据库管理器实例 + department_name: 科室名称 + + Returns: + 科室ID,如果未找到则抛出异常 + """ + try: + # 查询科室ID - 使用正确的字段名 + sql = "SELECT id FROM ai_departments WHERE department_name = %s" + result = db_manager.execute_query(sql, (department_name,), fetch_one=True) + + if result: + return result[0] # 返回ID + else: + error_msg = f"未找到科室 '{department_name}' 的ID,请先在ai_departments表中添加该科室" + logger.error(error_msg) + raise ValueError(error_msg) + + except Exception as e: + logger.error(f"查询科室ID失败: {e}", exc_info=True) + raise + + +def get_author_info_by_department(db_manager, department_id): + """ + 根据科室ID从ai_authors表中获取任一符合条件的作者信息 + + Args: + db_manager: 数据库管理器实例 + department_id: 科室ID + + Returns: + (author_id, author_name) 元组,如果未找到则返回 (None, None) + """ + try: + # 查询符合条件的作者信息 + sql = "SELECT id, author_name FROM ai_authors WHERE department_id = %s AND status = 'active' AND daily_post_max > 0 LIMIT 1" + result = db_manager.execute_query(sql, (department_id,), fetch_one=True) + + if result: + return result[0], result[1] # 返回 author_id, author_name + else: + logger.warning(f"未找到科室ID {department_id} 下符合条件的活跃作者") + return None, None + + except Exception as e: + logger.error(f"查询作者信息失败: {e}", exc_info=True) + return None, None + + +def import_keywords_to_db(db_manager, keyword_dept_pairs, seed_id=9999, seed_name='手动提交', crawled=1, batch_size=100, sleep_seconds=0.1): + """ + 将关键词批量导入到baidu_keyword表 + + Args: + db_manager: 数据库管理器实例 + keyword_dept_pairs: 包含(keyword, department)元组的列表 + seed_id: 种子ID,默认9999 + seed_name: 种子名称,默认'手动提交' + crawled: 是否已爬取,默认1 + batch_size: 日志批次大小,每多少条记录输出一次进度 + sleep_seconds: 每条记录间隔睡眠时间(秒),默认0.1秒 + + Returns: + 成功导入的数量 + """ + if not keyword_dept_pairs: + logger.warning("没有关键词需要导入") + return 0 + + try: + logger.info(f"开始导入 {len(keyword_dept_pairs)} 个关键词-部门组合到数据库...") + logger.info("采用逐条查询+插入模式,避免重复") + + # 准备SQL语句 + check_sql = "SELECT COUNT(*) FROM baidu_keyword WHERE keyword = %s" + insert_sql = """ + INSERT INTO baidu_keyword (keyword, seed_id, seed_name, crawled, parents_id, created_at, department, department_id, query_status, author_id, author_name) + VALUES (%s, %s, %s, %s, 0, NOW(), %s, %s, %s, %s, %s) + """ + + success_count = 0 + skip_count = 0 + failed_count = 0 + + # 逐条处理 + for idx, (keyword, department) in enumerate(keyword_dept_pairs, 1): + try: + if department: + logger.debug(f'[调试] 处理第 {idx}/{len(keyword_dept_pairs)} 条: {keyword}, 部门: {department}') + else: + logger.debug(f'[调试] 处理第 {idx}/{len(keyword_dept_pairs)} 条: {keyword}, 无部门信息') + + # 1. 如果有部门信息,获取科室ID和作者信息 + if department: + dept_id = get_department_id(db_manager, department) + author_id, author_name = get_author_info_by_department(db_manager, dept_id) + else: + # 没有部门信息,使用默认值(空字符串而不是None,避免数据库NOT NULL约束) + department = '' + dept_id = 0 + author_id = 0 + author_name = '' + + # 2. 查询关键词是否存在 + result = db_manager.execute_query(check_sql, (keyword,), fetch_one=True) + exists = result[0] > 0 if result else False + + if exists: + skip_count += 1 + logger.debug(f'[调试] 关键词已存在,跳过: {keyword}') + else: + # 3. 不存在则插入 + if department: + logger.debug(f'[调试] 准备插入: {keyword}, 部门: {department}, 部门ID: {dept_id}, 作者ID: {author_id}, 作者名: {author_name}, query_status: manual_review') + else: + logger.debug(f'[调试] 准备插入: {keyword}, 无部门信息, query_status: manual_review') + + affected = db_manager.execute_update( + insert_sql, + (keyword, seed_id, seed_name, crawled, department, dept_id, 'manual_review', author_id, author_name), + autocommit=True + ) + + if affected > 0: + success_count += 1 + if department: + logger.debug(f'[调试] 插入成功: {keyword}, 部门: {department}, 部门ID: {dept_id}, 作者ID: {author_id}, 作者名: {author_name}, query_status: manual_review') + else: + logger.debug(f'[调试] 插入成功: {keyword}, 无部门信息, query_status: manual_review') + + # 5. 输出进度 + if idx % batch_size == 0 or idx == len(keyword_dept_pairs): + progress = (idx / len(keyword_dept_pairs)) * 100 + logger.info(f'[插入进度] {idx}/{len(keyword_dept_pairs)} ({progress:.1f}%) | 成功: {success_count} | 跳过: {skip_count} | 失败: {failed_count}') + + # 6. 每次执行完sleep + time.sleep(sleep_seconds) + + except ValueError as ve: + # 遇到科室不存在的错误,停止整个导入过程 + logger.error(f'[错误] 第 {idx} 条记录遇到错误: {ve}') + raise ve + except Exception as e: + failed_count += 1 + logger.warning(f'[调试] 处理失败 [{idx}/{len(keyword_dept_pairs)}]: keyword={keyword}, 部门={department},错误:{e}') + + logger.info(f"导入完成!成功插入: {success_count} | 跳过已存在: {skip_count} | 失败: {failed_count}") + return success_count + + except Exception as e: + logger.error(f"导入关键词失败: {e}", exc_info=True) + raise + + +def main(): + """主函数""" + # 检查query_upload文件夹是否存在 + upload_folder = 'query_upload' + if not os.path.exists(upload_folder): + logger.error(f'未找到 {upload_folder} 文件夹') + return + + # 查找Excel文件 + excel_patterns = ['*.xlsx', '*.xls'] + excel_files = [] + for pattern in excel_patterns: + excel_files.extend(glob.glob(os.path.join(upload_folder, pattern))) + excel_files.extend(glob.glob(os.path.join(upload_folder, pattern.upper()))) + + if not excel_files: + logger.error(f'{upload_folder} 文件夹中未找到Excel文件 (.xlsx 或 .xls)') + return + + logger.info(f'在 {upload_folder} 文件夹中找到 {len(excel_files)} 个Excel文件:') + for i, file_path in enumerate(excel_files, 1): + logger.info(f' {i}. {os.path.basename(file_path)}') + + # 如果只有一个文件,直接使用;如果有多个文件,让用户选择 + if len(excel_files) == 1: + excel_path = excel_files[0] + logger.info(f'自动选择文件: {os.path.basename(excel_path)}') + else: + print('\n找到多个Excel文件,请选择要使用的文件:') + for i, file_path in enumerate(excel_files, 1): + print(f' {i}. {os.path.basename(file_path)}') + + while True: + try: + choice = int(input(f'请选择文件 (1-{len(excel_files)}): ')) + if 1 <= choice <= len(excel_files): + excel_path = excel_files[choice - 1] + break + else: + print(f'请输入1到{len(excel_files)}之间的数字') + except ValueError: + print('请输入有效的数字') + + # 从database_config.py读取默认配置 + from database_config import DB_CONFIG + + # 解析命令行参数 + parser = argparse.ArgumentParser(description='导入Excel关键词到baidu_keyword表') + parser.add_argument('--host', default=DB_CONFIG['host'], help='数据库主机') + parser.add_argument('--port', type=int, default=3306, help='数据库端口') + parser.add_argument('--user', default=DB_CONFIG['user'], help='数据库用户名') + parser.add_argument('--password', default=DB_CONFIG['password'], help='数据库密码') + parser.add_argument('--database', default=DB_CONFIG['database'], help='数据库名') + parser.add_argument('--batch-size', type=int, default=1, help='日志批次大小') + parser.add_argument('--sleep', type=float, default=0.1, help='每条记录间隔时间(秒)') + parser.add_argument('--query-column', default='query', help='Excel中的query列名,默认为query') + parser.add_argument('--dept-column', default='科室', help='Excel中的部门列名,默认为科室') + parser.add_argument('--seed-id', type=int, default=9999, help='种子ID') + parser.add_argument('--seed-name', default='手动提交', help='种子名称') + # 移除了命令行参数,改为交互式询问 + + args = parser.parse_args() + + # Excel文件路径已经在上面确定了 + # excel_path 已经被赋值 + + # 创建数据库连接配置 + db_config = { + 'host': args.host, + 'port': args.port, + 'user': args.user, + 'password': args.password, + 'database': args.database, + 'charset': DB_CONFIG['charset'] # 使用database_config.py中的字符集配置 + } + + logger.info("=" * 60) + logger.info("开始导入关键词到baidu_keyword表") + logger.info(f"数据库配置: {args.user}@{args.host}:{args.port}/{args.database}") + logger.info(f"Excel文件: {os.path.basename(excel_path)}") + logger.info("=" * 60) + + # 创建数据库管理器 + db_manager = DatabaseManager(db_config) + + try: + # 1. 读取Excel文件 + keyword_dept_pairs = read_excel_keywords_with_department(excel_path, args.query_column, args.dept_column) + + # 询问用户是要导入全部数据还是部分测试 + print(f"\nExcel中共有 {len(keyword_dept_pairs)} 条数据") + while True: + choice = input("请选择导入方式: A) 全部导入 B) 测试模式(输入前N条数据): ").strip().upper() + if choice == 'A': + # 全部导入 + break + elif choice == 'B': + try: + test_count = int(input(f"请输入要测试的条数 (1-{len(keyword_dept_pairs)}): ")) + if 1 <= test_count <= len(keyword_dept_pairs): + keyword_dept_pairs = keyword_dept_pairs[:test_count] + print(f"已选择导入前 {test_count} 条数据进行测试") + break + else: + print(f"输入超出范围,请输入1到{len(keyword_dept_pairs)}之间的数字") + except ValueError: + print("请输入有效的数字") + else: + print("请输入 A 或 B") + + if not keyword_dept_pairs: + logger.warning("没有可导入的关键词,程序退出") + return + + # 打印前10个关键词-部门组合作为预览 + logger.info(f"\n关键词-部门预览(前10个):") + for i, (keyword, department) in enumerate(keyword_dept_pairs[:10], 1): + logger.info(f" {i}. {keyword} (部门: {department})") + + if len(keyword_dept_pairs) > 10: + logger.info(f" ... 还有 {len(keyword_dept_pairs) - 10} 个关键词-部门组合") + + # 2. 确认导入 + print("\n" + "=" * 60) + print(f"即将导入 {len(keyword_dept_pairs)} 个关键词-部门组合到 baidu_keyword 表") + print(f"配置: seed_id={args.seed_id}, seed_name='{args.seed_name}', crawled=1") + confirm = input("确认导入? (y/n): ").strip().lower() + + if confirm != 'y': + logger.info("用户取消导入") + return + + # 3. 执行导入 + success_count = import_keywords_to_db( + db_manager=db_manager, + keyword_dept_pairs=keyword_dept_pairs, + seed_id=args.seed_id, + seed_name=args.seed_name, + crawled=1, + batch_size=args.batch_size, + sleep_seconds=args.sleep + ) + + logger.info("=" * 60) + logger.info(f"✓ 导入完成!共成功导入 {success_count} 个关键词") + logger.info("=" * 60) + + # 运行完成后自动删除 query_upload 文件夹中的所有文件(仅在成功时) + try: + upload_folder = 'query_upload' + for filename in os.listdir(upload_folder): + file_path = os.path.join(upload_folder, filename) + if os.path.isfile(file_path): + os.remove(file_path) + logger.info(f'已删除文件: {filename}') + logger.info('已清理 query_upload 文件夹中的所有文件') + except Exception as e: + logger.error(f'清理 query_upload 文件夹时出错: {e}') + + except Exception as e: + logger.error(f"✗ 导入过程出错: {e}", exc_info=True) + logger.info("=" * 60) + logger.info("✗ 导入失败") + logger.info("=" * 60) + + # 导入失败时不删除源文件,以便排查问题 + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..07d7b0f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +pymysql==1.1.0 +pandas==2.0.3 +openpyxl==3.1.2