"""TikTok video-link search module.

Core capabilities:
- Build query URLs (update keyword/offset/count parameters on a base URL)
- Issue paginated requests and extract video links from responses
  (structured parse first, regex scan as a fallback)
- Search several keywords concurrently, de-duplicate globally, and
  persist a snapshot of the results
"""

import json
import re
import threading
import time
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from urllib.request import Request, urlopen

from core.curl import parse_curl_file
from data.store import save_links_snapshot

# Compiled once at import time; used by the regex fallback in _extract_links.
_LINK_PATTERNS = [
    re.compile(r"https?://www\.tiktok\.com/[\w@._-]+/video/\d+"),
    re.compile(r"https?://www\.tiktok\.com/video/\d+"),
    re.compile(r"https?://vm\.tiktok\.com/[\w-]+"),
    re.compile(r"https?://vt\.tiktok\.com/[\w-]+"),
]


def _update_query(url, updates):
    """Return a new URL with the query parameters in *updates* applied.

    Parameters already present in *url* but not mentioned in *updates*
    are preserved unchanged.
    """
    parts = urlparse(url)
    query = parse_qs(parts.query)
    for key, value in updates.items():
        query[key] = [str(value)]
    new_query = urlencode(query, doseq=True)
    return urlunparse(
        (parts.scheme, parts.netloc, parts.path, parts.params,
         new_query, parts.fragment)
    )


def _extract_links(obj):
    """Extract TikTok video links from a decoded response object.

    Prefers structured entries under ``data`` (combining
    ``author.uniqueId`` with ``item.id``); additionally walks every
    string field in *obj* and regex-matches TikTok URLs as a fallback.

    Returns a list of links; it may contain duplicates — callers are
    responsible for de-duplication.
    """
    links = []

    # Structured pass: data -> [{type: 1, item: {...}}, ...]
    data = obj.get('data') if isinstance(obj, dict) else None
    if isinstance(data, list):
        for entry in data:
            if (isinstance(entry, dict) and entry.get('type') == 1
                    and isinstance(entry.get('item'), dict)):
                item = entry['item']
                author = item.get('author') or {}
                uid = author.get('uniqueId')
                vid = item.get('id')
                if uid and vid:
                    links.append(f"https://www.tiktok.com/@{uid}/video/{vid}")

    # Regex fallback: scan every string anywhere in the object tree.
    def rec(node):
        if isinstance(node, dict):
            for value in node.values():
                rec(value)
        elif isinstance(node, list):
            for value in node:
                rec(value)
        elif isinstance(node, str):
            for pattern in _LINK_PATTERNS:
                for match in pattern.finditer(node):
                    links.append(match.group(0))

    rec(obj)
    return links


def _fetch_with_retries(url, headers, timeout, attempts=3):
    """GET *url* with linear backoff between retries.

    Returns the response body as bytes, or ``None`` when every attempt
    failed.  No sleep is performed after the final failed attempt.
    """
    for attempt in range(attempts):
        try:
            req = Request(url, headers=headers, method='GET')
            with urlopen(req, timeout=timeout) as resp:
                return resp.read()
        except Exception:
            # Network/HTTP errors are expected here; back off and retry.
            if attempt < attempts - 1:
                time.sleep(0.5 * (attempt + 1))
    return None


def search_video_links(keyword, file_path, max_pages=50, timeout=30,
                       count=None, on_link=None):
    """Search video links for *keyword*, page by page.

    The base URL and headers come from the second curl request parsed
    out of *file_path*.  Pages are fetched with retries; each newly
    discovered link triggers the optional *on_link* callback.

    Returns every link found for this call (de-duplicated within the
    call only; cross-call de-duplication is the caller's job).
    """
    reqs = parse_curl_file(file_path)
    if len(reqs) < 2:
        return []
    base = reqs[1]
    headers = base['headers']

    # Default page size: honour the `count` baked into the curl URL,
    # falling back to 12 when absent or unparsable.
    if count is None:
        query = parse_qs(urlparse(base['url']).query)
        try:
            count = int(query['count'][0])
        except (KeyError, IndexError, ValueError):
            count = 12

    all_links = []
    seen = set()
    offset = 0
    cursor = None  # server-supplied cursor, preferred over local offset

    for _ in range(max_pages):
        params = {'keyword': keyword, 'count': count}
        params['offset'] = cursor if cursor is not None else offset
        url = _update_query(base['url'], params)

        data = _fetch_with_retries(url, headers, timeout)
        try:
            obj = json.loads(data.decode('utf-8', errors='ignore'))
        except Exception:
            # Fetch failed (data is None) or body was not valid JSON;
            # treat as an empty page so the loop terminates cleanly.
            obj = {}

        links = _extract_links(obj)
        has_more = obj.get('has_more')
        next_cursor = obj.get('cursor')

        new = 0
        for link in links:
            if link in seen:
                continue
            seen.add(link)
            all_links.append(link)
            new += 1
            if on_link:
                try:
                    on_link(link)
                except Exception:
                    pass  # best-effort callback; never abort the crawl

        # Server-driven pagination wins when the response provides it.
        if has_more in (True, 1) and isinstance(next_cursor, int):
            cursor = next_cursor
            continue
        if new == 0:
            break  # no progress on this page: stop paging
        offset += count

    return all_links


# Serializes stdout so concurrent workers don't interleave printed links.
_print_lock = threading.Lock()


def save_links_multi(keywords, out_path, file_path, max_pages=50, timeout=30,
                     count=None, workers=5):
    """Search several keywords concurrently and save a snapshot.

    Concurrency is bounded by a semaphore of *workers* threads; links
    are de-duplicated across keywords (first keyword to find a link
    keeps it).  The snapshot written to *out_path* contains
    ``keywords/items/total_count/links``.

    Returns *out_path*.
    """
    all_links = []
    items = []
    seen = set()
    seen_lock = threading.Lock()
    sem = threading.Semaphore(max(1, int(workers)))

    def worker(kw):
        with sem:
            item_links = []

            def on_new(link):
                # First-seen-wins across all keywords.
                with seen_lock:
                    if link in seen:
                        return
                    seen.add(link)
                    all_links.append(link)
                    item_links.append(link)
                with _print_lock:
                    print(link, flush=True)

            search_video_links(kw, file_path=file_path, max_pages=max_pages,
                               timeout=timeout, count=count, on_link=on_new)
            # list.append is atomic under the GIL; no extra lock needed.
            items.append({'keyword': kw, 'count': len(item_links),
                          'links': item_links})

    threads = []
    for kw in keywords:
        t = threading.Thread(target=worker, args=(kw,), daemon=True)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

    save_links_snapshot(out_path, keywords, items, all_links)
    return out_path