# Files
#
# 247 lines
# 9.7 KiB
# Python
# Raw Permalink Normal View History
#
# 2025-12-25 11:16:59 +08:00
import requests
from bs4 import BeautifulSoup
import time
import logging
from datetime import datetime
from multiprocessing import Process
from log_config import setup_baidu_seed_logger
from database_config import db_manager
# Use the shared logging configuration (setup_baidu_seed_logger from log_config).
logger = setup_baidu_seed_logger()
def init_db():
    """Database initialisation hook; currently a no-op.

    The original note states that the table-creation statements were removed
    and only connection closing was meant to remain. The body performs no
    work and returns ``None``.
    """
    return None
def insert_keywords(words, parent_id=0, seed_id=0, seed_name=''):
    """Insert keywords into the ``baidu_keyword`` table, one autocommitted row each.

    When a seed's root row is being inserted (``parent_id == 0``), at most one
    existing root row (``parents_id=0``) of that seed is deleted first so the
    new row replaces it. Individual insert failures (for example duplicates,
    since no ``INSERT IGNORE`` is used) are logged and skipped; the remaining
    words are still attempted.

    Args:
        words: Sized collection of keyword strings. Empty or ``None`` is a no-op.
        parent_id: ID of the parent keyword row; ``0`` denotes a seed root row.
        seed_id: ID of the seed the keywords belong to.
        seed_name: Name of that seed.

    Returns:
        None. Errors are logged, never raised to the caller.
    """
    if not words:
        return

    # Both statements are loop-invariant, so they are built once up front.
    delete_sql = "DELETE FROM baidu_keyword WHERE parents_id=0 AND seed_id=%s LIMIT 1"
    insert_sql = """INSERT INTO baidu_keyword
        (keyword, parents_id, seed_id, seed_name)
        VALUES (%s, %s, %s, %s)"""

    try:
        # Only a root insert may replace the seed's root row. Running this
        # delete while inserting children (parent_id != 0) would remove the
        # parents_id=0 row that the finished-seed check looks for.
        if parent_id == 0:
            deleted = db_manager.execute_update(delete_sql, (seed_id,), autocommit=True)
            # NOTE(review): execute_update is assumed to return a row count;
            # a None result is tolerated rather than aborting the inserts.
            if deleted and deleted > 0:
                logger.info(f'[删除旧记录] 种子ID:{seed_id}, parents_id=0, 删除{deleted}')

        inserted_count = 0
        for word in words:
            try:
                db_manager.execute_update(
                    insert_sql,
                    (word, parent_id, seed_id, seed_name),
                    autocommit=True,
                )
                inserted_count += 1
                logger.info(f'[插入成功] {word} (父ID:{parent_id}, 种子ID:{seed_id})')
            except Exception as insert_err:
                # Best effort: one bad row must not stop the rest of the batch.
                logger.warning(f'[插入失败] {word} - {insert_err}')

        if inserted_count > 0:
            logger.info(f'成功插入 {inserted_count}/{len(words)} 个新关键词 (父ID:{parent_id}, 种子:{seed_name})')
    except Exception as e:
        # Keep the traceback so failures can be diagnosed from the log alone.
        logger.error(f'插入关键词异常:{e}父ID:{parent_id},种子:{seed_name}', exc_info=True)
def get_seed_keywords_batch(batch_size=4):
    """Fetch a batch of seed keywords that have not been crawled yet.

    Reads ``id, keyword`` from ``baidu_seed_keywords`` for rows with
    ``crawled=0`` and ``status = 'ready'``.

    Args:
        batch_size: Maximum number of rows to fetch (passed as the SQL LIMIT).

    Returns:
        list: The rows returned by ``db_manager.execute_query``; an empty list
        when the query returns a falsy result or raises.
    """
    query = "SELECT id, keyword FROM baidu_seed_keywords WHERE crawled=0 AND status = 'ready' LIMIT %s"
    try:
        rows = db_manager.execute_query(query, (batch_size,))
    except Exception as e:
        logger.error(f'查询种子词失败:{e}')
        return []
    return rows if rows else []
def get_keyword_info(keyword):
    """Look up a keyword's row in ``baidu_keyword``.

    Args:
        keyword: Keyword text, matched with ``keyword=%s``.

    Returns:
        The result of ``db_manager.execute_query(..., fetch_one=True)`` for
        the columns ``id, keyword, parents_id, seed_id, seed_name``, or
        ``None`` when the query raises.
    """
    lookup_sql = (
        "SELECT id, keyword, parents_id, seed_id, seed_name "
        "FROM baidu_keyword WHERE keyword=%s"
    )
    try:
        return db_manager.execute_query(lookup_sql, (keyword,), fetch_one=True)
    except Exception as e:
        logger.error(f'查询关键词信息失败:{e}')
    return None
def process_seed_keywords():
    """Copy up to two pending seeds into ``baidu_keyword`` and mark them 'doing'.

    For each seed fetched by ``get_seed_keywords_batch(batch_size=2)`` the
    seed keyword is inserted as a root row (``parent_id=0``) through
    ``insert_keywords``, then the seed's ``status`` in ``baidu_seed_keywords``
    is set to ``'doing'``.

    Returns:
        bool: ``False`` when no pending seed was found, ``True`` otherwise.
    """
    logger.info('\n========== 处理2个种子关键词 ==========')

    # Fetch one batch (two seeds at most).
    pending = get_seed_keywords_batch(batch_size=2)
    if not pending:
        logger.info('当前无未处理的种子词')
        return False

    total = len(pending)
    names = [row[1] for row in pending]
    logger.info(f'\n[批次开始] 获取到 {total} 个种子词:{names}')

    mark_doing_sql = "UPDATE baidu_seed_keywords SET status='doing' WHERE id=%s"
    for seed_id, seed_keyword in pending:
        # Insert the seed itself as a root keyword row.
        # NOTE(review): insert_keywords logs its own failures and does not
        # raise, so the status update below runs whether or not the insert
        # succeeded.
        insert_keywords(
            [seed_keyword],
            parent_id=0,
            seed_id=seed_id,
            seed_name=seed_keyword,
        )

        # Flag the seed as in progress in the seed table.
        try:
            db_manager.execute_update(mark_doing_sql, (seed_id,), autocommit=True)
            logger.info(f' ✓ 种子词已插入: {seed_keyword} (种子ID:{seed_id}, crawled=0, parents_id=0, status=doing)')
        except Exception as e:
            logger.error(f'更新种子表状态失败:{e}')

    logger.info(f'[批次完成] 本批 {total} 个种子词已插入baidu_keyword表等待baidu_crawl.py处理')
    return True
def check_and_mark_finished_seeds():
    """Mark seeds whose crawl has gone quiet as finished.

    A seed is a candidate when ``baidu_keyword`` holds a root row for it
    (``parents_id=0``) with ``crawled=2``. A candidate is considered finished
    when both conditions hold:

    1. no ``baidu_keyword`` row of the seed has ``crawled=0``;
    2. the newest ``created_at`` among its ``crawled=1`` rows is more than
       30 minutes old.

    For a finished seed, ``baidu_seed_keywords`` is set to
    ``crawled=1, status='finished'`` and the seed's root rows are moved from
    ``crawled=2`` to ``crawled=1``.

    Returns:
        int: Number of seeds marked finished in this call; ``0`` when there
        are no candidates or the candidate query fails. A failure while
        checking one seed is logged and does not stop the others.
    """
    quiet_period_seconds = 1800  # 30 minutes

    # Loop-invariant statements, built once.
    candidate_sql = """SELECT DISTINCT seed_id, seed_name
        FROM baidu_keyword
        WHERE parents_id=0 AND crawled=2 AND seed_id>0"""
    pending_count_sql = "SELECT COUNT(*) FROM baidu_keyword WHERE seed_id=%s AND crawled=0"
    last_crawled_sql = """SELECT created_at
        FROM baidu_keyword
        WHERE seed_id=%s AND crawled=1
        ORDER BY created_at DESC
        LIMIT 1"""
    finish_seed_sql = "UPDATE baidu_seed_keywords SET crawled=1, status='finished' WHERE id=%s"
    finish_root_sql = "UPDATE baidu_keyword SET crawled=1 WHERE parents_id=0 AND seed_id=%s AND crawled=2"

    try:
        candidate_seeds = db_manager.execute_query(candidate_sql)
        if not candidate_seeds:
            return 0

        logger.info(f'\n[检查候选种子] 找到 {len(candidate_seeds)} 个候选种子,开始检查完成条件...')
        marked_count = 0
        for seed_id, seed_name in candidate_seeds:
            try:
                # Condition 1: nothing of this seed is still waiting to be crawled.
                result = db_manager.execute_query(pending_count_sql, (seed_id,), fetch_one=True)
                count_crawled_0 = result[0] if result else 0
                if count_crawled_0 > 0:
                    logger.info(f' - 种子{seed_id}({seed_name}) 还有 {count_crawled_0} 条crawled=0的记录跳过')
                    continue

                # Condition 2: the newest crawled=1 row is older than the quiet period.
                last_record = db_manager.execute_query(last_crawled_sql, (seed_id,), fetch_one=True)
                if not last_record:
                    logger.warning(f' - 种子{seed_id}({seed_name}) 没有crawled=1的记录跳过')
                    continue

                last_created_at = last_record[0]
                # NOTE(review): assumes created_at is a naive datetime in the
                # same local time zone as datetime.now() - confirm.
                time_diff = (datetime.now() - last_created_at).total_seconds()
                if time_diff <= quiet_period_seconds:
                    logger.info(f' - 种子{seed_id}({seed_name}) 最后一条记录时间差:{time_diff/60:.1f}分钟未超过30分钟跳过')
                    continue

                logger.info(f' ✓ 种子{seed_id}({seed_name}) 满足完成条件:')
                logger.info(f' - crawled=0记录数: 0')
                logger.info(f' - 最后一条记录时间差: {time_diff/60:.1f}分钟')

                # The root row's crawled=2 flag is what makes the seed a
                # candidate, so it is cleared last. If either autocommitted
                # UPDATE fails, the seed is still a candidate on the next call
                # and both idempotent updates are retried. With the opposite
                # order, a failed seed-table update would leave the seed in
                # its old status with nothing left to trigger a retry.
                db_manager.execute_update(finish_seed_sql, (seed_id,), autocommit=True)
                result_count = db_manager.execute_update(finish_root_sql, (seed_id,), autocommit=True)
                logger.info(f' - 已将 {result_count} 条parents_id=0的记录从crawled=2更新为1')
                logger.info(f' ✓ 种子{seed_id}({seed_name}) 已标记为完成status=finished')
                marked_count += 1
            except Exception as e:
                # One failing seed must not block the remaining candidates.
                logger.error(f' ✗ 种子{seed_id} 检查失败:{e}', exc_info=True)
        return marked_count
    except Exception as e:
        logger.error(f'检查已完成种子失败:{e}', exc_info=True)
        return 0
def main(poll_interval_seconds=10):
    """Run the seed-monitoring loop until interrupted.

    Each cycle:
    1. ``check_and_mark_finished_seeds()`` marks finished seeds in
       ``baidu_keyword`` / ``baidu_seed_keywords``.
    2. ``process_seed_keywords()`` copies pending seeds into ``baidu_keyword``.
    3. The loop sleeps for ``poll_interval_seconds``.

    Args:
        poll_interval_seconds: Pause between cycles, in seconds. Defaults to
            10, the value the loop actually slept; the end-of-cycle log
            message previously claimed 3 seconds and is now derived from
            this value.

    The loop ends on ``KeyboardInterrupt`` (logged as a normal stop) or on any
    other exception reaching this level (logged with traceback).
    """
    banner = '=' * 70
    logger.info(banner)
    logger.info('百度种子词监听系统启动 - ' + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    logger.info(banner)
    try:
        cycle_count = 0
        while True:
            cycle_count += 1
            logger.info(f'\n\n{banner}')
            logger.info(f'【第{cycle_count}轮循环】开始监听')
            logger.info(banner)

            # Step 1: mark seeds whose crawl has finished.
            marked_count = check_and_mark_finished_seeds()
            if marked_count > 0:
                logger.info(f'\n✅ 本轮共标记 {marked_count} 个种子为完成')

            # Step 2: hand the next pending seeds over to baidu_keyword.
            process_seed_keywords()

            # Step 3: wait before the next cycle; the message reports the real interval.
            logger.info(f'\n{banner}')
            logger.info(f'【第{cycle_count}轮循环】完成{poll_interval_seconds}秒后继续...')
            logger.info(f'{banner}\n')
            time.sleep(poll_interval_seconds)
    except KeyboardInterrupt:
        logger.info('\n接收到停止信号,程序退出')
    except Exception as e:
        logger.error(f'程序异常退出:{e}', exc_info=True)
# Start the monitoring loop only when this file is executed as a script.
if __name__ == "__main__":
    main()