import requests
from bs4 import BeautifulSoup
import time
import logging
import random
from datetime import datetime
from multiprocessing import Process
import os
from urllib.parse import quote
from log_config import setup_baidu_crawl_logger
from database_config import db_manager

# Strip system-level proxy settings so outgoing requests only ever use the
# proxies we pass explicitly per call.
os.environ['NO_PROXY'] = '*'
os.environ['no_proxy'] = '*'
for _proxy_var in ('HTTP_PROXY', 'HTTPS_PROXY', 'http_proxy', 'https_proxy'):
    os.environ.pop(_proxy_var, None)

# Unified logging configuration for the crawler.
logger = setup_baidu_crawl_logger()

# Damai proxy-vendor API endpoint used to fetch fresh proxy IPs.
PROXY_API_URL = (
    'https://api2.damaiip.com/index.php?s=/front/user/getIPlist&xsn=2912cb2b22d3b7ae724f045012790479&osn=TC_NO176707424165606223&tiqu=1'
)

# Username/password authentication for the Damai proxies.
PROXY_USERNAME = '69538fdef04e1'
PROXY_PASSWORD = '63v0kQBr2yJXnjf'

# Fallback static proxy pool used when the vendor API is unavailable.
# Each entry: {'ip': 'host:port', 'user': ..., 'password': ...}
BACKUP_PROXY_POOL = [
    {'ip': '61.171.69.167:50000', 'user': '6jinnh', 'password': 'fi9k7q5d'},
    {'ip': '36.213.32.122:50001', 'user': '9w6xpg', 'password': 'tqswr1ee'},
]


def init_db():
    # Schema creation was intentionally removed; connection handling lives in
    # db_manager, so there is nothing to do here.
    pass


def insert_keywords(words, parent_id=0, seed_id=0, seed_name=''):
    """Bulk-insert keywords into baidu_keyword (autocommit, INSERT IGNORE).

    Args:
        words: iterable of keyword strings; duplicates are silently ignored
            by INSERT IGNORE.
        parent_id: id of the keyword these were derived from (0 for seeds).
        seed_id: id of the originating seed keyword.
        seed_name: name of the originating seed keyword.
    """
    if not words:
        return
    try:
        sql = """INSERT IGNORE INTO baidu_keyword
                 (keyword, parents_id, seed_id, seed_name)
                 VALUES (%s, %s, %s, %s)"""
        params_list = [(word, parent_id, seed_id, seed_name) for word in words]
        inserted_count = db_manager.execute_many(sql, params_list, autocommit=True)
        if inserted_count > 0:
            logger.info(f'成功插入 {inserted_count}/{len(words)} 个新关键词 (父ID:{parent_id}, 种子:{seed_name})')
    except Exception as e:
        logger.error(f'插入关键词异常:{e},父ID:{parent_id},种子:{seed_name}')


def get_seed_keywords_batch(batch_size=4):
    """Fetch a batch of not-yet-crawled seed words from baidu_seed_keywords.

    Args:
        batch_size: maximum number of seeds to fetch.

    Returns:
        list of (id, keyword) tuples; empty list on error.
    """
    try:
        # BUGFIX: the original called pymysql.connect(**DB_CONFIG), but neither
        # pymysql nor DB_CONFIG exists in this module (NameError at runtime).
        # Route the query through db_manager like every other DB access here.
        sql = "SELECT id, keyword FROM baidu_seed_keywords WHERE crawled=0 LIMIT %s"
        results = db_manager.execute_query(sql, (batch_size,))
        results = results if results else []
        logger.info(f'从种子表获取到 {len(results)} 个未抓取的种子词')
        return results
    except Exception as e:
        logger.error(f'查询种子词失败:{e}')
        return []


def get_keyword_info(keyword):
    """Look up a keyword's full row in baidu_keyword.

    Args:
        keyword: keyword string to look up.

    Returns:
        (id, keyword, parents_id, seed_id, seed_name) tuple, or None when the
        keyword is absent or the query fails.
    """
    try:
        sql = "SELECT id, keyword, parents_id, seed_id, seed_name FROM baidu_keyword WHERE keyword=%s"
        result = db_manager.execute_query(sql, (keyword,), fetch_one=True)
        return result
    except Exception as e:
        logger.error(f'查询关键词信息失败:{e}')
        return None


def export_all_keywords(filename='seed_pool.txt'):
    """Export every keyword in baidu_keyword to a sorted UTF-8 text file."""
    try:
        sql = "SELECT keyword FROM baidu_keyword"
        results = db_manager.execute_query(sql)
        all_words = [row[0] for row in results] if results else []
        with open(filename, 'w', encoding='utf-8') as f:
            for word in sorted(all_words):
                f.write(word + '\n')
        # BUGFIX: log the actual target filename instead of a placeholder.
        logger.info(f'已导出 {len(all_words)} 个关键词到 {filename}')
    except Exception as e:
        logger.error(f'导出关键词失败:{e}')


def fetch_proxy(custom_logger=None):
    """Obtain one usable proxy, falling back to the static backup pool.

    Tries the Damai API first (JSON, then plain-text "ip:port" responses);
    any failure or unparseable response falls through to a random entry from
    BACKUP_PROXY_POOL.

    Args:
        custom_logger: optional logger for multi-process environments.

    Returns:
        dict with 'http' and 'https' proxy URLs (requests ``proxies=`` shape),
        each carrying username:password credentials.
    """
    log = custom_logger if custom_logger else logger
    try:
        # Pull a fresh IP from the Damai proxy API.
        resp = requests.get(PROXY_API_URL, timeout=10)
        resp.raise_for_status()

        # Preferred format: JSON {'code': 0, 'data': [{'ip':..., 'port':...}]}
        try:
            result = resp.json()
            if result.get('code') == 0 and result.get('data'):
                ip_list = result['data']
                if ip_list and len(ip_list) > 0:
                    ip_info = ip_list[0]
                    ip_port = f"{ip_info['ip']}:{ip_info['port']}"
                    nowtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    log.info(f'提取大麦代理IP: {ip_port}_{nowtime}')
                    # Proxy URL with embedded basic-auth credentials.
                    proxy_url = f'http://{PROXY_USERNAME}:{PROXY_PASSWORD}@{ip_info["ip"]}:{ip_info["port"]}'
                    return {
                        'http': proxy_url,
                        'https': proxy_url,
                    }
        except (ValueError, KeyError, TypeError):
            # Not JSON / unexpected shape -- fall through to text parsing.
            pass

        # Fallback format: plain text, first line is "ip:port".
        ip_port = resp.text.strip().split('\n', 1)[0]
        if ':' in ip_port:
            ip_message = resp.text.strip()
            nowtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            log.info(f'提取大麦代理IP: {ip_port}_{nowtime}')
            log.info(f'提取IPMessage: {ip_message}')
            host, port = ip_port.split(':', 1)
            proxy_url = f'http://{PROXY_USERNAME}:{PROXY_PASSWORD}@{host}:{port}'
            return {
                'http': proxy_url,
                'https': proxy_url,
            }
    except Exception as exc:  # noqa: BLE001
        log.warning(f'大麦代理API获取失败:{exc},使用备用固定代理池')

    # Last resort: pick a random credentialed proxy from the backup pool.
    # (Also reached when the API responded but yielded no usable "ip:port".)
    backup_proxy = random.choice(BACKUP_PROXY_POOL)
    ip_port = backup_proxy['ip']
    username = backup_proxy['user']
    password = backup_proxy['password']
    host, port = ip_port.split(':', 1)
    proxy_url = f'http://{username}:{password}@{host}:{port}'
    log.info(f'使用备用代理:{username}@{ip_port}')
    return {
        'http': proxy_url,
        'https': proxy_url,
    }


def fetch_related_words(keyword, proxies=None):
    """Scrape Baidu search for words related to *keyword*.

    Issues the same XHR Baidu's result page makes and parses two widgets out
    of the HTML: the suggestion list and the "related searches" table.

    Args:
        keyword: query string.
        proxies: optional requests-style proxies dict.

    Returns:
        (result, related_search): two lists of related keyword strings.

    NOTE(review): cookies/params below are captured from a real browser
    session and will go stale; the CSS class names are Baidu-internal and
    may change without notice.
    """
    cookies = {
        'PSTM': '1764302604',
        'BAIDUID': '17E56B6A4915D5B98222C8D7A7CFF059:FG=1',
        'BD_HOME': '1',
        'H_PS_PSSID': '63140_64007_65866_66117_66218_66194_66236_66243_66168_66362_66281_66264_66393_66395_66479_66510_66529_66553_66589_66590_66602_66614_66647_66679_66692_66695_66687',
        'delPer': '0',
        'BD_CK_SAM': '1',
        'PSINO': '3',
        'BAIDUID_BFESS': '17E56B6A4915D5B98222C8D7A7CFF059:FG=1',
        'PAD_BROWSER': '1',
        'BD_UPN': '12314753',
        'BDORZ': 'B490B5EBF6F3CD402E515D22BCDA1598',
        'BA_HECTOR': 'ala10k2421a0ag25a5a40524a10l8n1kii7of24',
        'BIDUPSID': 'C047CB4D757AC8632D7B5792A4254C89',
        'ZFY': 'hLpeh2:BHPDeKfEN3yuM7C:A7dmFl03pP:AkeekLlPw5J4:C',
        'channel': 'baidusearch',
        'H_WISE_SIDS': '63140_64007_65866_66117_66218_66194_66236_66243_66168_66362_66281_66264_66393_66395_66479_66510_66529_66553_66589_66590_66602_66614_66647_66679_66692_66695_66687',
        'baikeVisitId': '7f510782-16ce-4371-ad1d-cc8c0ba5ccc8',
        'COOKIE_SESSION': '0_0_1_0_0_0_1_0_1_1_7462_1_0_0_0_0_0_0_1764302605%7C1%230_0_1764302605%7C1',
        'H_PS_645EC': 'c7554ktAJah5Z6fmLi0RDEpB3a2TvS0rgHEQ7JP12K2UeBuFhGHrlxODIbY',
        'BDSVRTM': '16',
        'WWW_ST': '1764310101036',
    }
    headers = {
        'Accept': '*/*',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive',
        'Referer': f'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&tn=baidu&wd={quote(keyword)}',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
        'X-Requested-With': 'XMLHttpRequest',
        'is_xhr': '1',
        'sec-ch-ua': '"Chromium";v="142", "Google Chrome";v="142", "Not_A Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
    }
    # A list of tuples (not a dict) because 'ie' appears twice, mirroring the
    # browser-captured request.
    params = [
        ('ie', 'utf-8'),
        ('mod', '1'),
        ('isbd', '1'),
        ('isid', 'b857f27e00205440'),
        ('ie', 'utf-8'),
        ('f', '8'),
        ('rsv_bp', '1'),
        ('tn', 'baidu'),
        ('wd', keyword),
        ('oq', keyword),
        ('rsv_pq', 'b857f27e00205440'),
        ('rsv_t', 'c7554ktAJah5Z6fmLi0RDEpB3a2TvS0rgHEQ7JP12K2UeBuFhGHrlxODIbY'),
        ('rqlang', 'cn'),
        ('rsv_enter', '1'),
        ('rsv_dl', 'tb'),
        ('rsv_btype', 't'),
        ('inputT', '23852'),
        ('rsv_sug2', '0'),
        ('rsv_sug3', '15'),
        ('rsv_sug1', '23'),
        ('rsv_sug7', '100'),
        ('rsv_sug4', '23852'),
        ('bs', keyword),
        ('rsv_sid', 'undefined'),
        ('_ss', '1'),
        ('clist', 'ddad409c4a1855aa'),
        ('hsug', ''),
        ('f4s', '1'),
        ('csor', '3'),
        ('_cr1', '35542'),
    ]
    response = requests.get(
        'https://www.baidu.com/s',
        params=params,
        cookies=cookies,
        headers=headers,
        proxies=proxies,
        timeout=10,
    )
    soup = BeautifulSoup(response.text, 'html.parser')

    # Suggestion widget: second <span> of each item carries the keyword text.
    result = []
    div = soup.find('div', class_='list_1V4Yg')
    if div:
        for a in div.find_all('a', class_='item_3WKCf'):
            spans = a.find_all('span')
            if len(spans) > 1:
                result.append(spans[1].get_text(strip=True))

    # "Related searches" table that follows the rs-label heading.
    related_search = []
    rs_label = soup.find('div', class_='c-color-t rs-label_ihUhK')
    if rs_label:
        rs_table = rs_label.find_next('table', class_='rs-table_3RiQc')
        if rs_table:
            for a in rs_table.find_all('a', class_='rs-link_2DE3Q'):
                span = a.find('span', class_='rs-text_3K5mR')
                if span:
                    related_search.append(span.get_text(strip=True))
    return result, related_search


def crawl_keyword_worker(keyword, parent_id, seed_id, seed_name, is_seed=False):
    """Crawl one keyword's related words and persist them (worker process).

    Args:
        keyword: keyword to crawl.
        parent_id: id assigned as parents_id for the newly crawled words
            (i.e. the current keyword's own id).
        seed_id: originating seed's id (must be non-zero).
        seed_name: originating seed's name (must be non-empty).
        is_seed: True when *keyword* comes from baidu_seed_keywords, which
            controls which table gets the crawled=1 mark.

    NOTE(review): this helper is not invoked from main()'s current flow;
    confirm whether it is still used by an external caller.
    """
    # Guard against broken lineage data: crawled words would be orphaned.
    if not seed_id or seed_id == 0:
        logger.error(f'[数据错误] {keyword} 的 seed_id 为空或0,跳过抓取')
        return
    if not seed_name or seed_name.strip() == '':
        logger.error(f'[数据错误] {keyword} 的 seed_name 为空,跳过抓取')
        return

    logger.info(f'[进程启动] 正在抓取:{keyword} (ID作为新词父ID:{parent_id}, 种子ID:{seed_id}, 种子名:{seed_name})')
    try:
        proxies = fetch_proxy()
        words, related_search = fetch_related_words(keyword, proxies=proxies)
        new_words = set(words + related_search)
        logger.info(f'[抓取结果] {keyword} -> 推荐词:{len(words)}个, 相关搜索:{len(related_search)}个')

        # Insert children with parents_id=parent_id; seed lineage is inherited.
        if new_words:
            insert_keywords(new_words, parent_id, seed_id, seed_name)
            logger.info(f'[成功] {keyword} -> 抓取到 {len(new_words)} 个相关词并入库 (子词父ID:{parent_id}, 种子:{seed_name})')
        else:
            logger.warning(f'[空结果] {keyword} -> 未抓取到任何相关词')

        # Mark the source row as crawled in the appropriate table.
        if is_seed:
            sql = "UPDATE baidu_seed_keywords SET crawled=1 WHERE keyword=%s"
            db_manager.execute_update(sql, (keyword,), autocommit=True)
            logger.info(f'[标记完成] 种子词 {keyword} 已标记为已抓取')
        else:
            sql = "UPDATE baidu_keyword SET crawled=1 WHERE keyword=%s"
            db_manager.execute_update(sql, (keyword,), autocommit=True)
            logger.info(f'[标记完成] 关键词 {keyword} 已标记为已抓取')
    except Exception as e:
        logger.error(f'[失败] 抓取失败:{keyword},种子:{seed_name},错误:{e}', exc_info=True)


def process_seed_keywords():
    """Move a small batch of seed words into baidu_keyword for crawling.

    Takes up to 2 uncrawled seeds from baidu_seed_keywords, inserts each into
    baidu_keyword with crawled=0 / parents_id=0, and marks the seed row done.

    Returns:
        True when a batch was processed, False when no seeds remain.
    """
    logger.info('\n========== 处理2个种子关键词 ==========')
    seeds = get_seed_keywords_batch(batch_size=2)
    if not seeds:
        logger.info('当前无未处理的种子词')
        return False

    logger.info(f'\n[批次开始] 获取到 {len(seeds)} 个种子词:{[s[1] for s in seeds]}')
    for seed_id, seed_keyword in seeds:
        # A seed is its own seed_name; parents_id=0 marks it as a root.
        insert_keywords(
            [seed_keyword],
            parent_id=0,
            seed_id=seed_id,
            seed_name=seed_keyword
        )
        # Mark the seed-table row so it is not picked up again.
        try:
            sql = "UPDATE baidu_seed_keywords SET crawled=1 WHERE id=%s"
            db_manager.execute_update(sql, (seed_id,), autocommit=True)
            logger.info(f' ✓ 种子词已插入: {seed_keyword} (种子ID:{seed_id}, crawled=0, parents_id=0)')
        except Exception as e:
            logger.error(f'标记种子表失败:{e}')

    logger.info(f'[批次完成] 本批 {len(seeds)} 个种子词已插入baidu_keyword表,等待process_layer_keywords()处理')
    return True


def process_seed_worker(seed_id, seed_name):
    """Worker process: sequentially crawl all pending keywords of one seed.

    Loops until no crawled=0 row remains for this seed. Each keyword gets up
    to 3 crawl attempts (fresh proxy each time); rows are then marked
    crawled=2 when parents_id=0 (temporary root state) or crawled=1
    otherwise -- even on failure, so the queue cannot stall.

    Args:
        seed_id: seed to process.
        seed_name: seed's display name (propagated to inserted children).
    """
    # Re-initialise the logger inside the child process so error logs are
    # written to the correct file (handlers do not survive fork cleanly).
    from log_config import setup_baidu_crawl_logger
    process_logger = setup_baidu_crawl_logger(force_reinit=True)

    process_logger.info(f'[进程启动] 种子ID:{seed_id}, 种子名:{seed_name} - 开始处理')

    while True:
        try:
            # Oldest pending keyword for this seed (id ASC => FIFO).
            sql = """SELECT id, keyword, parents_id FROM baidu_keyword
                     WHERE crawled=0 AND seed_id=%s
                     ORDER BY id ASC LIMIT 1"""
            record = db_manager.execute_query(sql, (seed_id,), fetch_one=True)

            if not record:
                process_logger.info(f'[种子{seed_id}] 没有crawled=0的记录,进程结束')
                break

            keyword_id, keyword, parent_id = record
            process_logger.info(f'[种子{seed_id}] 处理关键词: {keyword} (ID:{keyword_id}, 父ID:{parent_id})')

            try:
                # Crawl with retry: a new proxy is fetched on every attempt.
                max_retries = 3
                retry_count = 0
                words, related_search = [], []

                while retry_count < max_retries:
                    try:
                        proxies = fetch_proxy(custom_logger=process_logger)
                        words, related_search = fetch_related_words(keyword, proxies=proxies)
                        break  # success -- leave the retry loop
                    except Exception as retry_error:
                        retry_count += 1
                        if retry_count < max_retries:
                            process_logger.warning(f'[种子{seed_id}] {keyword} 第{retry_count}次尝试失败,重试中... 错误:{retry_error}')
                            time.sleep(2)  # brief pause before retrying
                        else:
                            process_logger.error(f'[种子{seed_id}] {keyword} 经过{max_retries}次重试仍然失败,放弃该关键词')
                            raise  # exhausted retries -- propagate

                new_words = set(words + related_search)
                if new_words:
                    insert_keywords(new_words, keyword_id, seed_id, seed_name)
                    process_logger.info(f'[种子{seed_id}] {keyword} -> 抓取到 {len(new_words)} 个相关词')
                else:
                    process_logger.info(f'[种子{seed_id}] {keyword} -> 未抓取到相关词')

                # State transition: roots (parents_id=0) get crawled=2
                # (temporary state); everything else gets crawled=1.
                if parent_id == 0:
                    sql = "UPDATE baidu_keyword SET crawled=2 WHERE id=%s"
                    db_manager.execute_update(sql, (keyword_id,), autocommit=True)
                    process_logger.info(f'[种子{seed_id}] {keyword} parents_id=0,标记为crawled=2')
                else:
                    sql = "UPDATE baidu_keyword SET crawled=1 WHERE id=%s"
                    db_manager.execute_update(sql, (keyword_id,), autocommit=True)
                    process_logger.info(f'[种子{seed_id}] {keyword} 已标记为crawled=1')

            except Exception as e:
                process_logger.error(f'[种子{seed_id}] 抓取失败:{keyword},错误:{e}', exc_info=True)
                # Mark even on failure so this row cannot block the queue.
                try:
                    if parent_id == 0:
                        sql = "UPDATE baidu_keyword SET crawled=2 WHERE id=%s"
                        db_manager.execute_update(sql, (keyword_id,), autocommit=True)
                    else:
                        sql = "UPDATE baidu_keyword SET crawled=1 WHERE id=%s"
                        db_manager.execute_update(sql, (keyword_id,), autocommit=True)
                except Exception:
                    # Best-effort marking only; secondary DB failures are
                    # deliberately ignored here.
                    pass

            # Throttle between keywords.
            time.sleep(3)

        except Exception as e:
            process_logger.error(f'[种子{seed_id}] 进程异常:{e}', exc_info=True)
            time.sleep(3)

    process_logger.info(f'[进程结束] 种子ID:{seed_id}, 种子名:{seed_name} - 处理完成')


def process_layer_keywords():
    """Monitor loop: dispatch one worker process per seed with pending work.

    Repeats forever:
      1. Find up to 10 distinct (seed_id, seed_name) groups that still have
         crawled=0 keywords, oldest first.
      2. Start one process_seed_worker per group and wait for all to finish.
      3. Sleep 3s and poll again when nothing is pending.
    """
    logger.info('\n========== 开始循环监听处理关键词 ==========')

    while True:
        try:
            # Seeds that still have pending (crawled=0) keywords.
            sql = """SELECT seed_id, seed_name FROM baidu_keyword
                     WHERE crawled=0 AND seed_id>0
                     GROUP BY seed_id, seed_name
                     ORDER BY MIN(created_at) ASC
                     LIMIT 10"""
            seed_groups = db_manager.execute_query(sql)

            if not seed_groups:
                logger.info('\n[监听中] 当前没有待处理的关键词,3秒后重试...')
                time.sleep(3)
                continue

            logger.info(f'\n[批次开始] 获取到 {len(seed_groups)} 个种子需要处理')

            # One worker process per seed group.
            processes = []
            for seed_id, seed_name in seed_groups:
                logger.info(f' - 启动进程: 种子ID:{seed_id}, 种子名:{seed_name}')
                p = Process(target=process_seed_worker, args=(seed_id, seed_name))
                p.start()
                processes.append(p)

            # Block until the whole batch is done before polling again.
            for p in processes:
                p.join()

            logger.info(f'[批次完成] {len(seed_groups)} 个种子处理完成,继续监听...')

        except Exception as e:
            logger.error(f'循环监听异常:{e}', exc_info=True)
            time.sleep(3)


def main():
    """Entry point for the Baidu keyword crawling system.

    Intended overall flow:
      1. process_seed_keywords() feeds batches of 2 seed words from
         baidu_seed_keywords into baidu_keyword.
      2. process_layer_keywords() crawls every layer of related keywords
         until baidu_keyword has no crawled=0 rows for those seeds.
      3. Repeat until the seed table is exhausted, then keep polling for new
         seeds.

    NOTE(review): only process_layer_keywords() is invoked below;
    process_seed_keywords() is never called from here, so seeds must be fed
    into baidu_keyword by some other path -- confirm this is intentional.
    """
    try:
        logger.info('='*70)
        logger.info('百度关键词抓取系统启动 - ' + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        logger.info('='*70)

        # Runs forever (internal monitor loop); only returns on fatal error.
        process_layer_keywords()

        logger.info(f'\n{"-"*70}')
        logger.info(f'{"-"*70}\n')
    except Exception as e:
        logger.error(f'程序异常退出:{e}', exc_info=True)


if __name__ == '__main__':
    main()