import logging
import time
from datetime import datetime
from multiprocessing import Process

import requests
from bs4 import BeautifulSoup

from log_config import setup_baidu_seed_logger
from database_config import db_manager

# NOTE(review): requests, BeautifulSoup, logging and Process are not used in
# this module; they are kept in case other code imports them from here.

# Use the shared logging configuration.
logger = setup_baidu_seed_logger()

# Default number of seconds main() sleeps between polling cycles.
DEFAULT_POLL_INTERVAL = 10
# A seed counts as finished once its newest crawled=1 record is older than this.
DEFAULT_IDLE_SECONDS = 1800


def init_db():
    """No-op placeholder.

    The table-creation statements were removed, so the tables are expected to
    already exist. Presumably kept so existing callers of init_db() don't break.
    """
    pass


def insert_keywords(words, parent_id=0, seed_id=0, seed_name=''):
    """Insert keywords into the baidu_keyword table (autocommit mode).

    When inserting a root keyword (``parent_id == 0``), one existing root
    record (``parents_id=0``) of the same seed is deleted first so the seed
    root can be re-inserted. Child inserts (``parent_id != 0``) never touch
    the root record.

    Each keyword is inserted with a plain INSERT (no IGNORE), so a duplicate
    raises. The failure is logged and the remaining words are still tried.

    Args:
        words: list of keywords to insert.
        parent_id: ID of the parent level (0 for a seed's root record).
        seed_id: ID of the seed the keywords belong to.
        seed_name: name of the seed.

    Returns:
        int: number of keywords inserted successfully (0 for empty input or
        when an unexpected error aborted the operation).
    """
    if not words:
        return 0

    inserted_count = 0
    try:
        # Only a root insert replaces the seed's old root record. Running this
        # for child inserts would delete the seed root out from under the crawl.
        if parent_id == 0:
            delete_sql = "DELETE FROM baidu_keyword WHERE parents_id=0 AND seed_id=%s LIMIT 1"
            deleted = db_manager.execute_update(delete_sql, (seed_id,), autocommit=True)
            # Truthiness (not `> 0`) so a None return cannot raise TypeError.
            if deleted:
                logger.info(f'[删除旧记录] 种子ID:{seed_id}, parents_id=0, 删除{deleted}条')

        # Plain INSERT (no IGNORE): duplicates raise and are logged per word.
        insert_sql = """INSERT INTO baidu_keyword (keyword, parents_id, seed_id, seed_name)
                        VALUES (%s, %s, %s, %s)"""
        for word in words:
            try:
                db_manager.execute_update(insert_sql, (word, parent_id, seed_id, seed_name), autocommit=True)
            except Exception as insert_err:
                logger.warning(f'[插入失败] {word} - {insert_err}')
            else:
                inserted_count += 1
                logger.info(f'[插入成功] {word} (父ID:{parent_id}, 种子ID:{seed_id})')

        if inserted_count > 0:
            logger.info(f'成功插入 {inserted_count}/{len(words)} 个新关键词 (父ID:{parent_id}, 种子:{seed_name})')
    except Exception as e:
        logger.error(f'插入关键词异常:{e},父ID:{parent_id},种子:{seed_name}')

    return inserted_count


def get_seed_keywords_batch(batch_size=4):
    """Fetch a batch of not-yet-crawled seed words from baidu_seed_keywords.

    Selects rows with ``crawled=0`` and ``status='ready'``, oldest id first,
    so that the batch is deterministic.

    Args:
        batch_size: maximum number of seeds to fetch.

    Returns:
        list: rows of ``(id, keyword)``; an empty list when there are none
        or the query fails.
    """
    try:
        # ORDER BY makes LIMIT deterministic (FIFO by id).
        sql = ("SELECT id, keyword FROM baidu_seed_keywords "
               "WHERE crawled=0 AND status = 'ready' ORDER BY id LIMIT %s")
        results = db_manager.execute_query(sql, (batch_size,))
        return results or []
    except Exception as e:
        logger.error(f'查询种子词失败:{e}')
        return []


def get_keyword_info(keyword):
    """Look up a keyword's row in baidu_keyword.

    Args:
        keyword: the keyword to look up.

    Returns:
        tuple: ``(id, keyword, parents_id, seed_id, seed_name)``, or None when
        not found or the query fails.
    """
    try:
        sql = "SELECT id, keyword, parents_id, seed_id, seed_name FROM baidu_keyword WHERE keyword=%s"
        return db_manager.execute_query(sql, (keyword,), fetch_one=True)
    except Exception as e:
        logger.error(f'查询关键词信息失败:{e}')
        return None


def process_seed_keywords(batch_size=2):
    """Move one batch of ready seeds into baidu_keyword as root records.

    Each seed keyword is inserted with ``parents_id=0`` (the ``crawled`` column
    is not set here; presumably it defaults to 0 — confirm against the
    schema). The seed's status in baidu_seed_keywords is then set to
    ``'doing'``. This happens even if the insert failed, so a persistently
    failing seed is not re-selected on every cycle. A warning is logged in
    that case.

    Args:
        batch_size: number of seeds to process per call (default 2).

    Returns:
        bool: True if a batch was fetched, False when no seeds were pending.
    """
    logger.info(f'\n========== 处理{batch_size}个种子关键词 ==========')

    seeds = get_seed_keywords_batch(batch_size=batch_size)
    if not seeds:
        logger.info('当前无未处理的种子词')
        return False

    logger.info(f'\n[批次开始] 获取到 {len(seeds)} 个种子词:{[s[1] for s in seeds]}')

    for seed_id, seed_keyword in seeds:
        # Insert the seed itself as the root record (parents_id=0).
        inserted = insert_keywords(
            [seed_keyword],
            parent_id=0,
            seed_id=seed_id,
            seed_name=seed_keyword
        )

        # Mark the seed as in progress.
        try:
            sql = "UPDATE baidu_seed_keywords SET status='doing' WHERE id=%s"
            db_manager.execute_update(sql, (seed_id,), autocommit=True)
        except Exception as e:
            logger.error(f'更新种子表状态失败:{e}')
            continue

        if inserted:
            logger.info(f' ✓ 种子词已插入: {seed_keyword} (种子ID:{seed_id}, crawled=0, parents_id=0, status=doing)')
        else:
            logger.warning(f' ✗ 种子词插入失败(状态仍更新为doing): {seed_keyword} (种子ID:{seed_id})')

    logger.info(f'[批次完成] 本批 {len(seeds)} 个种子词已插入baidu_keyword表,等待baidu_crawl.py处理')
    return True


def check_and_mark_finished_seeds(idle_seconds=DEFAULT_IDLE_SECONDS):
    """Detect seeds whose crawl has finished and mark them complete.

    Candidates are seeds that have a root record with ``parents_id=0`` and
    ``crawled=2``. A candidate is finished when:

    1. none of its baidu_keyword rows has ``crawled=0``, and
    2. its newest ``crawled=1`` row (by ``created_at``) is older than
       ``idle_seconds``.

    For each finished seed, the root record goes from ``crawled=2`` to
    ``crawled=1``. The seed's baidu_seed_keywords row becomes ``crawled=1,
    status='finished'``.

    Args:
        idle_seconds: required age in seconds of the newest crawled=1 record
            (default 1800 = 30 minutes).

    Returns:
        int: number of seeds marked finished (0 on error).
    """
    try:
        # Candidate seeds: root record still in crawled=2.
        sql = """SELECT DISTINCT seed_id, seed_name FROM baidu_keyword
                 WHERE parents_id=0 AND crawled=2 AND seed_id>0"""
        candidate_seeds = db_manager.execute_query(sql)

        if not candidate_seeds:
            return 0

        logger.info(f'\n[检查候选种子] 找到 {len(candidate_seeds)} 个候选种子,开始检查完成条件...')

        marked_count = 0
        for seed_id, seed_name in candidate_seeds:
            try:
                # Condition 1: no crawled=0 rows remain for this seed.
                sql = "SELECT COUNT(*) FROM baidu_keyword WHERE seed_id=%s AND crawled=0"
                result = db_manager.execute_query(sql, (seed_id,), fetch_one=True)
                count_crawled_0 = result[0] if result else 0

                if count_crawled_0 > 0:
                    logger.info(f' - 种子{seed_id}({seed_name}) 还有 {count_crawled_0} 条crawled=0的记录,跳过')
                    continue

                # Condition 2: newest crawled=1 row is older than idle_seconds.
                sql = """SELECT created_at FROM baidu_keyword
                         WHERE seed_id=%s AND crawled=1
                         ORDER BY created_at DESC LIMIT 1"""
                last_record = db_manager.execute_query(sql, (seed_id,), fetch_one=True)

                if not last_record:
                    logger.warning(f' - 种子{seed_id}({seed_name}) 没有crawled=1的记录,跳过')
                    continue

                last_created_at = last_record[0]
                if last_created_at is None:
                    # A NULL created_at would otherwise raise TypeError below.
                    logger.warning(f' - 种子{seed_id}({seed_name}) crawled=1记录缺少created_at,跳过')
                    continue

                # NOTE(review): assumes created_at is a naive datetime in local time.
                time_diff = (datetime.now() - last_created_at).total_seconds()

                if time_diff <= idle_seconds:
                    logger.info(f' - 种子{seed_id}({seed_name}) 最后一条记录时间差:{time_diff/60:.1f}分钟,未超过{idle_seconds / 60:g}分钟,跳过')
                    continue

                # Both conditions hold: mark the seed finished.
                logger.info(f' ✓ 种子{seed_id}({seed_name}) 满足完成条件:')
                logger.info(f' - crawled=0记录数: 0')
                logger.info(f' - 最后一条记录时间差: {time_diff/60:.1f}分钟')

                # Root record: crawled 2 -> 1.
                sql1 = "UPDATE baidu_keyword SET crawled=1 WHERE parents_id=0 AND seed_id=%s AND crawled=2"
                result_count = db_manager.execute_update(sql1, (seed_id,), autocommit=True)
                logger.info(f' - 已将 {result_count} 条parents_id=0的记录从crawled=2更新为1')

                # Seed table: crawled=1, status=finished.
                sql2 = "UPDATE baidu_seed_keywords SET crawled=1, status='finished' WHERE id=%s"
                db_manager.execute_update(sql2, (seed_id,), autocommit=True)
                logger.info(f' ✓ 种子{seed_id}({seed_name}) 已标记为完成,status=finished')

                marked_count += 1

            except Exception as e:
                logger.error(f' ✗ 种子{seed_id} 检查失败:{e}')

        return marked_count

    except Exception as e:
        logger.error(f'检查已完成种子失败:{e}')
        return 0


def main(poll_interval=DEFAULT_POLL_INTERVAL):
    """Run the listener loop until interrupted.

    Each cycle:
    1. Checks baidu_keyword and marks finished seeds.
    2. Moves the next batch of ready seeds from baidu_seed_keywords into
       baidu_keyword.
    3. Sleeps ``poll_interval`` seconds.

    Args:
        poll_interval: seconds to sleep between cycles (default 10).
    """
    logger.info('='*70)
    logger.info('百度种子词监听系统启动 - ' + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    logger.info('='*70)

    try:
        cycle_count = 0
        while True:
            cycle_count += 1
            logger.info(f'\n\n{"="*70}')
            logger.info(f'【第{cycle_count}轮循环】开始监听')
            logger.info(f'{"="*70}')

            # (2) Check baidu_keyword and mark finished seeds.
            marked_count = check_and_mark_finished_seeds()
            if marked_count > 0:
                logger.info(f'\n✅ 本轮共标记 {marked_count} 个种子为完成')

            # (1) Move the next batch of seeds into baidu_keyword.
            process_seed_keywords()

            # Log the interval actually slept (it used to claim 3 s while sleeping 10 s).
            logger.info(f'\n{"="*70}')
            logger.info(f'【第{cycle_count}轮循环】完成,{poll_interval}秒后继续...')
            logger.info(f'{"="*70}\n')
            time.sleep(poll_interval)

    except KeyboardInterrupt:
        logger.info('\n接收到停止信号,程序退出')
    except Exception as e:
        logger.error(f'程序异常退出:{e}', exc_info=True)


if __name__ == '__main__':
    main()