247 lines
9.7 KiB
Python
247 lines
9.7 KiB
Python
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
import time
|
||
import logging
|
||
from datetime import datetime
|
||
from multiprocessing import Process
|
||
from log_config import setup_baidu_seed_logger
|
||
from database_config import db_manager
|
||
|
||
# 使用统一的日志配置
|
||
logger = setup_baidu_seed_logger()
|
||
|
||
|
||
def init_db():
    """Intentional no-op kept for backward compatibility.

    Table-creation logic was removed from this function; it now does
    nothing at all (the old comment claimed it closed connections, but
    the body never did).  Callers may safely keep or drop this call.
    """
    pass
|
||
|
||
def insert_keywords(words, parent_id=0, seed_id=0, seed_name=''):
    """Insert keywords into the baidu_keyword table (autocommit mode).

    Before inserting, the existing parents_id=0 row for the same seed_id
    is deleted so the seed-level record is refreshed.

    Args:
        words: list of keyword strings to insert; empty/None is a no-op.
        parent_id: parent level id (stored in the parents_id column).
        seed_id: id of the seed the keywords belong to.
        seed_name: human-readable seed name stored with each row.
    """
    if not words:
        return

    try:
        # (1) Remove the stale seed row (parents_id=0) for this seed.
        # NOTE(review): LIMIT 1 deletes at most one row per call —
        # presumably only one such row can exist; confirm against schema.
        delete_sql = "DELETE FROM baidu_keyword WHERE parents_id=0 AND seed_id=%s LIMIT 1"
        deleted = db_manager.execute_update(delete_sql, (seed_id,), autocommit=True)
        if deleted > 0:
            logger.info(f'[删除旧记录] 种子ID:{seed_id}, parents_id=0, 删除{deleted}条')

        # (2) Insert new rows WITHOUT "INSERT IGNORE": a duplicate raises,
        # is logged per word, and the rest of the batch continues.
        # The statement is loop-invariant, so build it once.
        insert_sql = """INSERT INTO baidu_keyword
                        (keyword, parents_id, seed_id, seed_name)
                        VALUES (%s, %s, %s, %s)"""
        inserted_count = 0
        for word in words:
            try:
                db_manager.execute_update(insert_sql, (word, parent_id, seed_id, seed_name), autocommit=True)
                inserted_count += 1
                logger.info(f'[插入成功] {word} (父ID:{parent_id}, 种子ID:{seed_id})')
            except Exception as insert_err:
                # Best-effort: a failed word must not abort the batch.
                logger.warning(f'[插入失败] {word} - {insert_err}')

        if inserted_count > 0:
            logger.info(f'成功插入 {inserted_count}/{len(words)} 个新关键词 (父ID:{parent_id}, 种子:{seed_name})')

    except Exception as e:
        logger.error(f'插入关键词异常:{e},父ID:{parent_id},种子:{seed_name}')
|
||
|
||
def get_seed_keywords_batch(batch_size=4):
    """Fetch a batch of not-yet-crawled, ready seed words.

    Args:
        batch_size: maximum number of rows to return.

    Returns:
        list: (id, keyword) tuples; empty list when nothing is pending
        or the query fails.
    """
    query = "SELECT id, keyword FROM baidu_seed_keywords WHERE crawled=0 AND status = 'ready' LIMIT %s"
    try:
        rows = db_manager.execute_query(query, (batch_size,))
    except Exception as e:
        logger.error(f'查询种子词失败:{e}')
        return []
    return rows if rows else []
|
||
|
||
|
||
def get_keyword_info(keyword):
    """Look up the full row for a keyword in baidu_keyword.

    Args:
        keyword: keyword string to look up.

    Returns:
        tuple: (id, keyword, parents_id, seed_id, seed_name), or None
        when the keyword is absent or the query fails.
    """
    query = "SELECT id, keyword, parents_id, seed_id, seed_name FROM baidu_keyword WHERE keyword=%s"
    try:
        return db_manager.execute_query(query, (keyword,), fetch_one=True)
    except Exception as e:
        logger.error(f'查询关键词信息失败:{e}')
        return None
|
||
|
||
|
||
def process_seed_keywords(batch_size=2):
    """Move a batch of seed keywords into baidu_keyword for crawling.

    For each fetched seed: insert it into baidu_keyword (parents_id=0),
    then mark its baidu_seed_keywords row status='doing'.  The batch
    size used to be hard-coded to 2 in three places; it is now a
    parameter (default 2, so default behavior and log output are
    unchanged).

    Args:
        batch_size: number of seeds to process per call.

    Returns:
        bool: True when at least one seed was processed, False when idle.
    """
    logger.info(f'\n========== 处理{batch_size}个种子关键词 ==========')

    seeds = get_seed_keywords_batch(batch_size=batch_size)

    if not seeds:
        logger.info('当前无未处理的种子词')
        return False

    logger.info(f'\n[批次开始] 获取到 {len(seeds)} 个种子词:{[s[1] for s in seeds]}')

    for seed_id, seed_keyword in seeds:
        # Seed rows enter baidu_keyword with parents_id=0; crawled=0 is
        # presumably the column default — TODO confirm against schema.
        insert_keywords(
            [seed_keyword],
            parent_id=0,
            seed_id=seed_id,
            seed_name=seed_keyword
        )

        # Flag the seed as in-progress in the seed table.
        try:
            sql = "UPDATE baidu_seed_keywords SET status='doing' WHERE id=%s"
            db_manager.execute_update(sql, (seed_id,), autocommit=True)

            logger.info(f' ✓ 种子词已插入: {seed_keyword} (种子ID:{seed_id}, crawled=0, parents_id=0, status=doing)')
        except Exception as e:
            logger.error(f'更新种子表状态失败:{e}')

    logger.info(f'[批次完成] 本批 {len(seeds)} 个种子词已插入baidu_keyword表,等待baidu_crawl.py处理')
    return True
|
||
|
||
|
||
def check_and_mark_finished_seeds(timeout_seconds=1800):
    """Detect finished seeds and mark them complete.

    A seed qualifies when BOTH hold:
      1. it has no crawled=0 rows left in baidu_keyword, and
      2. its newest crawled=1 row is older than ``timeout_seconds``.

    For each qualifying seed, parents_id=0 rows with crawled=2 are
    flipped to crawled=1, and the matching baidu_seed_keywords row is
    set to crawled=1, status='finished'.

    Args:
        timeout_seconds: idle time after the newest crawled=1 row before
            a seed counts as done (default 1800 s = 30 min; was a magic
            number).

    Returns:
        int: number of seeds marked finished (0 when none, or on error).
    """
    try:
        # Candidates: seeds that still carry a parents_id=0, crawled=2 row.
        sql = """SELECT DISTINCT seed_id, seed_name
                 FROM baidu_keyword
                 WHERE parents_id=0 AND crawled=2 AND seed_id>0"""
        candidate_seeds = db_manager.execute_query(sql)

        if not candidate_seeds:
            return 0

        logger.info(f'\n[检查候选种子] 找到 {len(candidate_seeds)} 个候选种子,开始检查完成条件...')

        marked_count = 0
        for seed_id, seed_name in candidate_seeds:
            try:
                # Condition 1: no pending (crawled=0) rows for this seed.
                sql = "SELECT COUNT(*) FROM baidu_keyword WHERE seed_id=%s AND crawled=0"
                result = db_manager.execute_query(sql, (seed_id,), fetch_one=True)
                count_crawled_0 = result[0] if result else 0

                if count_crawled_0 > 0:
                    logger.info(f' - 种子{seed_id}({seed_name}) 还有 {count_crawled_0} 条crawled=0的记录,跳过')
                    continue

                # Condition 2: newest crawled=1 row must be older than the timeout.
                sql = """SELECT created_at
                         FROM baidu_keyword
                         WHERE seed_id=%s AND crawled=1
                         ORDER BY created_at DESC
                         LIMIT 1"""
                last_record = db_manager.execute_query(sql, (seed_id,), fetch_one=True)

                if not last_record:
                    logger.warning(f' - 种子{seed_id}({seed_name}) 没有crawled=1的记录,跳过')
                    continue

                last_created_at = last_record[0]
                # NOTE(review): naive local datetime compared against a DB
                # timestamp — assumes both use the same timezone; confirm.
                time_diff = (datetime.now() - last_created_at).total_seconds()

                if time_diff <= timeout_seconds:
                    logger.info(f' - 种子{seed_id}({seed_name}) 最后一条记录时间差:{time_diff/60:.1f}分钟,未超过{timeout_seconds//60}分钟,跳过')
                    continue

                # Both conditions hold — mark this seed finished.
                logger.info(f' ✓ 种子{seed_id}({seed_name}) 满足完成条件:')
                logger.info(f' - crawled=0记录数: 0')
                logger.info(f' - 最后一条记录时间差: {time_diff/60:.1f}分钟')

                # Flip parents_id=0 rows from crawled=2 to crawled=1.
                sql1 = "UPDATE baidu_keyword SET crawled=1 WHERE parents_id=0 AND seed_id=%s AND crawled=2"
                result_count = db_manager.execute_update(sql1, (seed_id,), autocommit=True)
                logger.info(f' - 已将 {result_count} 条parents_id=0的记录从crawled=2更新为1')

                # Sync the seed table.
                sql2 = "UPDATE baidu_seed_keywords SET crawled=1, status='finished' WHERE id=%s"
                db_manager.execute_update(sql2, (seed_id,), autocommit=True)

                logger.info(f' ✓ 种子{seed_id}({seed_name}) 已标记为完成,status=finished')
                marked_count += 1

            except Exception as e:
                # Per-seed failures must not abort the whole sweep.
                logger.error(f' ✗ 种子{seed_id} 检查失败:{e}')

        return marked_count
    except Exception as e:
        logger.error(f'检查已完成种子失败:{e}')
        return 0
|
||
|
||
|
||
def main(poll_interval=10):
    """Run the seed-keyword monitor loop.

    Each cycle:
      1. check_and_mark_finished_seeds() — flag seeds whose crawl is done;
      2. process_seed_keywords() — push the next batch of seeds into
         baidu_keyword for baidu_crawl.py to consume;
      3. sleep ``poll_interval`` seconds and repeat.

    Args:
        poll_interval: seconds between cycles.  The original code logged
            "3秒后继续" while actually sleeping 10 s; the message now
            reflects the real interval.

    Stops cleanly on Ctrl-C (KeyboardInterrupt); any other exception is
    logged with a traceback before returning.
    """
    logger.info('='*70)
    logger.info('百度种子词监听系统启动 - ' + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    logger.info('='*70)

    try:
        cycle_count = 0

        while True:
            cycle_count += 1
            logger.info(f'\n\n{"="*70}')
            logger.info(f'【第{cycle_count}轮循环】开始监听')
            logger.info(f'{"="*70}')

            # (2) Watch baidu_keyword: flag seeds whose crawl has finished.
            marked_count = check_and_mark_finished_seeds()
            if marked_count > 0:
                logger.info(f'\n✅ 本轮共标记 {marked_count} 个种子为完成')

            # (1) Watch baidu_seed_keywords: push the next batch of seeds.
            process_seed_keywords()

            # Sleep before the next cycle.
            logger.info(f'\n{"="*70}')
            logger.info(f'【第{cycle_count}轮循环】完成,{poll_interval}秒后继续...')
            logger.info(f'{"="*70}\n')
            time.sleep(poll_interval)

    except KeyboardInterrupt:
        logger.info('\n接收到停止信号,程序退出')
    except Exception as e:
        logger.error(f'程序异常退出:{e}', exc_info=True)
|
||
|
||
|
||
# Entry point: start the monitor loop when executed as a script.
if __name__ == '__main__':
    main()
|