# ai_crawler_tiktok/main.py
"""Command-line entry module.

Subcommands:
- links: concurrently search for video links by keyword and save a snapshot
- comments: scrape comments and replies for a list of links, saving a snapshot and a CSV
- all: chain links and comments to complete the whole flow in one pass
- mysql: import the comments CSV into MySQL
- mysql-db: create the target MySQL database if it does not exist

Recommended invocation: `python -m crawler_tiktok.main ...` to avoid import-path issues.
"""
import argparse

from utils.io import load_keywords_from_file, read_json
from tiktok.search import save_links_multi
from tiktok.comments import save_comments_from_links
from db.mysql_import import import_csv_to_mysql, create_database_if_not_exists


def run_links(args):
    """Run the link-collection stage.

    Argument sources: the command line (keywords, request file, paging, concurrency, etc.).
    Flow:
    1. Aggregate keywords (--keyword/--keywords/--keywords-file)
    2. Validate that the list is non-empty
    3. Call `save_links_multi` for concurrent, deduplicated search, saving to `args.out`
    """
    kws = []
    if args.keyword:
        kws.extend([k for k in args.keyword if k])
    if args.keywords:
        for k in args.keywords.split(','):
            k = k.strip()
            if k:
                kws.append(k)
    if args.keywords_file:
        kws.extend(load_keywords_from_file(args.keywords_file))
    kws = [k for k in kws if k]
    if not kws:
        raise SystemExit('no keywords')
    save_links_multi(kws, out_path=args.out, file_path=args.file_path,
                     max_pages=args.max_pages, timeout=args.timeout,
                     count=args.count, workers=args.workers)
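

# Example of the keyword merge in run_links (values are illustrative):
#   --keyword foo --keyword bar --keywords "baz, qux"  ->  kws == ['foo', 'bar', 'baz', 'qux']
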
def run_comments(args):
    """Run the comment and reply scraping stage.

    Input: `args.links_json` (either the unified snapshot or the simple structure).
    Reading logic: prefer the `links` field; if absent, aggregate `items[*].links`.
    Calls `save_comments_from_links` for concurrent scraping, producing JSON and an optional CSV.
    """
    obj = read_json(args.links_json)
    links = obj.get('links') or []
    if not links:
        # Unified snapshot: collect the links from every item.
        links = [link for item in obj.get('items', []) for link in item.get('links', [])]
    if not links:
        raise SystemExit('no links')
    save_comments_from_links(links, out_path=args.out, file_path=args.file_path,
                             count=args.count, pages=args.pages, timeout=args.timeout,
                             reply_count=args.reply_count, reply_pages=args.reply_pages,
                             total_limit=args.limit, reply_total_limit=args.reply_limit,
                             csv_path=args.csv, workers=args.workers)
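

# run_comments accepts either snapshot shape (URLs are placeholders):
#   {"links": ["https://...", "https://..."]}
#   {"items": [{"links": ["https://...", "https://..."]}, ...]}
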
def run_all(args):
    """Run link collection and comment scraping in sequence.

    1. Parse keywords and run the search stage, writing to `args.links_out`
    2. Read the link snapshot, accepting either structure
    3. Run the comment stage, writing to `args.comments_out` and optionally a CSV
    Intended for one-shot, end-to-end pipeline runs.
    """
    kws = []
    if getattr(args, 'keyword', None):
        kws.extend([k for k in args.keyword if k])
    if getattr(args, 'keywords', None):
        for k in args.keywords.split(','):
            k = k.strip()
            if k:
                kws.append(k)
    if getattr(args, 'keywords_file', None):
        kws.extend(load_keywords_from_file(args.keywords_file))
    kws = [k for k in kws if k]
    if not kws:
        raise SystemExit('no keywords')
    save_links_multi(kws, out_path=args.links_out, file_path=args.file_path,
                     max_pages=args.search_max_pages, timeout=args.search_timeout,
                     count=args.search_count, workers=args.search_workers)
    obj = read_json(args.links_out)
    links = obj.get('links') or []
    if not links:
        # Unified snapshot: collect the links from every item.
        links = [link for item in obj.get('items', []) for link in item.get('links', [])]
    if not links:
        raise SystemExit('no links')
    save_comments_from_links(links, out_path=args.comments_out, file_path=args.file_path,
                             count=args.comments_count, pages=args.comments_pages,
                             timeout=args.comments_timeout, reply_count=args.reply_count,
                             reply_pages=args.reply_pages, total_limit=args.comments_limit,
                             reply_total_limit=args.reply_limit, csv_path=args.csv,
                             workers=args.comments_workers)
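

# run_all data flow: keywords -> save_links_multi -> link snapshot (args.links_out),
# then snapshot links -> save_comments_from_links -> args.comments_out (+ optional CSV).
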
def main():
    """Parse the command line and dispatch to the matching subcommand handler."""
    p = argparse.ArgumentParser()
    sub = p.add_subparsers(dest='cmd')
    p_links = sub.add_parser('links')
    p_links.add_argument('--keyword', action='append')
    p_links.add_argument('--keywords', default=None)
    p_links.add_argument('--keywords-file', default=None)
    p_links.add_argument('--file-path', default='data\\1.text')
    p_links.add_argument('--out', default='data\\urls.json')
    p_links.add_argument('--max-pages', type=int, default=50)
    p_links.add_argument('--count', type=int, default=None)
    p_links.add_argument('--timeout', type=int, default=30)
    p_links.add_argument('--workers', type=int, default=5)
    p_links.set_defaults(func=run_links)
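    # Example invocation (hypothetical values):
    #   python main.py links --keywords "cats,dogs" --max-pages 10 --workers 5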
    p_comments = sub.add_parser('comments')
    p_comments.add_argument('--links-json', default='data\\urls.json')
    p_comments.add_argument('--out', default='data\\tik_comments.json')
    p_comments.add_argument('--file-path', default='data\\1.text')
    p_comments.add_argument('--count', type=int, default=100)
    p_comments.add_argument('--pages', type=int, default=100)
    p_comments.add_argument('--timeout', type=int, default=30)
    p_comments.add_argument('--limit', type=int, default=None)
    p_comments.add_argument('--reply-count', type=int, default=100)
    p_comments.add_argument('--reply-pages', type=int, default=100)
    p_comments.add_argument('--reply-limit', type=int, default=None)
    p_comments.add_argument('--csv', default='data\\comments.csv')
    p_comments.add_argument('--workers', type=int, default=None)
    p_comments.set_defaults(func=run_comments)
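    # Example invocation (hypothetical values):
    #   python main.py comments --links-json data\urls.json --limit 500 --workers 3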
    p_all = sub.add_parser('all')
    p_all.add_argument('--keyword', action='append')
    p_all.add_argument('--keywords', default=None)
    p_all.add_argument('--keywords-file', default=None)
    p_all.add_argument('--file-path', default='data\\1.text')
    p_all.add_argument('--links-out', default='data\\urls.json')
    p_all.add_argument('--search-max-pages', type=int, default=50)
    p_all.add_argument('--search-count', type=int, default=None)
    p_all.add_argument('--search-timeout', type=int, default=30)
    p_all.add_argument('--search-workers', type=int, default=5)
    p_all.add_argument('--comments-out', default='data\\tik_comments.json')
    p_all.add_argument('--comments-count', type=int, default=100)
    p_all.add_argument('--comments-pages', type=int, default=100)
    p_all.add_argument('--comments-timeout', type=int, default=30)
    p_all.add_argument('--comments-limit', type=int, default=None)
    p_all.add_argument('--reply-count', type=int, default=100)
    p_all.add_argument('--reply-pages', type=int, default=100)
    p_all.add_argument('--reply-limit', type=int, default=None)
    p_all.add_argument('--csv', default='data\\comments.csv')
    p_all.add_argument('--comments-workers', type=int, default=None)
    p_all.set_defaults(func=run_all)
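    # Example invocation (hypothetical values):
    #   python main.py all --keyword cats --links-out data\urls.json --comments-limit 1000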
    p_mysql = sub.add_parser('mysql')
    p_mysql.add_argument('--csv', default='data\\comments.csv')
    p_mysql.add_argument('--host', default='localhost')
    p_mysql.add_argument('--port', type=int, default=3306)
    p_mysql.add_argument('--user', default='root')
    p_mysql.add_argument('--password', default='')
    p_mysql.add_argument('--database', default='crawler_tiktok')
    p_mysql.add_argument('--table', default='comments')

    def run_mysql(args):
        import_csv_to_mysql(args.csv, host=args.host, port=args.port, user=args.user,
                            password=args.password, database=args.database, table=args.table)

    p_mysql.set_defaults(func=run_mysql)
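    # Example invocation (hypothetical credentials):
    #   python main.py mysql --csv data\comments.csv --user root --password secret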
    p_mysql_db = sub.add_parser('mysql-db')
    p_mysql_db.add_argument('--host', default='localhost')
    p_mysql_db.add_argument('--port', type=int, default=3306)
    p_mysql_db.add_argument('--user', default='root')
    p_mysql_db.add_argument('--password', default='')
    p_mysql_db.add_argument('--database', default='yunque')

    def run_mysql_db(args):
        create_database_if_not_exists(host=args.host, port=args.port, user=args.user,
                                      password=args.password, database=args.database)

    p_mysql_db.set_defaults(func=run_mysql_db)
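    # Example invocation (hypothetical credentials):
    #   python main.py mysql-db --database yunque --user root --password secret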
    args = p.parse_args()
    if not args.cmd:
        p.print_help()
        raise SystemExit(1)
    args.func(args)


if __name__ == '__main__':
    main()
"""命令行入口模块
提供三类子命令:
- links根据关键词并发搜索视频链接并保存快照
- comments根据链接列表抓取评论与回复并保存快照与 CSV
- all串联 links 与 comments一次性完成全流程
运行方式建议使用 `python -m crawler_tiktok.main ...` 以避免导入路径问题。
"""