Files
ai_crawler_tiktok/tiktok/search.py
2025-12-08 15:20:22 +08:00

171 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import re
import threading
import time
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from urllib.request import Request, urlopen
from core.curl import parse_curl_file
from data.store import save_links_snapshot
def _update_query(url, updates):
"""在原始 URL 上用 `updates` 更新查询参数并返回新 URL"""
p = urlparse(url)
q = parse_qs(p.query)
for k, v in updates.items():
q[k] = [str(v)]
new_q = urlencode(q, doseq=True)
return urlunparse((p.scheme, p.netloc, p.path, p.params, new_q, p.fragment))
def _extract_links(obj):
"""从返回对象中提取视频链接
优先从 `data -> item -> author.uniqueId + item.id` 组合;
同时遍历字符串字段,用正则匹配 tiktok 链接作为兜底。
返回:链接列表(可能包含重复,外层负责去重)。
"""
links = []
data = obj.get('data') if isinstance(obj, dict) else None
if isinstance(data, list):
for e in data:
if isinstance(e, dict) and e.get('type') == 1 and isinstance(e.get('item'), dict):
it = e['item']
author = it.get('author') or {}
uid = author.get('uniqueId')
vid = it.get('id')
if uid and vid:
links.append(f"https://www.tiktok.com/@{uid}/video/{vid}")
patterns = [
r"https?://www\.tiktok\.com/[\w@._-]+/video/\d+",
r"https?://www\.tiktok\.com/video/\d+",
r"https?://vm\.tiktok\.com/[\w-]+",
r"https?://vt\.tiktok\.com/[\w-]+",
]
def rec(x):
if isinstance(x, dict):
for v in x.values():
rec(v)
elif isinstance(x, list):
for v in x:
rec(v)
elif isinstance(x, str):
s = x
for pat in patterns:
for m in re.finditer(pat, s):
links.append(m.group(0))
rec(obj)
return links
def _fetch_bytes(url, headers, timeout, attempts=3):
    """GET *url* with *headers*, retrying on any error with linear backoff.

    Returns the raw response body as bytes, or None if every attempt failed.
    """
    for attempt in range(attempts):
        try:
            req = Request(url, headers=headers, method='GET')
            with urlopen(req, timeout=timeout) as resp:
                return resp.read()
        except Exception:
            # Best-effort crawl: back off a little longer after each failure.
            time.sleep(0.5 * (attempt + 1))
    return None


def search_video_links(keyword, file_path, max_pages=50, timeout=30, count=None, on_link=None):
    """Paginated TikTok search for *keyword*, collecting video links.

    The base URL and headers are taken from the 2nd curl request in
    *file_path*. Pages are fetched with retries; links are parsed both
    structurally and via regex fallback. Each link newly seen in this call
    triggers the optional *on_link* callback.

    Args:
        keyword: search keyword injected into the query string.
        file_path: path to the curl file describing the request template.
        max_pages: hard cap on pages fetched.
        timeout: per-request timeout in seconds.
        count: page size; defaults to the template URL's ``count`` param, else 12.
        on_link: optional callback invoked once per newly-seen link.

    Returns:
        list[str]: links in discovery order, deduplicated within this call
        (callers may still deduplicate across calls).
    """
    reqs = parse_curl_file(file_path)
    if len(reqs) < 2:
        return []
    base = reqs[1]
    headers = base['headers']
    parsed = urlparse(base['url'])
    query = parse_qs(parsed.query)
    if count is None:
        # Default the page size from the template URL, falling back to 12.
        try:
            count = int(query['count'][0])
        except (KeyError, IndexError, ValueError):
            count = 12
    all_links = []
    seen = set()
    offset = 0
    cursor = None
    for _ in range(max_pages):
        params = {'keyword': keyword, 'count': count}
        # Prefer the server-provided cursor once we have one; otherwise page
        # by a locally-maintained offset.
        params['offset'] = offset if cursor is None else cursor
        url = _update_query(base['url'], params)
        body = _fetch_bytes(url, headers, timeout)
        # Explicitly treat a failed fetch as an empty page (the original relied
        # on None.decode raising and being swallowed by a broad except).
        obj = {}
        if body is not None:
            try:
                obj = json.loads(body.decode('utf-8', errors='ignore'))
            except ValueError:
                obj = {}
        links = _extract_links(obj)
        # Guard against non-dict JSON payloads (e.g. a top-level list), which
        # previously crashed on obj.get(...).
        has_more = obj.get('has_more') if isinstance(obj, dict) else None
        next_cursor = obj.get('cursor') if isinstance(obj, dict) else None
        new_count = 0
        for link in links:
            if link in seen:
                continue
            seen.add(link)
            all_links.append(link)
            new_count += 1
            if on_link:
                try:
                    on_link(link)
                except Exception:
                    # Callback errors must not abort the crawl.
                    pass
        if has_more in (True, 1) and isinstance(next_cursor, int):
            cursor = next_cursor
            continue
        if new_count == 0:
            # No cursor to follow and nothing new on this page: stop.
            break
        offset += count
    return all_links
# Guards stdout so links printed from concurrent worker threads don't interleave.
_print_lock = threading.Lock()
def save_links_multi(keywords, out_path, file_path, max_pages=50, timeout=30, count=None, workers=5):
    """Search several keywords concurrently and persist a link snapshot.

    Concurrency is bounded by a semaphore of size *workers*; links are
    deduplicated across all keywords as they arrive. The snapshot written
    to *out_path* contains the keywords, per-keyword items, and the
    combined link list.

    Returns:
        out_path, after the snapshot has been saved.
    """
    combined = []
    seen = set()
    items = []
    dedup_lock = threading.Lock()
    gate = threading.Semaphore(max(1, int(workers)))

    def run_keyword(kw):
        with gate:
            found = []

            def record(link):
                # Cross-keyword dedup: only globally-new links are kept/printed.
                with dedup_lock:
                    if link in seen:
                        return
                    seen.add(link)
                    combined.append(link)
                    found.append(link)
                with _print_lock:
                    print(link, flush=True)

            search_video_links(kw, file_path=file_path, max_pages=max_pages,
                               timeout=timeout, count=count, on_link=record)
            items.append({'keyword': kw, 'count': len(found), 'links': found})

    threads = [threading.Thread(target=run_keyword, args=(kw,), daemon=True)
               for kw in keywords]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    save_links_snapshot(out_path, keywords, items, combined)
    return out_path
"""TikTok 视频链接搜索模块
核心能力:
- 构造查询 URL更新 keyword/offset/count 等参数)
- 发起请求并解析返回中的视频链接(结构化 + 正则兜底)
- 对多个关键词并发搜索、统一去重与快照保存
"""