# ai_crawler_tiktok/tiktok/comments.py
# NOTE(review): repository web-viewer chrome (file listing header, timestamp,
# size/line-count badges, "ambiguous Unicode characters" banner) was pasted
# into this file during extraction; it is not source code and has been
# reduced to this comment header so the module parses.
import json
import re
import threading
import time
from urllib.parse import urlparse, parse_qs, urlencode
from urllib.request import Request, urlopen
from core.curl import parse_curl_file
from utils.io import ensure_csv_header, append_csv_rows
from data.store import save_comments_snapshot
def _extract_aweme_id(link):
"""从视频链接中提取 aweme_id/video/<id>"""
m = re.search(r"/video/(\d+)", link)
return m.group(1) if m else None
def fetch_comments_aweme(aweme_id, file_path, count=20, max_pages=50, timeout=30, total_limit=None, referer=None):
    """Fetch a video's comments page by page.

    Parameters:
    - aweme_id: video ID
    - file_path: curl text file (the first parsed block is the baseline comment request)
    - count / max_pages / timeout: pagination size, page cap, and per-request timeout
    - total_limit: optional cap on the total number of comments collected
    - referer: optional value for the request's referer header

    Behavior: each page is retried up to 3 times with linear backoff; when the
    primary endpoint returns no comments, a fallback comment endpoint is tried;
    pagination follows has_more / cursor / next_cursor.

    Returns: list of raw comment objects (at most total_limit when it is set).
    """
    reqs = parse_curl_file(file_path)
    if not reqs:
        return []
    base = reqs[0]
    headers = dict(base['headers'])  # copy so the parsed request template is not mutated
    if referer:
        headers['referer'] = referer
    cursor = 0
    all_comments = []
    for _ in range(max_pages):
        # Rebuild the base URL's query with this page's aweme_id/count/cursor.
        u_parsed = urlparse(base['url'])
        q = parse_qs(u_parsed.query)
        q['aweme_id'] = [str(aweme_id)]
        q['count'] = [str(count)]
        q['cursor'] = [str(cursor)]
        u = u_parsed._replace(query=urlencode(q, doseq=True)).geturl()
        data = None
        for i in range(3):  # up to 3 attempts, 0.5s/1.0s backoff between them
            try:
                req = Request(u, headers=headers, method='GET')
                with urlopen(req, timeout=timeout) as resp:
                    data = resp.read()
                break
            except Exception:
                time.sleep(0.5 * (i + 1))
                data = None
        try:
            obj = json.loads(data.decode('utf-8', errors='ignore'))
        except Exception:
            # data is None (all retries failed) or not valid JSON -> treat as empty page
            obj = {}
        if not obj.get('comments'):
            # Fallback: public comment-list endpoint, tried up to 2 times.
            alt_params = {'aid': 1988, 'aweme_id': aweme_id, 'count': count, 'cursor': cursor}
            alt_url = 'https://www.tiktok.com/api/comment/list/?' + urlencode(alt_params)
            for i in range(2):
                try:
                    req = Request(alt_url, headers=headers, method='GET')
                    with urlopen(req, timeout=timeout) as resp:
                        data2 = resp.read()
                        obj2 = json.loads(data2.decode('utf-8', errors='ignore'))
                        if obj2.get('comments'):
                            obj = obj2
                            break
                except Exception:
                    time.sleep(0.5 * (i + 1))
        comments = obj.get('comments') or []
        limited = False
        for c in comments:
            all_comments.append(c)
            if isinstance(total_limit, int) and total_limit > 0 and len(all_comments) >= total_limit:
                limited = True
                break
        if limited:
            # BUGFIX: previously only the per-comment loop broke here, so
            # pagination kept fetching pages past total_limit and the result
            # could greatly exceed the requested cap.
            break
        has_more = obj.get('has_more')
        next_cursor = obj.get('cursor') or obj.get('next_cursor')
        if has_more in (True, 1) and isinstance(next_cursor, int):
            cursor = next_cursor
            continue
        # No explicit has_more, but we got comments and a usable cursor: keep paging.
        if comments and isinstance(next_cursor, int):
            cursor = next_cursor
            continue
        break
    return all_comments
def fetch_replies(comment_id, aweme_id, file_path, count=20, max_pages=50, timeout=30, total_limit=None):
    """Fetch a comment's second-level replies page by page.

    Parameters:
    - comment_id / aweme_id: identify the parent comment and its video
    - file_path: curl text file (only its first block's headers are used)
    - count / max_pages / timeout / total_limit: same semantics as fetch_comments_aweme

    Returns: list of raw reply objects (at most total_limit when it is set).
    """
    reqs = parse_curl_file(file_path)
    if not reqs:
        return []
    # Copy headers so the parsed request template is not mutated
    # (consistent with fetch_comments_aweme).
    headers = dict(reqs[0]['headers'])
    base = 'https://www.tiktok.com/api/comment/list/reply/'
    cursor = 0
    replies = []
    for _ in range(max_pages):
        params = {'aid': 1988, 'aweme_id': aweme_id, 'comment_id': comment_id, 'count': count, 'cursor': cursor}
        url = base + '?' + urlencode(params)
        data = None
        for i in range(3):  # up to 3 attempts with linear backoff
            try:
                req = Request(url, headers=headers, method='GET')
                with urlopen(req, timeout=timeout) as resp:
                    data = resp.read()
                break
            except Exception:
                time.sleep(0.5 * (i + 1))
                data = None
        try:
            obj = json.loads(data.decode('utf-8', errors='ignore'))
        except Exception:
            # data is None (all retries failed) or not valid JSON -> empty page
            obj = {}
        arr = obj.get('comments') or []
        limited = False
        for r in arr:
            replies.append(r)
            if isinstance(total_limit, int) and total_limit > 0 and len(replies) >= total_limit:
                limited = True
                break
        if limited:
            # BUGFIX: previously only the per-reply loop broke here, so
            # pagination continued past total_limit.
            break
        has_more = obj.get('has_more')
        next_cursor = obj.get('cursor')
        if has_more in (True, 1) and isinstance(next_cursor, int):
            cursor = next_cursor
            continue
        break
    return replies
# Serialize appends to the shared CSV file across worker threads.
_csv_lock = threading.Lock()
# Keep multi-line progress output from interleaving between threads.
_print_lock = threading.Lock()
# Guard the shared results list during concurrent appends.
_results_lock = threading.Lock()
def save_comments_from_links(links, out_path, file_path, count=20, pages=50, timeout=30, reply_count=20, reply_pages=50, total_limit=None, reply_total_limit=None, csv_path=None, workers=None):
    """Concurrently fetch comments and replies for video links and save a snapshot.

    Concurrency: one thread per link; an optional semaphore (workers > 0)
    caps how many run at once.
    CSV: when csv_path is given, appends (username, text) rows for top-level
    comments and their replies.
    Output: writes out_path via save_comments_snapshot with the structure
    {'items': [{link, count, comments: [...]}, ...]}.

    Returns: out_path.
    """
    if csv_path:
        # Only prepare a header when CSV output is requested — consistent
        # with the per-row `if csv_path:` guard in the worker below.
        ensure_csv_header(csv_path, ['username', 'text'])
    results = []
    sem = None
    if isinstance(workers, int) and workers > 0:
        sem = threading.Semaphore(workers)

    def _process(link):
        # Worker: fetch one link's comments + replies; errors are logged, not raised.
        if sem:
            sem.acquire()
        with _print_lock:
            print(f"[START] {link}", flush=True)
        try:
            # Hoisted out of the per-comment loop: previously the link was
            # re-parsed for every comment that had replies.
            aweme_id = _extract_aweme_id(link)
            cs = fetch_comments_aweme(aweme_id, file_path=file_path, count=count, max_pages=pages, timeout=timeout, total_limit=total_limit, referer=link)
            enriched = []
            for c in cs:
                cid = c.get('cid')
                if cid:
                    rs = fetch_replies(cid, aweme_id, file_path=file_path, count=reply_count, max_pages=reply_pages, timeout=timeout, total_limit=reply_total_limit)
                    c = dict(c)  # copy before annotating so the raw API object stays untouched
                    c['replies'] = rs
                    c['reply_count'] = len(rs)
                enriched.append(c)
                try:
                    with _print_lock:
                        print(f"{link} | cid={c.get('cid')} | create_time={c.get('create_time')} | reply_count={c.get('reply_count', 0)} | text={c.get('text')}", flush=True)
                except Exception:
                    pass  # progress logging is best-effort
                if csv_path:
                    u = c.get('user') or {}
                    uname = u.get('unique_id') or u.get('nickname') or u.get('uid') or ''
                    rows = [[uname, c.get('text')]]
                    for r in c.get('replies', []) or []:
                        ru = r.get('user') or {}
                        runame = ru.get('unique_id') or ru.get('nickname') or ru.get('uid') or ''
                        rows.append([runame, r.get('text')])
                    with _csv_lock:
                        append_csv_rows(csv_path, rows)
            with _results_lock:
                results.append({'link': link, 'count': len(cs), 'comments': enriched})
            reply_total = sum(len(c.get('replies') or []) for c in enriched)
            with _print_lock:
                print(f"[DONE] {link} comments={len(cs)} replies={reply_total}", flush=True)
        except Exception as e:
            with _print_lock:
                print(f"[ERROR] {link} {e}", flush=True)
        finally:
            if sem:
                sem.release()

    threads = []
    for link in links:
        t = threading.Thread(target=_process, args=(link,))
        t.daemon = True
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    save_comments_snapshot(out_path, results)
    return out_path
"""TikTok 评论与回复抓取模块
能力:
- 根据视频链接提取 aweme_id
- 通过评论接口分页拉取评论(支持兜底接口)
- 针对每条评论抓取二级回复并汇总
- 可选写入 CSV 与打印进度日志
"""