171 lines
5.7 KiB
Python
171 lines
5.7 KiB
Python
|
|
import json
|
|||
|
|
import re
|
|||
|
|
import threading
|
|||
|
|
import time
|
|||
|
|
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
|
|||
|
|
from urllib.request import Request, urlopen
|
|||
|
|
from core.curl import parse_curl_file
|
|||
|
|
from data.store import save_links_snapshot
|
|||
|
|
|
|||
|
|
def _update_query(url, updates):
|
|||
|
|
"""在原始 URL 上用 `updates` 更新查询参数并返回新 URL"""
|
|||
|
|
p = urlparse(url)
|
|||
|
|
q = parse_qs(p.query)
|
|||
|
|
for k, v in updates.items():
|
|||
|
|
q[k] = [str(v)]
|
|||
|
|
new_q = urlencode(q, doseq=True)
|
|||
|
|
return urlunparse((p.scheme, p.netloc, p.path, p.params, new_q, p.fragment))
|
|||
|
|
|
|||
|
|
def _extract_links(obj):
|
|||
|
|
"""从返回对象中提取视频链接
|
|||
|
|
|
|||
|
|
优先从 `data -> item -> author.uniqueId + item.id` 组合;
|
|||
|
|
同时遍历字符串字段,用正则匹配 tiktok 链接作为兜底。
|
|||
|
|
返回:链接列表(可能包含重复,外层负责去重)。
|
|||
|
|
"""
|
|||
|
|
links = []
|
|||
|
|
data = obj.get('data') if isinstance(obj, dict) else None
|
|||
|
|
if isinstance(data, list):
|
|||
|
|
for e in data:
|
|||
|
|
if isinstance(e, dict) and e.get('type') == 1 and isinstance(e.get('item'), dict):
|
|||
|
|
it = e['item']
|
|||
|
|
author = it.get('author') or {}
|
|||
|
|
uid = author.get('uniqueId')
|
|||
|
|
vid = it.get('id')
|
|||
|
|
if uid and vid:
|
|||
|
|
links.append(f"https://www.tiktok.com/@{uid}/video/{vid}")
|
|||
|
|
patterns = [
|
|||
|
|
r"https?://www\.tiktok\.com/[\w@._-]+/video/\d+",
|
|||
|
|
r"https?://www\.tiktok\.com/video/\d+",
|
|||
|
|
r"https?://vm\.tiktok\.com/[\w-]+",
|
|||
|
|
r"https?://vt\.tiktok\.com/[\w-]+",
|
|||
|
|
]
|
|||
|
|
def rec(x):
|
|||
|
|
if isinstance(x, dict):
|
|||
|
|
for v in x.values():
|
|||
|
|
rec(v)
|
|||
|
|
elif isinstance(x, list):
|
|||
|
|
for v in x:
|
|||
|
|
rec(v)
|
|||
|
|
elif isinstance(x, str):
|
|||
|
|
s = x
|
|||
|
|
for pat in patterns:
|
|||
|
|
for m in re.finditer(pat, s):
|
|||
|
|
links.append(m.group(0))
|
|||
|
|
rec(obj)
|
|||
|
|
return links
|
|||
|
|
|
|||
|
|
def search_video_links(keyword, file_path, max_pages=50, timeout=30, count=None, on_link=None):
    """Search video links for *keyword*, page by page.

    The base URL and request headers come from the *second* curl request
    parsed out of ``file_path``.

    Parameters:
        keyword: search term placed in the ``keyword`` query parameter.
        file_path: path to a curl-command file understood by ``parse_curl_file``.
        max_pages: hard cap on the number of result pages fetched.
        timeout: per-request timeout in seconds.
        count: page size; defaults to the base URL's ``count`` query
            parameter, falling back to 12.
        on_link: optional callback invoked once per newly discovered link;
            exceptions it raises are swallowed (best-effort notification).

    Returns:
        List of links discovered in this call, deduplicated locally
        (cross-call deduplication is the caller's job).
    """
    reqs = parse_curl_file(file_path)
    if len(reqs) < 2:
        # We need the second request as the pagination template.
        return []
    base = reqs[1]
    headers = base['headers']
    parsed = urlparse(base['url'])
    q = parse_qs(parsed.query)
    if count is None:
        if 'count' in q:
            try:
                count = int(q['count'][0])
            except Exception:
                count = 12
        else:
            count = 12
    all_links = []
    seen = set()
    offset = 0
    cursor = None
    for _ in range(max_pages):
        params = {'keyword': keyword, 'count': count}
        # Prefer the server-supplied cursor once we have one; until then,
        # paginate with a locally computed offset.
        params['offset'] = cursor if cursor is not None else offset
        u = _update_query(base['url'], params)
        data = None
        for i in range(3):  # up to 3 attempts with linear backoff
            try:
                req = Request(u, headers=headers, method='GET')
                with urlopen(req, timeout=timeout) as resp:
                    data = resp.read()
                break
            except Exception:
                time.sleep(0.5 * (i + 1))
                data = None
        try:
            # If every attempt failed, data is None and .decode raises;
            # the except below turns that into an empty page.
            obj = json.loads(data.decode('utf-8', errors='ignore'))
        except Exception:
            obj = {}
        if not isinstance(obj, dict):
            # BUGFIX: a JSON array or scalar response used to crash on
            # obj.get(...) below (AttributeError outside any try);
            # treat such responses as an empty page instead.
            obj = {}
        links = _extract_links(obj)
        has_more = obj.get('has_more')
        next_cursor = obj.get('cursor')
        new = 0
        for l in links:
            if l not in seen:
                seen.add(l)
                all_links.append(l)
                new += 1
                if on_link:
                    try:
                        on_link(l)
                    except Exception:
                        pass  # callback is best-effort; never abort the crawl
        # BUGFIX: exclude bool — isinstance(True, int) is True, and a boolean
        # cursor would silently be reused as the next offset value.
        if has_more in (True, 1) and isinstance(next_cursor, int) and not isinstance(next_cursor, bool):
            cursor = next_cursor
            continue
        if new == 0:
            # No cursor to follow and nothing new on this page: stop.
            break
        offset += count
    return all_links
|
|||
|
|
|
|||
|
|
# Serializes console output so links printed by concurrent workers
# (save_links_multi) don't interleave mid-line.
_print_lock = threading.Lock()
|
|||
|
|
|
|||
|
|
def save_links_multi(keywords, out_path, file_path, max_pages=50, timeout=30, count=None, workers=5):
    """Search several keywords concurrently and save a snapshot.

    Concurrency: one daemon thread per keyword, throttled to at most
    ``workers`` simultaneous searches by a semaphore; links are
    deduplicated globally across keywords (first keyword to find a link
    claims it).

    Output: written via ``save_links_snapshot`` to ``out_path`` with the
    keyword list, per-keyword items, and the combined link list.

    Returns ``out_path``.
    """
    all_links = []  # combined links, deduplicated, in discovery order
    seen = set()  # global dedup set shared by all workers
    items = []  # one {'keyword','count','links'} dict per keyword
    seen_lock = threading.Lock()  # guards seen / all_links / item_links
    sem = threading.Semaphore(max(1, int(workers)))  # caps concurrent searches
    def worker(kw):
        with sem:
            item_links = []
            def on_new(l):
                # Invoked by search_video_links for every link it finds;
                # record it only if no other keyword claimed it first.
                with seen_lock:
                    if l not in seen:
                        seen.add(l)
                        all_links.append(l)
                        item_links.append(l)
                        # Print each fresh link immediately, under the
                        # output lock so lines don't interleave.
                        with _print_lock:
                            print(l, flush=True)
            search_video_links(kw, file_path=file_path, max_pages=max_pages, timeout=timeout, count=count, on_link=on_new)
            # NOTE(review): unsynchronized append relies on CPython's GIL
            # making list.append atomic — confirm if porting off CPython.
            items.append({'keyword': kw, 'count': len(item_links), 'links': item_links})
    threads = []
    for kw in keywords:
        t = threading.Thread(target=worker, args=(kw,))
        t.daemon = True  # don't block interpreter exit if a worker hangs
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    save_links_snapshot(out_path, keywords, items, all_links)
    return out_path
|
|||
|
|
"""TikTok 视频链接搜索模块
|
|||
|
|
|
|||
|
|
核心能力:
|
|||
|
|
- 构造查询 URL(更新 keyword/offset/count 等参数)
|
|||
|
|
- 发起请求并解析返回中的视频链接(结构化 + 正则兜底)
|
|||
|
|
- 对多个关键词并发搜索、统一去重与快照保存
|
|||
|
|
"""
|