import argparse import os from utils.io import load_keywords_from_file, read_json from tiktok.search import save_links_multi from tiktok.comments import save_comments_from_links from db.mysql_import import import_csv_to_mysql, create_database_if_not_exists def run_links(args): """运行链接收集阶段 参数来源:命令行(关键词、请求文件、分页、并发等) 流程: 1. 汇总关键词(--keyword/--keywords/--keywords-file) 2. 校验非空 3. 调用 `save_links_multi` 并发搜索与去重,保存到 `args.out` """ kws = [] if args.keyword: kws.extend([k for k in args.keyword if k]) if args.keywords: for k in args.keywords.split(','): k = k.strip() if k: kws.append(k) if args.keywords_file: kws.extend(load_keywords_from_file(args.keywords_file)) kws = [k for k in kws if k] if not kws: raise SystemExit('no keywords') save_links_multi(kws, out_path=args.out, file_path=args.file_path, max_pages=args.max_pages, timeout=args.timeout, count=args.count, workers=args.workers) def run_comments(args): """运行评论与回复抓取阶段 输入:`args.links_json`(可为统一快照或简单结构) 读取逻辑:优先 `links` 字段;若无,则聚合 `items[*].links` 调用:`save_comments_from_links` 执行并发抓取,输出 JSON 与可选 CSV """ obj = read_json(args.links_json) links = obj.get('links') or [] if not links and os.path.exists(args.links_json): try: for name in ['links', 'items']: if name == 'items': tmp = [] for it in obj.get('items', []): tmp.extend(it.get('links', [])) links = tmp break except Exception: pass if not links: raise SystemExit('no links') save_comments_from_links(links, out_path=args.out, file_path=args.file_path, count=args.count, pages=args.pages, timeout=args.timeout, reply_count=args.reply_count, reply_pages=args.reply_pages, total_limit=args.limit, reply_total_limit=args.reply_limit, csv_path=args.csv, workers=args.workers) def run_all(args): """串联执行链接收集与评论抓取 1. 解析关键词并调用搜索阶段输出到 `args.links_out` 2. 读取链接快照,兼容两种结构 3. 调用评论抓取阶段输出到 `args.comments_out` 并可写入 CSV 适用于一体化流水线执行。 """ kws = [] if getattr(args, 'keyword', None): kws.extend([k for k in args.keyword if k]) if getattr(args, 'keywords', None): for k in args.keywords.split(','): k = k.strip() if k: kws.append(k) if getattr(args, 'keywords_file', None): kws.extend(load_keywords_from_file(args.keywords_file)) kws = [k for k in kws if k] if not kws: raise SystemExit('no keywords') save_links_multi(kws, out_path=args.links_out, file_path=args.file_path, max_pages=args.search_max_pages, timeout=args.search_timeout, count=args.search_count, workers=args.search_workers) obj = read_json(args.links_out) links = obj.get('links') or [] if not links and os.path.exists(args.links_out): try: for name in ['links', 'items']: if name == 'items': tmp = [] for it in obj.get('items', []): tmp.extend(it.get('links', [])) links = tmp break except Exception: pass if not links: raise SystemExit('no links') save_comments_from_links(links, out_path=args.comments_out, file_path=args.file_path, count=args.comments_count, pages=args.comments_pages, timeout=args.comments_timeout, reply_count=args.reply_count, reply_pages=args.reply_pages, total_limit=args.comments_limit, reply_total_limit=args.reply_limit, csv_path=args.csv, workers=args.comments_workers) def main(): """命令行解析并分发到对应子命令函数""" p = argparse.ArgumentParser() sub = p.add_subparsers(dest='cmd') p_links = sub.add_parser('links') p_links.add_argument('--keyword', action='append') p_links.add_argument('--keywords', default=None) p_links.add_argument('--keywords-file', default=None) p_links.add_argument('--file-path', default=r'data\1.text') p_links.add_argument('--out', default='data\\urls.json') p_links.add_argument('--max-pages', type=int, default=50) p_links.add_argument('--count', type=int, default=None) p_links.add_argument('--timeout', type=int, default=30) p_links.add_argument('--workers', type=int, default=5) p_links.set_defaults(func=run_links) p_comments = sub.add_parser('comments') p_comments.add_argument('--links-json', default='data\\urls.json') p_comments.add_argument('--out', default='data\\tik_comments.json') p_comments.add_argument('--file-path', default=r'data\\1.text') p_comments.add_argument('--count', type=int, default=100) p_comments.add_argument('--pages', type=int, default=100) p_comments.add_argument('--timeout', type=int, default=30) p_comments.add_argument('--limit', type=int, default=None) p_comments.add_argument('--reply-count', type=int, default=100) p_comments.add_argument('--reply-pages', type=int, default=100) p_comments.add_argument('--reply-limit', type=int, default=None) p_comments.add_argument('--csv', default='data\\comments.csv') p_comments.add_argument('--workers', type=int, default=None) p_comments.set_defaults(func=run_comments) p_all = sub.add_parser('all') p_all.add_argument('--keyword', action='append') p_all.add_argument('--keywords', default=None) p_all.add_argument('--keywords-file', default=None) p_all.add_argument('--file-path', default=r'data\\1.text') p_all.add_argument('--links-out', default='data\\urls.json') p_all.add_argument('--search-max-pages', type=int, default=50) p_all.add_argument('--search-count', type=int, default=None) p_all.add_argument('--search-timeout', type=int, default=30) p_all.add_argument('--search-workers', type=int, default=5) p_all.add_argument('--comments-out', default='data\\tik_comments.json') p_all.add_argument('--comments-count', type=int, default=100) p_all.add_argument('--comments-pages', type=int, default=100) p_all.add_argument('--comments-timeout', type=int, default=30) p_all.add_argument('--comments-limit', type=int, default=None) p_all.add_argument('--reply-count', type=int, default=100) p_all.add_argument('--reply-pages', type=int, default=100) p_all.add_argument('--reply-limit', type=int, default=None) p_all.add_argument('--csv', default='data\\comments.csv') p_all.add_argument('--comments-workers', type=int, default=None) p_all.set_defaults(func=run_all) p_mysql = sub.add_parser('mysql') p_mysql.add_argument('--csv', default='data\\comments.csv') p_mysql.add_argument('--host', default='localhost') p_mysql.add_argument('--port', type=int, default=3306) p_mysql.add_argument('--user', default='root') p_mysql.add_argument('--password', default='') p_mysql.add_argument('--database', default='crawler_tiktok') p_mysql.add_argument('--table', default='comments') def run_mysql(args): import_csv_to_mysql(args.csv, host=args.host, port=args.port, user=args.user, password=args.password, database=args.database, table=args.table) p_mysql.set_defaults(func=run_mysql) p_mysql_db = sub.add_parser('mysql-db') p_mysql_db.add_argument('--host', default='localhost') p_mysql_db.add_argument('--port', type=int, default=3306) p_mysql_db.add_argument('--user', default='root') p_mysql_db.add_argument('--password', default='') p_mysql_db.add_argument('--database', default='yunque') def run_mysql_db(args): create_database_if_not_exists(host=args.host, port=args.port, user=args.user, password=args.password, database=args.database) p_mysql_db.set_defaults(func=run_mysql_db) args = p.parse_args() if not args.cmd: p.print_help() raise SystemExit(1) args.func(args) if __name__ == '__main__': main() """命令行入口模块 提供三类子命令: - links:根据关键词并发搜索视频链接并保存快照 - comments:根据链接列表抓取评论与回复并保存快照与 CSV - all:串联 links 与 comments,一次性完成全流程 运行方式建议使用 `python -m crawler_tiktok.main ...` 以避免导入路径问题。 """