Files
“shengyudong” 322ac74336 2025-12-25 upload
2025-12-25 11:16:59 +08:00

247 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
from bs4 import BeautifulSoup
import time
import logging
from datetime import datetime
from multiprocessing import Process
from log_config import setup_baidu_seed_logger
from database_config import db_manager
# Use the project's shared logging configuration.
logger = setup_baidu_seed_logger()
def init_db():
    """Do nothing; the table-creation statements were removed from this step."""
    return None
def insert_keywords(words, parent_id=0, seed_id=0, seed_name=''):
    """Insert keywords into ``baidu_keyword``, one autocommitted statement per row.

    One existing row with ``parents_id=0`` for ``seed_id`` is deleted first.
    Each word is then written with a plain ``INSERT`` (no ``IGNORE``); a row
    that fails is logged as a warning and the remaining words are still tried.
    Any other error is logged and swallowed.

    Args:
        words: Keywords to insert. Nothing happens when this is empty.
        parent_id: ID of the parent level.
        seed_id: ID of the seed.
        seed_name: Name of the seed.
    """
    if not words:
        return

    delete_sql = "DELETE FROM baidu_keyword WHERE parents_id=0 AND seed_id=%s LIMIT 1"
    insert_sql = (
        "INSERT INTO baidu_keyword\n"
        "(keyword, parents_id, seed_id, seed_name)\n"
        "VALUES (%s, %s, %s, %s)"
    )

    try:
        # Step 1: drop the old root row (parents_id=0) of this seed.
        # NOTE(review): this runs whatever `parent_id` is, so a call with a
        # non-zero parent_id also removes the seed's root row — confirm intended.
        removed = db_manager.execute_update(delete_sql, (seed_id,), autocommit=True)
        if removed > 0:
            logger.info(f'[删除旧记录] 种子ID:{seed_id}, parents_id=0, 删除{removed}')

        # Step 2: insert the new rows; a duplicate raises and is reported below.
        success_total = 0
        for keyword in words:
            row_values = (keyword, parent_id, seed_id, seed_name)
            try:
                db_manager.execute_update(insert_sql, row_values, autocommit=True)
                success_total += 1
                logger.info(f'[插入成功] {keyword} (父ID:{parent_id}, 种子ID:{seed_id})')
            except Exception as insert_err:
                logger.warning(f'[插入失败] {keyword} - {insert_err}')

        if success_total > 0:
            logger.info(f'成功插入 {success_total}/{len(words)} 个新关键词 (父ID:{parent_id}, 种子:{seed_name})')
    except Exception as e:
        logger.error(f'插入关键词异常:{e}父ID:{parent_id},种子:{seed_name}')
def get_seed_keywords_batch(batch_size=4):
    """Fetch a batch of seed keywords that have not been crawled yet.

    Reads rows from ``baidu_seed_keywords`` with ``crawled=0`` and
    ``status = 'ready'``.

    Args:
        batch_size: Maximum number of rows to fetch.

    Returns:
        list: Seed rows, each ``(id, keyword)``. An empty list is returned
        when nothing matches or when the query fails (the error is logged).
    """
    query = "SELECT id, keyword FROM baidu_seed_keywords WHERE crawled=0 AND status = 'ready' LIMIT %s"
    try:
        rows = db_manager.execute_query(query, (batch_size,))
        return rows if rows else []
    except Exception as e:
        logger.error(f'查询种子词失败:{e}')
        return []
def get_keyword_info(keyword):
    """Look up one ``baidu_keyword`` row by its keyword text.

    Args:
        keyword: Keyword to search for.

    Returns:
        tuple: ``(id, keyword, parents_id, seed_id, seed_name)``, or ``None``
        when the query fails (the error is logged).
    """
    lookup_sql = (
        "SELECT id, keyword, parents_id, seed_id, seed_name "
        "FROM baidu_keyword WHERE keyword=%s"
    )
    try:
        return db_manager.execute_query(lookup_sql, (keyword,), fetch_one=True)
    except Exception as e:
        logger.error(f'查询关键词信息失败:{e}')
        return None
def process_seed_keywords():
    """Copy up to two ready seed keywords into ``baidu_keyword``.

    Each fetched seed is inserted as a root row (``parent_id=0``) and its
    ``baidu_seed_keywords`` row is then set to ``status='doing'``. A failure
    of the status update is logged and does not stop the batch.

    Returns:
        bool: ``False`` when no seed was fetched, ``True`` otherwise.
    """
    logger.info('\n========== 处理2个种子关键词 ==========')

    # Fetch one batch of two seeds.
    seeds = get_seed_keywords_batch(batch_size=2)
    if not seeds:
        logger.info('当前无未处理的种子词')
        return False

    seed_names = [row[1] for row in seeds]
    logger.info(f'\n[批次开始] 获取到 {len(seeds)} 个种子词:{seed_names}')

    mark_doing_sql = "UPDATE baidu_seed_keywords SET status='doing' WHERE id=%s"
    for seed_id, seed_keyword in seeds:
        # Insert the seed itself as a root keyword (parents_id=0).
        insert_keywords(
            [seed_keyword],
            parent_id=0,
            seed_id=seed_id,
            seed_name=seed_keyword,
        )

        # Flag the seed as in progress in the seed table.
        try:
            db_manager.execute_update(mark_doing_sql, (seed_id,), autocommit=True)
            logger.info(f' ✓ 种子词已插入: {seed_keyword} (种子ID:{seed_id}, crawled=0, parents_id=0, status=doing)')
        except Exception as e:
            logger.error(f'更新种子表状态失败:{e}')

    logger.info(f'[批次完成] 本批 {len(seeds)} 个种子词已插入baidu_keyword表等待baidu_crawl.py处理')
    return True
def check_and_mark_finished_seeds():
    """Mark seeds whose crawl has finished and return how many were marked.

    A seed is a candidate when ``baidu_keyword`` has a row for it with
    ``parents_id=0`` and ``crawled=2``. A candidate is marked finished when
    both conditions hold:

    - it has no rows with ``crawled=0``;
    - its newest ``crawled=1`` row (by ``created_at``) is more than 30
      minutes old.

    For such a seed the ``parents_id=0`` / ``crawled=2`` rows are set to
    ``crawled=1`` and the ``baidu_seed_keywords`` row is set to
    ``crawled=1, status='finished'``. A failure while checking one seed is
    logged and the loop moves on to the next seed.

    Returns:
        int: Number of seeds marked finished; 0 when there are no candidates
        or when an error escapes the per-seed handling.
    """
    candidates_sql = (
        "SELECT DISTINCT seed_id, seed_name\n"
        "FROM baidu_keyword\n"
        "WHERE parents_id=0 AND crawled=2 AND seed_id>0"
    )
    pending_sql = "SELECT COUNT(*) FROM baidu_keyword WHERE seed_id=%s AND crawled=0"
    newest_done_sql = (
        "SELECT created_at\n"
        "FROM baidu_keyword\n"
        "WHERE seed_id=%s AND crawled=1\n"
        "ORDER BY created_at DESC\n"
        "LIMIT 1"
    )
    finish_root_sql = "UPDATE baidu_keyword SET crawled=1 WHERE parents_id=0 AND seed_id=%s AND crawled=2"
    finish_seed_sql = "UPDATE baidu_seed_keywords SET crawled=1, status='finished' WHERE id=%s"

    try:
        # Candidate seeds: those with a root row still at crawled=2.
        candidates = db_manager.execute_query(candidates_sql)
        if not candidates:
            return 0

        logger.info(f'\n[检查候选种子] 找到 {len(candidates)} 个候选种子,开始检查完成条件...')
        finished_total = 0

        for seed_id, seed_name in candidates:
            try:
                # Condition 1: nothing left with crawled=0.
                pending_row = db_manager.execute_query(pending_sql, (seed_id,), fetch_one=True)
                pending = pending_row[0] if pending_row else 0
                if pending > 0:
                    logger.info(f' - 种子{seed_id}({seed_name}) 还有 {pending} 条crawled=0的记录跳过')
                    continue

                # Condition 2: newest crawled=1 row is older than 30 minutes.
                newest_row = db_manager.execute_query(newest_done_sql, (seed_id,), fetch_one=True)
                if not newest_row:
                    logger.warning(f' - 种子{seed_id}({seed_name}) 没有crawled=1的记录跳过')
                    continue

                elapsed_seconds = (datetime.now() - newest_row[0]).total_seconds()
                if elapsed_seconds <= 1800:  # 30 minutes = 1800 seconds
                    logger.info(f' - 种子{seed_id}({seed_name}) 最后一条记录时间差:{elapsed_seconds/60:.1f}分钟未超过30分钟跳过')
                    continue

                # Both conditions hold: mark the seed as finished.
                logger.info(f' ✓ 种子{seed_id}({seed_name}) 满足完成条件:')
                logger.info(' - crawled=0记录数: 0')
                logger.info(f' - 最后一条记录时间差: {elapsed_seconds/60:.1f}分钟')

                # baidu_keyword: root rows go from crawled=2 to crawled=1.
                updated_rows = db_manager.execute_update(finish_root_sql, (seed_id,), autocommit=True)
                logger.info(f' - 已将 {updated_rows} 条parents_id=0的记录从crawled=2更新为1')

                # baidu_seed_keywords: mirror the finished state.
                db_manager.execute_update(finish_seed_sql, (seed_id,), autocommit=True)
                logger.info(f' ✓ 种子{seed_id}({seed_name}) 已标记为完成status=finished')
                finished_total += 1
            except Exception as e:
                logger.error(f' ✗ 种子{seed_id} 检查失败:{e}')

        return finished_total
    except Exception as e:
        logger.error(f'检查已完成种子失败:{e}')
        return 0
def main(poll_interval=10):
    """Run the seed-keyword watcher loop until interrupted.

    Each cycle:
    1. Checks ``baidu_keyword`` and marks finished seeds.
    2. Pulls a batch of seeds from ``baidu_seed_keywords`` into
       ``baidu_keyword``.
    3. Sleeps ``poll_interval`` seconds before the next cycle.

    ``KeyboardInterrupt`` ends the loop with an info log; any other exception
    is logged with its traceback and also ends the loop.

    Args:
        poll_interval: Seconds to sleep between cycles. Defaults to 10, the
            value that was hard-coded in the ``time.sleep`` call. The
            end-of-cycle log message is built from this value so that it
            matches the real delay (it used to say 3 seconds).
    """
    separator = '=' * 70
    logger.info(separator)
    logger.info('百度种子词监听系统启动 - ' + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    logger.info(separator)
    try:
        cycle_count = 0
        while True:
            cycle_count += 1
            logger.info(f'\n\n{separator}')
            logger.info(f'【第{cycle_count}轮循环】开始监听')
            logger.info(separator)

            # First: watch baidu_keyword and mark seeds that have finished.
            marked_count = check_and_mark_finished_seeds()
            if marked_count > 0:
                logger.info(f'\n✅ 本轮共标记 {marked_count} 个种子为完成')

            # Then: watch baidu_seed_keywords and hand over the next seeds.
            process_seed_keywords()

            # Wait before starting the next cycle.
            logger.info(f'\n{separator}')
            logger.info(f'【第{cycle_count}轮循环】完成{poll_interval}秒后继续...')
            logger.info(f'{separator}\n')
            time.sleep(poll_interval)
    except KeyboardInterrupt:
        logger.info('\n接收到停止信号,程序退出')
    except Exception as e:
        logger.error(f'程序异常退出:{e}', exc_info=True)
# Start the watcher loop only when this file is executed as a script.
if __name__ == '__main__':
    main()