Files
ai_crawler_tiktok/tiktok/search.py

171 lines
5.7 KiB
Python
Raw Normal View History

2025-12-08 15:20:22 +08:00
import json
import re
import threading
import time
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from urllib.request import Request, urlopen
from core.curl import parse_curl_file
from data.store import save_links_snapshot
def _update_query(url, updates):
"""在原始 URL 上用 `updates` 更新查询参数并返回新 URL"""
p = urlparse(url)
q = parse_qs(p.query)
for k, v in updates.items():
q[k] = [str(v)]
new_q = urlencode(q, doseq=True)
return urlunparse((p.scheme, p.netloc, p.path, p.params, new_q, p.fragment))
def _extract_links(obj):
"""从返回对象中提取视频链接
优先从 `data -> item -> author.uniqueId + item.id` 组合
同时遍历字符串字段用正则匹配 tiktok 链接作为兜底
返回链接列表可能包含重复外层负责去重
"""
links = []
data = obj.get('data') if isinstance(obj, dict) else None
if isinstance(data, list):
for e in data:
if isinstance(e, dict) and e.get('type') == 1 and isinstance(e.get('item'), dict):
it = e['item']
author = it.get('author') or {}
uid = author.get('uniqueId')
vid = it.get('id')
if uid and vid:
links.append(f"https://www.tiktok.com/@{uid}/video/{vid}")
patterns = [
r"https?://www\.tiktok\.com/[\w@._-]+/video/\d+",
r"https?://www\.tiktok\.com/video/\d+",
r"https?://vm\.tiktok\.com/[\w-]+",
r"https?://vt\.tiktok\.com/[\w-]+",
]
def rec(x):
if isinstance(x, dict):
for v in x.values():
rec(v)
elif isinstance(x, list):
for v in x:
rec(v)
elif isinstance(x, str):
s = x
for pat in patterns:
for m in re.finditer(pat, s):
links.append(m.group(0))
rec(obj)
return links
def _fetch_json(url, headers, timeout, attempts=3):
    """GET ``url`` and return the decoded JSON payload, or ``{}`` on failure.

    The request is tried up to ``attempts`` times with a linearly growing
    pause (0.5s, 1.0s, ...) between attempts. Transport errors and
    undecodable bodies are swallowed on purpose: the crawl is best-effort
    and an empty result simply ends pagination in the caller.
    """
    payload = None
    for attempt in range(attempts):
        try:
            request = Request(url, headers=headers, method='GET')
            with urlopen(request, timeout=timeout) as resp:
                payload = resp.read()
            break
        except Exception:
            payload = None
            # No point sleeping after the last attempt has already failed.
            if attempt + 1 < attempts:
                time.sleep(0.5 * (attempt + 1))
    if payload is None:
        return {}
    try:
        return json.loads(payload.decode('utf-8', errors='ignore'))
    except (ValueError, RecursionError):
        return {}


def search_video_links(keyword, file_path, max_pages=50, timeout=30, count=None, on_link=None):
    """Search video links for ``keyword``, page by page.

    The base URL and headers come from the second request returned by
    ``parse_curl_file(file_path)``; fewer than two requests yields ``[]``.

    Args:
        keyword: Search keyword written into the ``keyword`` query parameter.
        file_path: Path handed to ``parse_curl_file``.
        max_pages: Upper bound on the number of page requests.
        timeout: Per-request timeout passed to ``urlopen``.
        count: Page size. When ``None`` it is read from the base URL's
            ``count`` parameter, falling back to 12.
        on_link: Optional callback invoked once for every link not seen
            earlier in this call. Exceptions it raises are ignored.

    Returns:
        Links in discovery order, de-duplicated within this call.
    """
    reqs = parse_curl_file(file_path)
    if len(reqs) < 2:
        return []
    base = reqs[1]
    base_url = base['url']
    headers = base['headers']

    if count is None:
        raw_count = parse_qs(urlparse(base_url).query).get('count')
        try:
            count = int(raw_count[0]) if raw_count else 12
        except ValueError:
            count = 12

    all_links = []
    seen = set()
    offset = 0
    cursor = None
    for _ in range(max_pages):
        # A server-provided cursor, once seen, takes precedence over the
        # locally computed offset.
        page_offset = offset if cursor is None else cursor
        url = _update_query(
            base_url,
            {'keyword': keyword, 'count': count, 'offset': page_offset},
        )
        obj = _fetch_json(url, headers, timeout)
        # The payload may be valid JSON without being an object (e.g. a
        # list); only a dict can carry the pagination fields.
        meta = obj if isinstance(obj, dict) else {}

        new = 0
        for link in _extract_links(obj):
            if link in seen:
                continue
            seen.add(link)
            all_links.append(link)
            new += 1
            if on_link:
                try:
                    on_link(link)
                except Exception:
                    # Deliberate: a faulty callback must not abort the crawl.
                    pass

        next_cursor = meta.get('cursor')
        if meta.get('has_more') in (True, 1) and isinstance(next_cursor, int):
            cursor = next_cursor
            continue
        # Without a usable cursor, stop as soon as a page adds nothing new.
        if new == 0:
            break
        # NOTE: if a cursor was recorded on an earlier page, the next request
        # still uses that cursor; `offset` only matters while cursor is None.
        offset += count
    return all_links
# Guards stdout so links printed by concurrent workers do not interleave.
_print_lock = threading.Lock()


def save_links_multi(keywords, out_path, file_path, max_pages=50, timeout=30, count=None, workers=5):
    """Search several keywords concurrently and save one combined snapshot.

    One thread is started per keyword; a semaphore sized by ``workers``
    (at least 1) limits how many of them run a search at the same time.
    Links are de-duplicated across all keywords, and each newly seen link is
    printed to stdout as it is found.

    Args:
        keywords: Iterable of search keywords.
        out_path: Destination passed to ``save_links_snapshot``.
        file_path: Curl file path forwarded to ``search_video_links``.
        max_pages: Forwarded to ``search_video_links``.
        timeout: Forwarded to ``search_video_links``.
        count: Forwarded to ``search_video_links``.
        workers: Maximum number of searches running concurrently.

    Returns:
        ``out_path``, after ``save_links_snapshot`` has been called with the
        keywords, the per-keyword items and the combined link list.
    """
    # `keywords` is consumed twice (thread spawning and the snapshot), so a
    # one-shot iterable must be materialised or the snapshot would receive an
    # exhausted iterator.
    if not isinstance(keywords, (list, tuple)):
        keywords = list(keywords)

    all_links = []
    seen = set()
    items = []
    seen_lock = threading.Lock()
    sem = threading.Semaphore(max(1, int(workers)))

    def worker(kw):
        with sem:
            item_links = []

            def on_new(link):
                # A link is credited to whichever keyword reports it first.
                with seen_lock:
                    if link not in seen:
                        seen.add(link)
                        all_links.append(link)
                        item_links.append(link)
                        with _print_lock:
                            print(link, flush=True)

            search_video_links(
                kw,
                file_path=file_path,
                max_pages=max_pages,
                timeout=timeout,
                count=count,
                on_link=on_new,
            )
            items.append({'keyword': kw, 'count': len(item_links), 'links': item_links})

    threads = []
    for kw in keywords:
        # Daemon threads, so an interrupted run is not kept alive by workers.
        t = threading.Thread(target=worker, args=(kw,), daemon=True)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

    save_links_snapshot(out_path, keywords, items, all_links)
    return out_path
"""TikTok 视频链接搜索模块
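Core capabilities:
- Build the query URL, updating parameters such as keyword/offset/count.
- Send requests and parse video links from the response (structured extraction plus a regex fallback).
- Search multiple keywords concurrently, with unified de-duplication and snapshot saving.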
"""