feat: fixed-interval polling keyword import system
195
README.md
Normal file
@@ -0,0 +1,195 @@
# AI Keyword Import & Statistics Toolkit

## Overview

Manages the import, update, and statistics export of Baidu keyword data in the `ai_article` database.

## Dependencies

```bash
pip install pandas pymysql openpyxl
```
## Files

| File | Purpose |
|------|------|
| `database_config.py` | Database configuration and connection management |
| `import_keywords.py` | Fixed-interval polling import of keywords into the `baidu_keyword` table |
| `start_import_keywords.sh` | Service management script (start/stop/restart/status) |
| `update_keywords_from_excel.py` | Updates existing keyword records from an Excel file |
| `export_author_stats.py` | Exports author publishing statistics to CSV |
## Directory Layout

```
ai_import_quary/
├── database_config.py             # database configuration
├── import_keywords.py             # keyword import script (fixed-interval polling)
├── start_import_keywords.sh       # service management script
├── update_keywords_from_excel.py  # keyword update script
├── export_author_stats.py         # statistics export script
├── query_upload/                  # drop folder for Excel import files
└── exports/                       # output folder for CSV exports
```
## Table Relationships

```
ai_departments (departments)
  │ id → department_id
  ▼
ai_authors (authors)
  │ id → author_id
  ▼
baidu_keyword (keywords)
  │ keyword
  ▼
ai_articles (articles)
```
## Usage

### 1. Import keywords (import_keywords.py)

**Service management** (recommended):
```bash
./start_import_keywords.sh start        # start the service
./start_import_keywords.sh stop         # stop the service
./start_import_keywords.sh restart      # restart the service
./start_import_keywords.sh status       # show status
./start_import_keywords.sh logs         # show logs
./start_import_keywords.sh logs-follow  # follow logs in real time
```

**Run directly**:
```bash
python import_keywords.py
```

**Run mode**: fixed-interval polling (every 60 seconds by default)

**Workflow**:
1. After startup the script continuously monitors the `query_upload/` directory.
2. Any Excel file found there is processed automatically.
3. On success the source file is deleted.

**Excel format requirements**:
- Must contain a `query` column (the keyword; must not be empty)
- Must contain a `科室` column (the department)
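An upload can be sanity-checked against these requirements before dropping it into `query_upload/`; a minimal sketch (`validate_upload` is a hypothetical helper, not part of this repo):

```python
import pandas as pd

REQUIRED_COLUMNS = ('query', '科室')

def validate_upload(df):
    """Return the required columns that are missing from an upload DataFrame."""
    return [c for c in REQUIRED_COLUMNS if c not in df.columns]

# Illustrative frames: one valid upload, one missing the department column.
ok = pd.DataFrame({'query': ['糖尿病饮食'], '科室': ['内分泌科']})
bad = pd.DataFrame({'query': ['高血压']})
```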
**Configuration** (edit at the top of the script):
```python
POLL_INTERVAL = 60               # polling interval (seconds)
UPLOAD_FOLDER = 'query_upload'   # monitored directory
SEED_ID = 9999                   # fixed value
SEED_NAME = '手动提交'            # fixed value ("manual submission")
CRAWLED = 1                      # fixed value
```

**Flow chart**:
```
┌──────────────────────┐
│ start polling loop   │
└──────────┬───────────┘
           ▼
┌──────────────────────┐
│ read Excel           │
│ (query + 科室)       │
└──────────┬───────────┘
           ▼
┌──────────────────────┐
│ look up department   │
│ in ai_departments    │
│ → department_id      │
└──────────┬───────────┘
           ▼
┌──────────────────────┐
│ pick an author from  │
│ that department      │
│ (ai_authors)         │
│ → author_id          │
│ → author_name        │
└──────────┬───────────┘
           ▼
┌──────────────────────┐
│ duplicate check:     │
│ keyword must be new  │
└──────────┬───────────┘
           │
      ┌────┴────┐
      │         │
  duplicate    new
      │         │
      ▼         ▼
  ┌──────┐  ┌───────────────────────────────┐
  │ skip │  │ INSERT INTO baidu_keyword     │
  └──────┘  │   keyword       = query cell  │
            │   crawled       = 1           │
            │   seed_id       = 9999        │
            │   seed_name     = 手动提交    │
            │   department    = 科室 cell   │
            │   department_id = lookup      │
            │   author_id     = random      │
            │   author_name   = random      │
            │   query_status  =             │
            │     manual_review             │
            └───────────────────────────────┘
```
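The duplicate check in the diagram is a per-keyword membership test against `baidu_keyword`; in memory it reduces to the following sketch (`partition_keywords` is illustrative, not part of the scripts):

```python
def partition_keywords(candidates, existing):
    """Split candidates into (to_insert, skipped), also de-duplicating within the batch."""
    to_insert, skipped = [], []
    seen = set(existing)  # keywords already present in baidu_keyword
    for kw in candidates:
        if kw in seen:
            skipped.append(kw)   # mirrors the "skip" branch
        else:
            to_insert.append(kw) # mirrors the INSERT branch
            seen.add(kw)
    return to_insert, skipped
```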
|
||||
### 2. 更新关键词 (update_keywords_from_excel.py)
|
||||
|
||||
```bash
|
||||
python update_keywords_from_excel.py
|
||||
```
|
||||
|
||||
根据Excel更新已存在的关键词记录(department、department_id、author_id、author_name)。
|
||||
|
||||
### 3. 导出统计 (export_author_stats.py)
|
||||
|
||||
```bash
|
||||
python export_author_stats.py
|
||||
python export_author_stats.py --date 2026-01-28
|
||||
```
|
||||
|
||||
**参数**:
|
||||
- `--date`: 目标日期,默认当天
|
||||
- `--output-dir`: 输出目录,默认 `./exports`
|
||||
|
||||
**输出文件**:
|
||||
- `author_review_stats_{date}.csv` - 发文审核统计(所有状态)
|
||||
- `author_published_stats_{date}.csv` - 发文成功统计(published状态)
|
||||
|
||||
## 数据库配置
|
||||
|
||||
默认配置在 `database_config.py`:
|
||||
|
||||
```python
|
||||
DB_CONFIG = {
|
||||
'host': '8.149.233.36',
|
||||
'user': 'ai_article_read',
|
||||
'password': '***',
|
||||
'database': 'ai_article',
|
||||
'charset': 'utf8mb4'
|
||||
}
|
||||
```
|
||||
|
||||
## 导入记录字段说明
|
||||
|
||||
`baidu_keyword` 表插入字段:
|
||||
|
||||
| 字段 | 值 | 说明 |
|
||||
|------|-----|------|
|
||||
| keyword | Excel中query列 | 关键词 |
|
||||
| seed_id | 9999 | 种子ID |
|
||||
| seed_name | 手动提交 | 种子名称 |
|
||||
| crawled | 1 | 已爬取标记 |
|
||||
| parents_id | 0 | 父级ID |
|
||||
| department | Excel中科室列 | 科室名称 |
|
||||
| department_id | 查表获取 | 科室ID |
|
||||
| query_status | manual_review | 查询状态 |
|
||||
| author_id | 查表获取 | 作者ID |
|
||||
| author_name | 查表获取 | 作者名称 |
|
||||
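Put together, a single inserted record looks like the dict below (the fixed values come from the table above; everything else is an illustrative example, not real data):

```python
row = {
    'keyword': '糖尿病饮食',          # from the Excel `query` column (example)
    'seed_id': 9999,                  # fixed
    'seed_name': '手动提交',          # fixed
    'crawled': 1,                     # fixed
    'parents_id': 0,                  # fixed
    'department': '内分泌科',         # from the Excel `科室` column (example)
    'department_id': 12,              # looked up in ai_departments (example)
    'query_status': 'manual_review',  # fixed
    'author_id': 34,                  # random active author (example)
    'author_name': '张医生',          # random active author (example)
}
```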
160
database_config.py
Normal file
@@ -0,0 +1,160 @@
"""
Database configuration module.
Centralizes database connections and SQL helpers.
"""
import pymysql
import logging

logger = logging.getLogger(__name__)

# Database configuration
DB_CONFIG = {
    'host': '8.149.233.36',
    'user': 'ai_article_read',
    'password': '7aK_H2yvokVumr84lLNDt8fDBp6P',
    'database': 'ai_article',
    'charset': 'utf8mb4'
}


class DatabaseManager:
    """Centralized manager for database connections and operations."""

    def __init__(self, config=None):
        """Initialize the manager.

        Args:
            config: database configuration dict; defaults to DB_CONFIG
        """
        self.config = config or DB_CONFIG

    def get_connection(self, autocommit=False):
        """Open a database connection.

        Args:
            autocommit: whether to enable autocommit mode

        Returns:
            a pymysql connection object
        """
        return pymysql.connect(**self.config, autocommit=autocommit)

    def execute_query(self, sql, params=None, fetch_one=False):
        """Run a SELECT statement.

        Args:
            sql: SQL statement
            params: SQL parameters (tuple or list)
            fetch_one: True to return a single row, False to return all rows

        Returns:
            the query result
        """
        conn = None
        cursor = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()

            logger.info(f'[SQL] {sql.strip()} | params: {params}')
            cursor.execute(sql, params or ())

            if fetch_one:
                result = cursor.fetchone()
            else:
                result = cursor.fetchall()

            logger.debug(f'[SQL result] {len(result) if not fetch_one and result else (1 if result else 0)} row(s) returned')
            return result
        except Exception as e:
            logger.error(f'Query failed: {e}', exc_info=True)
            raise
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()

    def execute_update(self, sql, params=None, autocommit=True):
        """Run an INSERT/UPDATE/DELETE statement.

        Args:
            sql: SQL statement
            params: SQL parameters (tuple or list)
            autocommit: whether to commit automatically

        Returns:
            the number of affected rows
        """
        conn = None
        cursor = None
        try:
            conn = self.get_connection(autocommit=autocommit)
            cursor = conn.cursor()

            logger.info(f'[SQL] {sql.strip()} | params: {params}')
            result = cursor.execute(sql, params or ())

            if not autocommit:
                conn.commit()

            logger.info(f'[SQL update] {result} row(s) affected')
            return result
        except Exception as e:
            if not autocommit and conn:
                conn.rollback()
            logger.error(f'Update failed: {e}', exc_info=True)
            raise
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()

    def execute_many(self, sql, params_list, autocommit=True):
        """Run one statement repeatedly over a list of parameter sets.

        Args:
            sql: SQL statement
            params_list: list of parameter sets, one per execution
            autocommit: whether to commit automatically

        Returns:
            the number of successfully executed rows
        """
        conn = None
        cursor = None
        try:
            conn = self.get_connection(autocommit=autocommit)
            cursor = conn.cursor()

            logger.info(f'[SQL batch] {sql.strip()} | batch size: {len(params_list)}')

            success_count = 0
            for params in params_list:
                try:
                    result = cursor.execute(sql, params)
                    if result > 0:
                        success_count += 1
                except Exception as e:
                    logger.debug(f'Batch row skipped: params={params}, error: {e}')

            if not autocommit:
                conn.commit()

            logger.info(f'[SQL batch] {success_count}/{len(params_list)} row(s) succeeded')
            return success_count
        except Exception as e:
            if not autocommit and conn:
                conn.rollback()
            logger.error(f'Batch execution failed: {e}', exc_info=True)
            raise
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()


# Global database manager instance
db_manager = DatabaseManager()
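Hard-coding the production password in source is fragile; one common pattern (an assumption, not something this module does) is to let environment variables override the defaults before handing the dict to `DatabaseManager`. The `AI_DB_*` variable names below are hypothetical:

```python
import os

def build_db_config():
    # Mirrors database_config.DB_CONFIG, but environment variables take precedence.
    return {
        'host': os.environ.get('AI_DB_HOST', '8.149.233.36'),
        'user': os.environ.get('AI_DB_USER', 'ai_article_read'),
        'password': os.environ.get('AI_DB_PASSWORD', ''),
        'database': os.environ.get('AI_DB_NAME', 'ai_article'),
        'charset': 'utf8mb4',
    }
```

The resulting dict can be passed straight to `DatabaseManager(build_db_config())`.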
191
export_author_stats.py
Normal file
@@ -0,0 +1,191 @@
"""
Export author publishing statistics to CSV.
"""
import os
import pandas as pd
import logging
import argparse
from database_config import DatabaseManager
from datetime import date

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def export_author_review_stats(db_manager, target_date, output_file):
    """
    Export per-author review statistics (all review statuses).

    Args:
        db_manager: DatabaseManager instance
        target_date: target date, e.g. '2026-01-21'
        output_file: output CSV path

    Returns:
        the number of exported rows
    """
    sql = """
    SELECT
        a.`author_id`,
        COALESCE(a.`author_name`, auth.`author_name`) as `author_name`,
        COUNT(*) as `today_published_count`
    FROM `ai_articles` a
    LEFT JOIN `ai_authors` auth ON a.`author_id` = auth.`id`
    WHERE
        a.`channel` = 4
        AND a.`status` IN ('published', 'published_review', 'pending_review', 'failed')
        AND DATE(a.`updated_at`) = %s
    GROUP BY a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`)
    HAVING COUNT(*) > 0
    ORDER BY `today_published_count` DESC
    """

    try:
        logger.info(f"Querying review statistics (date: {target_date})...")
        result = db_manager.execute_query(sql, (target_date,))

        if not result:
            logger.warning("No data found")
            return 0

        # Convert to DataFrame
        df = pd.DataFrame(result, columns=['author_id', 'author_name', 'today_published_count'])

        # Export to CSV
        df.to_csv(output_file, index=False, encoding='utf-8-sig')
        logger.info(f"Exported {len(df)} rows to: {output_file}")

        # Preview the first 5 rows
        logger.info(f"\nPreview (first 5 rows):\n{df.head()}")

        return len(df)

    except Exception as e:
        logger.error(f"Failed to export review statistics: {e}", exc_info=True)
        raise


def export_author_published_stats(db_manager, target_date, output_file):
    """
    Export per-author success statistics (published status only).

    Args:
        db_manager: DatabaseManager instance
        target_date: target date, e.g. '2026-01-21'
        output_file: output CSV path

    Returns:
        the number of exported rows
    """
    sql = """
    SELECT
        a.`author_id`,
        COALESCE(a.`author_name`, auth.`author_name`) as `author_name`,
        COUNT(*) as `today_published_count`,
        COALESCE(auth.`daily_post_max`, 0) as `daily_post_max`,
        (COALESCE(auth.`daily_post_max`, 0) - COUNT(*)) as `gap`
    FROM `ai_articles` a
    LEFT JOIN `ai_authors` auth ON a.`author_id` = auth.`id`
    WHERE
        a.`channel` = 4
        AND a.`status` = 'published'
        AND DATE(a.`updated_at`) = %s
    GROUP BY a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`), auth.`daily_post_max`
    HAVING COUNT(*) > 0
    ORDER BY `gap` ASC
    """

    try:
        logger.info(f"Querying success statistics (date: {target_date})...")
        result = db_manager.execute_query(sql, (target_date,))

        if not result:
            logger.warning("No data found")
            return 0

        # Convert to DataFrame
        df = pd.DataFrame(result, columns=['author_id', 'author_name', 'today_published_count', 'daily_post_max', 'gap'])

        # Export to CSV
        df.to_csv(output_file, index=False, encoding='utf-8-sig')
        logger.info(f"Exported {len(df)} rows to: {output_file}")

        # Preview the first 5 rows
        logger.info(f"\nPreview (first 5 rows):\n{df.head()}")

        return len(df)

    except Exception as e:
        logger.error(f"Failed to export success statistics: {e}", exc_info=True)
        raise


def main():
    """Entry point."""
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='Export author publishing statistics to CSV')
    parser.add_argument('--host', default='8.149.233.36', help='database host')
    parser.add_argument('--port', type=int, default=3306, help='database port')
    parser.add_argument('--user', default='ai_article_read', help='database user')
    parser.add_argument('--password', default='7aK_H2yvokVumr84lLNDt8fDBp6P', help='database password')
    parser.add_argument('--database', default='ai_article', help='database name')
    parser.add_argument('--date', default=date.today().strftime('%Y-%m-%d'), help='target date, format: YYYY-MM-DD')
    parser.add_argument('--output-dir', default='./exports', help='output directory')

    args = parser.parse_args()

    # Ensure the output directory exists
    os.makedirs(args.output_dir, exist_ok=True)

    # Build the connection config
    db_config = {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': args.password,
        'database': args.database,
        'charset': 'utf8mb4'
    }

    logger.info("=" * 60)
    logger.info("Exporting author publishing statistics")
    logger.info(f"Database: {args.user}@{args.host}:{args.port}/{args.database}")
    logger.info(f"Target date: {args.date}")
    logger.info(f"Output directory: {args.output_dir}")
    logger.info("=" * 60)

    # Create the database manager
    db_manager = DatabaseManager(db_config)

    try:
        # Output file names
        review_file = f"{args.output_dir}/author_review_stats_{args.date}.csv"
        published_file = f"{args.output_dir}/author_published_stats_{args.date}.csv"

        # 1. Review statistics
        logger.info("\n[Task 1] Exporting review statistics (all statuses)")
        review_count = export_author_review_stats(db_manager, args.date, review_file)

        # 2. Success statistics
        logger.info("\n[Task 2] Exporting success statistics (published status)")
        published_count = export_author_published_stats(db_manager, args.date, published_file)

        logger.info("\n" + "=" * 60)
        logger.info("✓ Export finished!")
        logger.info(f"Review statistics: {review_count} rows -> {review_file}")
        logger.info(f"Success statistics: {published_count} rows -> {published_file}")
        logger.info("=" * 60)

    except Exception as e:
        logger.error(f"✗ Export error: {e}", exc_info=True)
        logger.info("=" * 60)
        logger.info("✗ Export failed")
        logger.info("=" * 60)


if __name__ == '__main__':
    main()
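The aggregation in `export_author_published_stats` (count per author, join `daily_post_max`, compute the gap, sort ascending) can be mirrored in pandas as an offline sanity check; the sample frames below are made up:

```python
import pandas as pd

# Illustrative stand-ins for ai_articles and ai_authors rows.
articles = pd.DataFrame({
    'author_id':   [1, 1, 2],
    'author_name': ['Alice', 'Alice', 'Bob'],
    'status':      ['published', 'published', 'published'],
})
authors = pd.DataFrame({'id': [1, 2], 'daily_post_max': [5, 1]})

# COUNT(*) ... GROUP BY author, joined with daily_post_max, gap = max - count.
stats = (articles[articles['status'] == 'published']
         .groupby(['author_id', 'author_name'], as_index=False).size()
         .rename(columns={'size': 'today_published_count'})
         .merge(authors, left_on='author_id', right_on='id')
         .drop(columns='id'))
stats['gap'] = stats['daily_post_max'] - stats['today_published_count']
stats = stats.sort_values('gap').reset_index(drop=True)  # ORDER BY gap ASC
```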
363
import_keywords.py
Normal file
@@ -0,0 +1,363 @@
"""
Fixed-interval polling script: import keywords from Excel files into the baidu_keyword table.
"""
import pandas as pd
import logging
import os
import time
import glob
from database_config import DatabaseManager, DB_CONFIG

# Configuration
POLL_INTERVAL = 60              # polling interval (seconds)
UPLOAD_FOLDER = 'query_upload'  # directory holding Excel files
SEED_ID = 9999                  # fixed value
SEED_NAME = '手动提交'           # fixed value ("manual submission")
CRAWLED = 1                     # fixed value
QUERY_COLUMN = 'query'          # query column name in Excel
DEPT_COLUMN = '科室'             # department column name in Excel

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def read_excel_keywords_with_department(excel_path, query_column='query', department_column='科室'):
    """
    Read keywords and department information from an Excel file.

    Args:
        excel_path: path to the Excel file
        query_column: query column name, default 'query'
        department_column: department column name, default '科室'; None disables department lookup

    Returns:
        a list of (keyword, department) tuples; department is None when no department column exists
    """
    try:
        # Read the Excel file
        df = pd.read_excel(excel_path)
        logger.info(f"Read Excel file: {excel_path}")
        logger.info(f"File contains {len(df)} rows")
        logger.info(f"Columns: {df.columns.tolist()}")

        # Check that the query column exists
        if query_column not in df.columns:
            logger.error(f"Query column not found: {query_column}")
            return []

        # Check for the department column
        has_department = department_column and department_column in df.columns
        if department_column and not has_department:
            logger.warning(f"Department column not found: {department_column}; importing without departments")

        keyword_dept_pairs = []
        if has_department:
            # Drop rows where either value is missing, keeping keyword and
            # department aligned row by row (dropping each column separately
            # would silently mispair them when blanks differ between columns).
            pairs = df[[query_column, department_column]].dropna()
            for keyword, department in pairs.itertuples(index=False):
                keyword = str(keyword).strip()
                department = str(department).strip()
                if keyword and department:  # both must be non-empty
                    keyword_dept_pairs.append((keyword, department))
        else:
            # No department column: import keywords only
            logger.info("No department column; importing keywords without a department")
            for keyword in df[query_column].dropna().tolist():
                keyword = str(keyword).strip()
                if keyword:  # keyword must be non-empty
                    keyword_dept_pairs.append((keyword, None))

        # Remove duplicates, keeping the first occurrence of each pair
        seen = set()
        unique_keyword_dept_pairs = []
        for keyword, dept in keyword_dept_pairs:
            if (keyword, dept) not in seen:
                seen.add((keyword, dept))
                unique_keyword_dept_pairs.append((keyword, dept))

        if has_department:
            logger.info(f"Extracted {len(unique_keyword_dept_pairs)} unique keyword-department pairs")
        else:
            logger.info(f"Extracted {len(unique_keyword_dept_pairs)} unique keywords")

        return unique_keyword_dept_pairs

    except Exception as e:
        logger.error(f"Failed to read Excel file: {e}", exc_info=True)
        raise


def get_department_id(db_manager, department_name):
    """
    Look up a department ID in ai_departments by name.

    Args:
        db_manager: DatabaseManager instance
        department_name: department name

    Returns:
        the department ID; raises if not found
    """
    try:
        # Query the department ID
        sql = "SELECT id FROM ai_departments WHERE department_name = %s"
        result = db_manager.execute_query(sql, (department_name,), fetch_one=True)

        if result:
            return result[0]
        else:
            error_msg = f"Department '{department_name}' not found; add it to ai_departments first"
            logger.error(error_msg)
            raise ValueError(error_msg)

    except Exception as e:
        logger.error(f"Department lookup failed: {e}", exc_info=True)
        raise


def get_author_info_by_department(db_manager, department_id):
    """
    Pick a random qualifying author for a department from ai_authors.

    Args:
        db_manager: DatabaseManager instance
        department_id: department ID

    Returns:
        an (author_id, author_name) tuple, or (0, '') if none found
    """
    try:
        # Randomly pick an active author in this department
        sql = "SELECT id, author_name FROM ai_authors WHERE department_id = %s AND status = 'active' AND daily_post_max > 0 ORDER BY RAND() LIMIT 1"
        result = db_manager.execute_query(sql, (department_id,), fetch_one=True)

        if result:
            return result[0], result[1]
        else:
            logger.warning(f"No qualifying active author found for department ID {department_id}")
            return 0, ''

    except Exception as e:
        logger.error(f"Author lookup failed: {e}")
        return 0, ''


def import_keywords_to_db(db_manager, keyword_dept_pairs, seed_id=9999, seed_name='手动提交', crawled=1, batch_size=100, sleep_seconds=0.1):
    """
    Import keywords into the baidu_keyword table.

    Args:
        db_manager: DatabaseManager instance
        keyword_dept_pairs: list of (keyword, department) tuples
        seed_id: seed ID, default 9999
        seed_name: seed name, default '手动提交'
        crawled: crawled flag, default 1
        batch_size: progress-log interval in records
        sleep_seconds: pause between records (seconds), default 0.1

    Returns:
        the number of rows inserted
    """
    if not keyword_dept_pairs:
        logger.warning("No keywords to import")
        return 0

    try:
        logger.info(f"Importing {len(keyword_dept_pairs)} keyword-department pairs...")
        logger.info("Using check-then-insert per record to avoid duplicates")

        # Prepared SQL
        check_sql = "SELECT COUNT(*) FROM baidu_keyword WHERE keyword = %s"
        insert_sql = """
        INSERT INTO baidu_keyword (keyword, seed_id, seed_name, crawled, parents_id, created_at, department, department_id, query_status, author_id, author_name)
        VALUES (%s, %s, %s, %s, 0, NOW(), %s, %s, %s, %s, %s)
        """

        success_count = 0
        skip_count = 0
        failed_count = 0

        # Process records one by one
        for idx, (keyword, department) in enumerate(keyword_dept_pairs, 1):
            try:
                if department:
                    logger.debug(f'[debug] record {idx}/{len(keyword_dept_pairs)}: {keyword}, department: {department}')
                else:
                    logger.debug(f'[debug] record {idx}/{len(keyword_dept_pairs)}: {keyword}, no department')

                # 1. Resolve department ID and author info when a department is given
                if department:
                    dept_id = get_department_id(db_manager, department)
                    author_id, author_name = get_author_info_by_department(db_manager, dept_id)
                else:
                    # No department: use defaults (empty strings rather than None,
                    # to satisfy NOT NULL constraints)
                    department = ''
                    dept_id = 0
                    author_id = 0
                    author_name = ''

                # 2. Check whether the keyword already exists
                result = db_manager.execute_query(check_sql, (keyword,), fetch_one=True)
                exists = result[0] > 0 if result else False

                if exists:
                    skip_count += 1
                    logger.debug(f'[debug] keyword exists, skipping: {keyword}')
                else:
                    # 3. Insert when it does not exist
                    if department:
                        logger.debug(f'[debug] inserting: {keyword}, department: {department}, department ID: {dept_id}, author ID: {author_id}, author name: {author_name}, query_status: manual_review')
                    else:
                        logger.debug(f'[debug] inserting: {keyword}, no department, query_status: manual_review')

                    affected = db_manager.execute_update(
                        insert_sql,
                        (keyword, seed_id, seed_name, crawled, department, dept_id, 'manual_review', author_id, author_name),
                        autocommit=True
                    )

                    if affected > 0:
                        success_count += 1
                        if department:
                            logger.debug(f'[debug] inserted: {keyword}, department: {department}, department ID: {dept_id}, author ID: {author_id}, author name: {author_name}, query_status: manual_review')
                        else:
                            logger.debug(f'[debug] inserted: {keyword}, no department, query_status: manual_review')

                # 4. Log progress
                if idx % batch_size == 0 or idx == len(keyword_dept_pairs):
                    progress = (idx / len(keyword_dept_pairs)) * 100
                    logger.info(f'[progress] {idx}/{len(keyword_dept_pairs)} ({progress:.1f}%) | inserted: {success_count} | skipped: {skip_count} | failed: {failed_count}')

                # 5. Sleep between records
                time.sleep(sleep_seconds)

            except ValueError as ve:
                # An unknown department aborts the whole import
                logger.error(f'[error] record {idx} failed: {ve}')
                raise ve
            except Exception as e:
                failed_count += 1
                logger.warning(f'[debug] record failed [{idx}/{len(keyword_dept_pairs)}]: keyword={keyword}, department={department}, error: {e}')

        logger.info(f"Import finished! inserted: {success_count} | skipped (existing): {skip_count} | failed: {failed_count}")
        return success_count

    except Exception as e:
        logger.error(f"Keyword import failed: {e}", exc_info=True)
        raise


def process_single_file(db_manager, excel_path):
    """
    Process a single Excel file.

    Args:
        db_manager: DatabaseManager instance
        excel_path: path to the Excel file

    Returns:
        the number of rows imported
    """
    logger.info(f"Processing file: {os.path.basename(excel_path)}")

    # Read the Excel file
    keyword_dept_pairs = read_excel_keywords_with_department(excel_path, QUERY_COLUMN, DEPT_COLUMN)

    if not keyword_dept_pairs:
        logger.warning(f"File {os.path.basename(excel_path)} contains no importable data")
        return 0

    # Run the import
    success_count = import_keywords_to_db(
        db_manager=db_manager,
        keyword_dept_pairs=keyword_dept_pairs,
        seed_id=SEED_ID,
        seed_name=SEED_NAME,
        crawled=CRAWLED
    )

    return success_count


def poll_once(db_manager):
    """
    Run one polling pass.

    Returns:
        the number of files processed
    """
    # Make sure the directory exists
    if not os.path.exists(UPLOAD_FOLDER):
        os.makedirs(UPLOAD_FOLDER)
        logger.info(f"Created directory: {UPLOAD_FOLDER}")
        return 0

    # Look for Excel files
    excel_files = []
    for pattern in ['*.xlsx', '*.xls']:
        excel_files.extend(glob.glob(os.path.join(UPLOAD_FOLDER, pattern)))

    if not excel_files:
        return 0

    logger.info(f"Found {len(excel_files)} Excel file(s) to process")

    processed_count = 0
    for excel_path in excel_files:
        try:
            success_count = process_single_file(db_manager, excel_path)

            # Delete the file after successful processing
            os.remove(excel_path)
            logger.info(f"✓ File processed and deleted: {os.path.basename(excel_path)} ({success_count} rows imported)")
            processed_count += 1

        except Exception as e:
            logger.error(f"✗ Failed to process file: {os.path.basename(excel_path)}, error: {e}")
            # Keep the file on failure for troubleshooting

    return processed_count


def main():
    """Entry point: fixed-interval polling mode."""
    logger.info("=" * 60)
    logger.info("Fixed-interval polling script started")
    logger.info(f"Polling interval: {POLL_INTERVAL} seconds")
    logger.info(f"Monitored directory: {UPLOAD_FOLDER}")
    logger.info(f"Fixed values: seed_id={SEED_ID}, seed_name='{SEED_NAME}', crawled={CRAWLED}")
    logger.info("=" * 60)

    # Create the database manager
    db_manager = DatabaseManager(DB_CONFIG)

    # Polling loop
    while True:
        try:
            processed = poll_once(db_manager)
            if processed > 0:
                logger.info(f"Polling pass finished, {processed} file(s) processed")
        except Exception as e:
            logger.error(f"Polling error: {e}")

        # Wait for the next pass
        time.sleep(POLL_INTERVAL)


if __name__ == '__main__':
    main()
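The file-discovery step in `poll_once` can be exercised in isolation; `find_excel_files` below is a hypothetical extraction of that logic, not a function in the script:

```python
import glob
import os
import tempfile

def find_excel_files(folder):
    # Mirrors poll_once: collect .xlsx and .xls files from the upload folder.
    files = []
    for pattern in ('*.xlsx', '*.xls'):
        files.extend(glob.glob(os.path.join(folder, pattern)))
    return sorted(files)

# Exercise it against a throwaway directory.
with tempfile.TemporaryDirectory() as d:
    for name in ('a.xlsx', 'b.xls', 'notes.txt'):
        open(os.path.join(d, name), 'w').close()
    found = [os.path.basename(p) for p in find_excel_files(d)]  # notes.txt is ignored
```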
394
start_import_keywords.sh
Normal file
@@ -0,0 +1,394 @@
|
||||
#!/bin/bash
|
||||
|
||||
# ============================================
|
||||
# 关键词导入系统管理脚本
|
||||
# 支持进程数量控制
|
||||
# ============================================
|
||||
|
||||
# 配置区
|
||||
BASE_DIR="/home/work/ai_import_quary"
|
||||
VENV_PYTHON="/home/work/keyword_crawl/venv/bin/python"
|
||||
|
||||
# import_keywords 配置
|
||||
IMPORT_SCRIPT="${BASE_DIR}/import_keywords.py"
|
||||
IMPORT_PID_FILE="${BASE_DIR}/import_keywords.pid"
|
||||
IMPORT_LOG_FILE="${BASE_DIR}/import_keywords.log"
|
||||
IMPORT_MAX_PROCESSES=1 # 限制最多1个进程
|
||||
|
||||
# 颜色定义
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
YELLOW='\033[1;33m'
|
||||
BLUE='\033[0;34m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
# 获取脚本正在运行的进程数量
|
||||
get_process_count() {
|
||||
local script_name=$(basename "$1")
|
||||
pgrep -f "$script_name" 2>/dev/null | wc -l
|
||||
}
|
||||
|
||||
# 获取所有相关进程的PID
|
||||
get_all_pids() {
|
||||
local script_name=$(basename "$1")
|
||||
pgrep -f "$script_name" 2>/dev/null | tr '\n' ' '
|
||||
}
|
||||
|
||||
# 启动服务(带进程数量控制)
|
||||
start_single() {
|
||||
local script=$1
|
||||
local pid_file=$2
|
||||
local log_file=$3
|
||||
local name=$4
|
||||
local max_processes=$5
|
||||
|
||||
local script_name=$(basename "$script")
|
||||
local current_count=$(get_process_count "$script")
|
||||
|
||||
# 检查是否超过最大进程数
|
||||
if [ $current_count -ge $max_processes ]; then
|
||||
echo -e "${YELLOW}${name} 已达到最大进程数 (${current_count}/${max_processes}),跳过启动${NC}"
|
||||
local first_pid=$(pgrep -f "$script_name" | head -n1)
|
||||
if [ -n "$first_pid" ]; then
|
||||
echo "$first_pid" > "$pid_file"
|
||||
fi
|
||||
return 0
|
||||
fi
|
||||
|
||||
# 检查PID文件记录的进程
|
||||
if [ -f "$pid_file" ]; then
|
||||
local pid=$(cat "$pid_file" 2>/dev/null)
|
||||
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
|
||||
echo -e "${YELLOW}${name} 已在运行(PID文件记录),PID: ${pid}${NC}"
|
||||
return 0
|
||||
fi
|
||||
fi
|
||||
|
||||
echo -e "${BLUE}正在启动 ${name}(守护模式)...${NC}"
|
||||
|
||||
# 确保日志目录存在
|
||||
mkdir -p "$(dirname "$log_file")"
|
||||
|
||||
# 备份旧日志
|
||||
if [ -f "$log_file" ]; then
|
||||
local backup_log="${log_file}.$(date +%Y%m%d_%H%M%S).bak"
|
||||
cp "$log_file" "$backup_log"
|
||||
echo -e "${BLUE}旧日志已备份到: ${backup_log}${NC}"
|
||||
fi
|
||||
|
||||
# 启动守护进程
|
||||
nohup "$VENV_PYTHON" "$script" >> "$log_file" 2>&1 &
|
||||
local new_pid=$!
|
||||
|
||||
# 等待进程真正启动
|
||||
sleep 2
|
||||
|
||||
# 验证进程是否启动成功
|
||||
if kill -0 "$new_pid" 2>/dev/null; then
|
||||
echo "$new_pid" > "$pid_file"
|
||||
echo -e "${GREEN}${name} 已启动,PID: ${new_pid}${NC}"
|
||||
echo -e "${BLUE}日志文件: ${log_file}${NC}"
|
||||
return 0
|
||||
else
|
||||
echo -e "${RED}${name} 启动失败,请检查日志${NC}"
|
||||
tail -20 "$log_file"
|
||||
rm -f "$pid_file"
|
||||
return 1
|
||||
fi
|
||||
}
|
||||
|
||||
# 停止服务的所有实例
|
||||
stop_single_all() {
|
||||
local script=$1
|
||||
local name=$2
|
||||
local pid_file=$3
|
||||
|
||||
local script_name=$(basename "$script")
|
||||
local pids=$(get_all_pids "$script")
|
||||
local count=$(get_process_count "$script")
|
||||
|
||||
if [ $count -eq 0 ]; then
|
||||
echo -e "${YELLOW}${name} 没有运行中的进程${NC}"
|
||||
rm -f "$pid_file"
|
||||
return 0
|
||||
fi
|
||||
|
||||
echo -e "${BLUE}正在停止 ${name} (${count}个进程)...${NC}"
|
||||
echo -e "进程PIDs: ${pids}"
|
||||
|
||||
# 首先尝试优雅终止
|
||||
for pid in $pids; do
|
||||
if kill -0 "$pid" 2>/dev/null; then
|
||||
echo -e " 发送SIGTERM到 PID $pid..."
|
||||
kill "$pid"
|
||||
fi
|
||||
done
|
||||
|
||||
# 等待优雅退出
|
||||
local wait_time=10
|
||||
local remaining=$count
|
||||
for i in $(seq 1 $wait_time); do
|
||||
remaining=$(get_process_count "$script")
|
||||
if [ $remaining -eq 0 ]; then
|
||||
break
|
||||
fi
|
||||
echo -n "."
|
||||
sleep 1
|
||||
done
|
||||
|
||||
echo ""
|
||||
|
||||
# 检查是否还有进程残留
|
||||
remaining=$(get_process_count "$script")
|
||||
if [ $remaining -gt 0 ]; then
|
||||
echo -e "${YELLOW}还有 ${remaining} 个进程未退出,强制终止...${NC}"
|
||||
pids=$(get_all_pids "$script")
|
||||
for pid in $pids; do
|
||||
if kill -0 "$pid" 2>/dev/null; then
|
||||
kill -9 "$pid" 2>/dev/null
|
||||
fi
|
||||
done
|
||||
sleep 2
|
||||
fi
|
||||
|
||||
# 验证所有进程都已停止
|
||||
remaining=$(get_process_count "$script")
|
||||
if [ $remaining -eq 0 ]; then
|
||||
echo -e "${GREEN}${name} 所有进程已停止${NC}"
|
||||
rm -f "$pid_file"
|
||||
return 0
|
||||
else
|
||||
echo -e "${RED}警告:仍有 ${remaining} 个进程无法终止${NC}"
|
||||
return 1
|
||||
fi
|
||||
}
|
||||
|
||||
# 启动服务
|
||||
start() {
|
||||
echo -e "${BLUE}========== 启动关键词导入系统 ==========${NC}"
|
||||
echo -e "${YELLOW}进程限制:最多启动1个实例${NC}"
|
||||
echo ""
|
||||
|
||||
start_single "$IMPORT_SCRIPT" "$IMPORT_PID_FILE" "$IMPORT_LOG_FILE" "import_keywords" "$IMPORT_MAX_PROCESSES"
|
||||
|
||||
echo -e "${BLUE}========================================${NC}"
|
||||
}
|
||||
|
||||
# 停止服务
|
||||
stop() {
|
||||
echo -e "${BLUE}========== 停止关键词导入系统 ==========${NC}"
|
||||
|
||||
stop_single_all "$IMPORT_SCRIPT" "import_keywords" "$IMPORT_PID_FILE"
|
||||
|
||||
echo -e "${BLUE}========================================${NC}"
|
||||
}
|

# Force stop
force-stop() {
    echo -e "${RED}========== Force stopping keyword import processes ==========${NC}"

    # Stop the daemon process
    if [ -f "$IMPORT_PID_FILE" ]; then
        local pid=$(cat "$IMPORT_PID_FILE" 2>/dev/null)
        if [ -n "$pid" ]; then
            kill -9 "$pid" 2>/dev/null
        fi
    fi

    # Kill every related process
    pkill -9 -f "import_keywords.py" 2>/dev/null

    sleep 2

    rm -f "$IMPORT_PID_FILE"

    # Match the .py worker only; a bare "import_keywords" pattern would also
    # match this management script's own command line
    local remaining=$(pgrep -f "import_keywords.py" | wc -l)
    if [ "$remaining" -eq 0 ]; then
        echo -e "${GREEN}✅ All processes force stopped${NC}"
    else
        echo -e "${RED}❌ ${remaining} process(es) still alive${NC}"
        pgrep -f "import_keywords.py" | xargs ps -fp 2>/dev/null
    fi

    echo -e "${RED}=============================================================${NC}"
}

# Restart the service
restart() {
    echo -e "${BLUE}========== Restarting keyword import system ==========${NC}"

    if stop; then
        sleep 3
        start
    else
        echo -e "${RED}Failed to stop the service, try force-restart${NC}"
        return 1
    fi

    echo -e "${BLUE}======================================================${NC}"
}

# Force restart
force-restart() {
    echo -e "${YELLOW}========== Force restarting keyword import system ==========${NC}"

    force-stop
    sleep 3
    start

    echo -e "${YELLOW}============================================================${NC}"
}

# Show status
status() {
    echo -e "${BLUE}========== Keyword import system status ==========${NC}"
    echo -e "${BLUE}System time: $(date)${NC}"
    echo -e "${BLUE}Working directory: ${BASE_DIR}${NC}"
    echo ""

    local count=$(get_process_count "$IMPORT_SCRIPT")
    echo -e "${YELLOW}📊 Process status:${NC}"
    echo -e "  Process count: ${count}"

    if [ "$count" -gt 0 ]; then
        local pids=$(get_all_pids "$IMPORT_SCRIPT")
        echo -e "  Process PIDs: ${pids}"

        # Show CPU and memory usage
        for pid in $pids; do
            local cpu=$(ps -p "$pid" -o %cpu --no-headers 2>/dev/null | tr -d ' ')
            local mem=$(ps -p "$pid" -o %mem --no-headers 2>/dev/null | tr -d ' ')
            local runtime=$(ps -p "$pid" -o etime --no-headers 2>/dev/null | tr -d ' ')
            echo -e "  PID ${pid}: CPU ${cpu}%, memory ${mem}%, uptime ${runtime}"
        done
    else
        echo -e "${YELLOW}No running processes${NC}"
    fi

    echo ""
    echo -e "${YELLOW}📁 PID file status:${NC}"
    if [ -f "$IMPORT_PID_FILE" ]; then
        local pid=$(cat "$IMPORT_PID_FILE" 2>/dev/null)
        if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
            echo -e "  ${GREEN}✓ import_keywords.pid: valid (PID: $pid)${NC}"
        else
            echo -e "  ${RED}✗ import_keywords.pid: stale or process missing${NC}"
        fi
    else
        echo -e "  ${YELLOW}○ import_keywords.pid: not found${NC}"
    fi

    echo ""
    echo -e "${YELLOW}📝 Recent log:${NC}"
    if [ -f "$IMPORT_LOG_FILE" ]; then
        echo -e "${BLUE}--- import_keywords.log (last 10 lines) ---${NC}"
        tail -n 10 "$IMPORT_LOG_FILE" 2>/dev/null
    else
        echo -e "${YELLOW}Log file not found${NC}"
    fi

    echo -e "${BLUE}==================================================${NC}"
}

# View logs
logs() {
    local lines=${1:-50}
    echo -e "${BLUE}========== Log tail (last ${lines} lines) ==========${NC}"

    if [ -f "$IMPORT_LOG_FILE" ]; then
        tail -n "$lines" "$IMPORT_LOG_FILE"
    else
        echo -e "${YELLOW}Log file not found${NC}"
    fi

    echo -e "${BLUE}====================================================${NC}"
}

# Follow logs in real time
logs-follow() {
    echo -e "${BLUE}========== Following log (Ctrl+C to exit) ==========${NC}"
    tail -f "$IMPORT_LOG_FILE"
}

# Show help
show_help() {
    echo -e "${GREEN}Keyword import system management script${NC}"
    echo ""
    echo -e "${YELLOW}Current configuration:${NC}"
    echo -e "  Working directory: ${BASE_DIR}"
    echo -e "  Max processes: ${IMPORT_MAX_PROCESSES}"
    echo -e "  Watched directory: query_upload/"
    echo -e "  Polling interval: 60 seconds"
    echo ""
    echo -e "${BLUE}Usage: $0 {command}${NC}"
    echo ""
    echo -e "${GREEN}Service management:${NC}"
    echo -e "  ${YELLOW}start${NC}          Start the service (fixed-interval polling mode)"
    echo -e "  ${YELLOW}stop${NC}           Stop the service"
    echo -e "  ${YELLOW}force-stop${NC}     Force stop all processes"
    echo -e "  ${YELLOW}restart${NC}        Restart the service"
    echo -e "  ${YELLOW}force-restart${NC}  Force restart"
    echo ""
    echo -e "${GREEN}Status:${NC}"
    echo -e "  ${YELLOW}status${NC}         Show process status"
    echo -e "  ${YELLOW}logs [N]${NC}       Show the last N log lines (default 50)"
    echo -e "  ${YELLOW}logs-follow${NC}    Follow the log in real time"
    echo ""
    echo -e "${GREEN}Other:${NC}"
    echo -e "  ${YELLOW}help${NC}           Show this help"
    echo ""
    echo -e "${GREEN}Workflow:${NC}"
    echo -e "  1. Poll the query_upload/ directory at a fixed interval"
    echo -e "  2. Process any Excel file found"
    echo -e "  3. Read the query and department columns"
    echo -e "  4. Look up the department ID (ai_departments)"
    echo -e "  5. Randomly pick author info (ai_authors)"
    echo -e "  6. Deduplicate, then insert into the baidu_keyword table"
    echo -e "  7. Delete the source file when processing finishes"
    echo ""
    echo -e "${GREEN}Examples:${NC}"
    echo -e "  $0 start        # Start the fixed-interval polling service"
    echo -e "  $0 status       # Show run status"
    echo -e "  $0 logs-follow  # Follow the log in real time"
    echo -e "  $0 restart      # Restart the service"
}

# Main dispatch
case "$1" in
    start)
        start
        ;;
    stop)
        stop
        ;;
    force-stop)
        force-stop
        ;;
    restart)
        restart
        ;;
    force-restart)
        force-restart
        ;;
    status)
        status
        ;;
    logs)
        logs "$2"
        ;;
    logs-follow)
        logs-follow
        ;;
    help|--help|-h)
        show_help
        ;;
    *)
        echo -e "${RED}Error: unknown command '$1'${NC}"
        echo ""
        show_help
        exit 1
        ;;
esac

exit 0
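The script managed above, `import_keywords.py`, is not part of this diff; based on the help text, its core is a fixed-interval polling loop over `query_upload/` that processes each Excel file and deletes it on success. A minimal sketch under those assumptions (the 60-second interval and directory name come from `show_help`; the `process_excel` hook and function names here are hypothetical):

```python
import glob
import os
import time

POLL_INTERVAL = 60          # seconds, matching the interval shown in show_help()
WATCH_DIR = "query_upload"  # directory watched by the service


def find_excel_files(watch_dir):
    """Return the Excel files currently sitting in the watch directory."""
    return sorted(glob.glob(os.path.join(watch_dir, "*.xlsx")))


def poll_once(process_excel, watch_dir=WATCH_DIR):
    """One polling pass: process each Excel file found, deleting it on success."""
    handled = 0
    for path in find_excel_files(watch_dir):
        try:
            process_excel(path)  # hypothetical import hook
            os.remove(path)      # delete the source file after success
            handled += 1
        except Exception as exc:
            print(f"failed to process {path}: {exc}")
    return handled


def poll_forever(process_excel, watch_dir=WATCH_DIR, interval=POLL_INTERVAL):
    """Fixed-interval polling loop, as described in the help text."""
    while True:
        poll_once(process_excel, watch_dir)
        time.sleep(interval)
```

A failed file is left in place here, so it is retried on the next pass; the real script may handle failures differently.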
update_keywords_from_excel.py (new file, 320 lines)
"""
|
||||
根据Excel文件更新baidu_keyword表中符合条件的记录
|
||||
"""
|
||||
import pandas as pd
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from database_config import DatabaseManager
|
||||
from datetime import datetime
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|


def read_excel_keywords_with_department(excel_path, query_column='query', department_column='科室'):
    """
    Read keywords and department info from an Excel file.

    Args:
        excel_path: path to the Excel file
        query_column: name of the query column, defaults to 'query'
        department_column: name of the department column, defaults to '科室'

    Returns:
        A list of (keyword, department) tuples.
    """
    try:
        # Read the Excel file
        df = pd.read_excel(excel_path)
        logger.info(f"Read Excel file: {excel_path}")
        logger.info(f"The file contains {len(df)} rows")
        logger.info(f"Columns: {df.columns.tolist()}")

        # Make sure both the query and department columns exist
        if query_column not in df.columns:
            logger.error(f"Query column not found: {query_column}")
            return []

        if department_column not in df.columns:
            logger.error(f"Department column not found: {department_column}")
            return []

        # Drop rows where either value is missing, so keyword/department stay
        # row-aligned (dropping NaNs from each column separately and then
        # truncating to the shorter length could pair a keyword with the
        # wrong department)
        pairs_df = df[[query_column, department_column]].dropna()

        # Combine keyword and department info
        keyword_dept_pairs = []
        for keyword, department in pairs_df.itertuples(index=False):
            keyword = str(keyword).strip()
            department = str(department).strip()
            if keyword and department:  # keep only rows where both are non-empty
                keyword_dept_pairs.append((keyword, department))

        # Deduplicate, keeping the first occurrence of each pair
        seen = set()
        unique_keyword_dept_pairs = []
        for keyword, dept in keyword_dept_pairs:
            if (keyword, dept) not in seen:
                seen.add((keyword, dept))
                unique_keyword_dept_pairs.append((keyword, dept))

        logger.info(f"Extracted {len(unique_keyword_dept_pairs)} unique keyword-department pairs")

        return unique_keyword_dept_pairs

    except Exception as e:
        logger.error(f"Failed to read Excel file: {e}", exc_info=True)
        raise


def get_department_id(db_manager, department_name):
    """
    Look up a department's ID in the ai_departments table by name.

    Args:
        db_manager: database manager instance
        department_name: department name

    Returns:
        The department ID; raises ValueError if the department is not found.
    """
    try:
        # Query the department ID by name
        sql = "SELECT id FROM ai_departments WHERE department_name = %s"
        result = db_manager.execute_query(sql, (department_name,), fetch_one=True)

        if result:
            return result[0]  # the ID
        else:
            error_msg = f"No ID found for department '{department_name}'; add it to ai_departments first"
            logger.error(error_msg)
            raise ValueError(error_msg)

    except Exception as e:
        logger.error(f"Failed to query department ID: {e}", exc_info=True)
        raise


def get_author_info_by_department(db_manager, department_id):
    """
    Fetch any qualifying author for the given department from ai_authors.

    Args:
        db_manager: database manager instance
        department_id: department ID

    Returns:
        An (author_id, author_name) tuple, or (0, '') if none is found.
    """
    try:
        # Query one qualifying author
        sql = "SELECT id, author_name FROM ai_authors WHERE department_id = %s AND status = 'active' AND daily_post_max > 0 LIMIT 1"
        result = db_manager.execute_query(sql, (department_id,), fetch_one=True)

        if result:
            return result[0], result[1]  # author_id, author_name
        else:
            logger.warning(f"No active qualifying author found for department ID {department_id}")
            return 0, ''  # defaults instead of None

    except Exception as e:
        logger.error(f"Failed to query author info: {e}", exc_info=True)
        return 0, ''  # defaults


def update_keywords_from_excel(db_manager, keyword_dept_pairs, batch_size=100, sleep_seconds=0.1):
    """
    Update matching records in the baidu_keyword table from the Excel data.

    Args:
        db_manager: database manager instance
        keyword_dept_pairs: list of (keyword, department) tuples
        batch_size: log batch size; progress is logged every this many records
        sleep_seconds: sleep between records in seconds, defaults to 0.1

    Returns:
        The number of records updated.
    """
    if not keyword_dept_pairs:
        logger.warning("No keywords to update")
        return 0

    try:
        logger.info(f"Updating {len(keyword_dept_pairs)} keyword-department pairs in the database...")
        logger.info("Running in per-record query-then-update mode; only existing keywords are touched")

        # SQL statements scoped to the targeted records
        check_sql = "SELECT COUNT(*) FROM baidu_keyword WHERE keyword = %s AND seed_id = 9999 AND created_at > '2026-01-28 12:00:00' AND created_at < '2026-01-28 19:53:00' AND query_status = 'manual_review'"
        update_sql = """
            UPDATE baidu_keyword
            SET department = %s, department_id = %s, author_id = %s, author_name = %s
            WHERE keyword = %s AND seed_id = 9999 AND created_at > '2026-01-28 12:00:00' AND created_at < '2026-01-28 19:53:00' AND query_status = 'manual_review'
        """

        success_count = 0
        skip_count = 0
        failed_count = 0

        # Process one record at a time
        for idx, (keyword, department) in enumerate(keyword_dept_pairs, 1):
            try:
                logger.debug(f'[debug] Processing {idx}/{len(keyword_dept_pairs)}: {keyword}, department: {department}')

                # 1. Check whether the keyword exists under the target conditions
                result = db_manager.execute_query(check_sql, (keyword,), fetch_one=True)
                exists = result[0] > 0 if result else False

                if not exists:
                    skip_count += 1
                    logger.debug(f'[debug] Keyword not found under the target conditions, skipping: {keyword}')
                    continue  # skip keywords that do not exist

                # 2. Look up the department ID (must exist, otherwise raises)
                dept_id = get_department_id(db_manager, department)

                # 3. Fetch author info
                author_id, author_name = get_author_info_by_department(db_manager, dept_id)

                # 4. The record exists, so update it
                logger.debug(f'[debug] Updating: {keyword}, department: {department}, department ID: {dept_id}, author ID: {author_id}, author name: {author_name}')
                affected = db_manager.execute_update(
                    update_sql,
                    (department, dept_id, author_id, author_name, keyword),
                    autocommit=True
                )

                if affected > 0:
                    success_count += 1
                    logger.debug(f'[debug] Updated: {keyword}, department: {department}, department ID: {dept_id}, author ID: {author_id}, author name: {author_name}')

                # 5. Log progress
                if idx % batch_size == 0 or idx == len(keyword_dept_pairs):
                    progress = (idx / len(keyword_dept_pairs)) * 100
                    logger.info(f'[progress] {idx}/{len(keyword_dept_pairs)} ({progress:.1f}%) | updated: {success_count} | skipped: {skip_count} | failed: {failed_count}')

                # 6. Sleep between records
                time.sleep(sleep_seconds)

            except ValueError as ve:
                # Missing department: skip this record and continue
                logger.error(f'[error] Record {idx} failed: {ve}')
                failed_count += 1
                continue
            except Exception as e:
                failed_count += 1
                logger.warning(f'[debug] Failed [{idx}/{len(keyword_dept_pairs)}]: keyword={keyword}, department={department}, error: {e}')

        logger.info(f"Done! Updated: {success_count} | skipped (not found): {skip_count} | failed: {failed_count}")
        return success_count

    except Exception as e:
        logger.error(f"Keyword update failed: {e}", exc_info=True)
        raise


def main():
    """Entry point."""
    # Excel file path
    excel_path = '/home/work/ai_improt_quary/副本query表-0128第一批.xlsx'

    # Database connection config
    db_config = {
        'host': '8.149.233.36',
        'port': 3306,
        'user': 'ai_article_read',
        'password': '7aK_H2yvokVumr84lLNDt8fDBp6P',
        'database': 'ai_article',
        'charset': 'utf8mb4'
    }

    logger.info("=" * 60)
    logger.info("Updating matching baidu_keyword records from Excel")
    logger.info(f"Database: {db_config['user']}@{db_config['host']}:{db_config['port']}/{db_config['database']}")
    logger.info("=" * 60)

    # Create the database manager
    db_manager = DatabaseManager(db_config)

    try:
        # 1. Read the Excel file
        keyword_dept_pairs = read_excel_keywords_with_department(excel_path, 'query', '科室')

        if not keyword_dept_pairs:
            logger.warning("No keywords to update, exiting")
            return

        # Ask whether to update everything or only a test subset
        print(f"\nThe Excel file contains {len(keyword_dept_pairs)} records")
        while True:
            choice = input("Choose update mode: A) update everything B) test mode (first N records): ").strip().upper()
            if choice == 'A':
                # Update everything
                break
            elif choice == 'B':
                try:
                    test_count = int(input(f"Number of records to test (1-{len(keyword_dept_pairs)}): "))
                    if 1 <= test_count <= len(keyword_dept_pairs):
                        keyword_dept_pairs = keyword_dept_pairs[:test_count]
                        print(f"Testing with the first {test_count} records")
                        break
                    else:
                        print(f"Out of range; enter a number between 1 and {len(keyword_dept_pairs)}")
                except ValueError:
                    print("Please enter a valid number")
            else:
                print("Please enter A or B")

        if not keyword_dept_pairs:
            logger.warning("No keywords to update, exiting")
            return

        # Preview the first 10 keyword-department pairs
        logger.info("\nKeyword-department preview (first 10):")
        for i, (keyword, department) in enumerate(keyword_dept_pairs[:10], 1):
            logger.info(f"  {i}. {keyword} (department: {department})")

        if len(keyword_dept_pairs) > 10:
            logger.info(f"  ... and {len(keyword_dept_pairs) - 10} more keyword-department pairs")

        # 2. Confirm the update
        print("\n" + "=" * 60)
        print(f"About to update {len(keyword_dept_pairs)} keyword-department pairs in the baidu_keyword table")
        print("Conditions: seed_id=9999 AND created_at BETWEEN '2026-01-28 12:00:00' AND '2026-01-28 19:53:00' AND query_status='manual_review'")
        confirm = input("Proceed? (y/n): ").strip().lower()

        if confirm != 'y':
            logger.info("Update cancelled by user")
            return

        # 3. Run the update
        success_count = update_keywords_from_excel(
            db_manager=db_manager,
            keyword_dept_pairs=keyword_dept_pairs,
            batch_size=1,
            sleep_seconds=0.1
        )

        logger.info("=" * 60)
        logger.info(f"✓ Done! Updated {success_count} keywords")
        logger.info("=" * 60)

    except Exception as e:
        logger.error(f"✗ Update failed: {e}", exc_info=True)
        logger.info("=" * 60)
        logger.info("✗ Update failed")
        logger.info("=" * 60)


if __name__ == '__main__':
    main()
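`database_config.py` is not included in this commit; the script above assumes a `DatabaseManager` exposing `execute_query(sql, params, fetch_one=...)` and `execute_update(sql, params, autocommit=...)`. The real class presumably wraps pymysql; the stand-in below only illustrates the assumed interface, backed by sqlite3 so it runs anywhere (the naive `%s` to `?` placeholder translation is for illustration only and would mangle a literal `%s` inside a query):

```python
import sqlite3


class DatabaseManager:
    """In-memory stand-in for the interface update_keywords_from_excel.py uses.

    This is a sketch of the assumed API, not the real database_config class.
    """

    def __init__(self, path=":memory:"):
        self.conn = sqlite3.connect(path)

    def execute_query(self, sql, params=(), fetch_one=False):
        # Translate MySQL-style %s placeholders to sqlite's ? (naive)
        cur = self.conn.execute(sql.replace("%s", "?"), params)
        return cur.fetchone() if fetch_one else cur.fetchall()

    def execute_update(self, sql, params=(), autocommit=True):
        cur = self.conn.execute(sql.replace("%s", "?"), params)
        if autocommit:
            self.conn.commit()
        return cur.rowcount  # number of affected rows
```

With this stand-in, the update script's query/update calls type-check and run end to end against a local sqlite database, which is handy for dry runs without touching the production MySQL instance.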