#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 将捕获的Cookie同步到MySQL数据库 从captured_account_cookies.json读取Cookie数据,更新到ai_authors表 """ import json import sys import os from datetime import datetime from typing import Dict, List, Optional # 导入统一的数据库管理器和日志配置 from database_config import DatabaseManager, DB_CONFIG from log_config import setup_cookie_sync_logger # 初始化日志记录器 logger = setup_cookie_sync_logger() # 设置UTF-8编码 if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') class CookieSyncToDB: """Cookie同步到数据库""" def __init__(self, db_config: Optional[Dict] = None): """ 初始化数据库连接 Args: db_config: 数据库配置字典,默认使用database_config.DB_CONFIG """ self.script_dir = os.path.dirname(os.path.abspath(__file__)) self.cookies_file = os.path.join(self.script_dir, "captured_account_cookies.json") # 成功和失败记录文件 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') self.success_file = os.path.join(self.script_dir, f"sync_success_{timestamp}.json") self.failed_file = os.path.join(self.script_dir, f"sync_failed_{timestamp}.json") # 使用统一的数据库管理器 self.db_manager = DatabaseManager(db_config) self.db_config = self.db_manager.config def connect_db(self) -> bool: """连接数据库""" return self.db_manager.test_connection() def close_db(self): """关闭数据库连接""" print("[OK] 数据库操作完成") def load_cookies_from_file(self) -> Dict: """从JSON文件加载Cookie数据""" try: if not os.path.exists(self.cookies_file): print(f"[X] Cookie文件不存在: {self.cookies_file}") return {} with open(self.cookies_file, 'r', encoding='utf-8') as f: data = json.load(f) print(f"[OK] 从文件加载了 {len(data)} 个账号的Cookie") return data except Exception as e: print(f"[X] 加载Cookie文件失败: {e}") return {} def cookie_dict_to_string(self, cookies: Dict) -> str: """ 将Cookie字典转换为字符串格式 Args: cookies: Cookie字典 Returns: Cookie字符串,格式: "key1=value1; key2=value2" """ return '; '.join([f"{k}={v}" for k, v in cookies.items()]) def save_sync_records(self, success_records: List[Dict], failed_records: List[Dict]): """ 保存同步结果记录 Args: success_records: 成功记录列表 failed_records: 失败记录列表 """ try: # 保存成功记录 if success_records: with open(self.success_file, 'w', encoding='utf-8') as f: json.dump(success_records, f, ensure_ascii=False, indent=2) logger.info(f"成功记录已保存到: {self.success_file}") print(f"\n[OK] 成功记录已保存: {self.success_file}") # 保存失败记录 if failed_records: with open(self.failed_file, 'w', encoding='utf-8') as f: json.dump(failed_records, f, ensure_ascii=False, indent=2) logger.info(f"失败记录已保存到: {self.failed_file}") print(f"[OK] 失败记录已保存: {self.failed_file}") except Exception as e: logger.error(f"保存记录文件失败: {e}", exc_info=True) print(f"[X] 保存记录文件失败: {e}") def find_author_by_name(self, author_name: str, channel: int = 1) -> Optional[Dict]: """ 根据作者名称和渠道查找数据库记录 Args: author_name: 作者名称 channel: 渠道(1=百家号,默认1) Returns: 作者记录字典,未找到返回None """ try: sql = "SELECT * FROM ai_authors WHERE author_name = %s AND channel = %s LIMIT 1" result = self.db_manager.execute_query(sql, (author_name, channel), fetch_one=True) return result except Exception as e: print(f"[X] 查询作者失败: {e}") return None def find_author_by_app_id(self, app_id: str) -> Optional[Dict]: """ 根据app_id查找数据库记录 Args: app_id: 百家号app_id Returns: 作者记录字典,未找到返回None """ try: sql = "SELECT * FROM ai_authors WHERE app_id = %s LIMIT 1" result = self.db_manager.execute_query(sql, (app_id,), fetch_one=True) return result except Exception as e: print(f"[X] 查询作者失败: {e}") return None def update_author_cookie(self, author_id: int, cookie_string: str, app_id: Optional[str] = None, app_token: Optional[str] = None) -> bool: """ 更新作者的Cookie信息 Args: author_id: 作者ID cookie_string: Cookie字符串 app_id: 百家号app_id(可选) app_token: 百家号app_token(可选) Returns: 是否更新成功 """ try: # 构建更新SQL update_fields = ["toutiao_cookie = %s", "updated_at = NOW()"] params = [cookie_string] # 如果提供了app_id和app_token,也一并更新 if app_id: update_fields.append("app_id = %s") params.append(app_id) if app_token: update_fields.append("app_token = %s") params.append(app_token) params.append(author_id) sql = f"UPDATE ai_authors SET {', '.join(update_fields)} WHERE id = %s" self.db_manager.execute_update(sql, tuple(params)) return True except Exception as e: print(f"[X] 更新Cookie失败: {e}") return False def insert_new_author(self, author_name: str, cookie_string: str, app_id: Optional[str] = None, nick: Optional[str] = None, domain: Optional[str] = None, level: Optional[str] = None) -> bool: """ 插入新作者记录 Args: author_name: 作者名称 cookie_string: Cookie字符串 app_id: 百家号app_id nick: 昵称 domain: 领域 level: 等级 Returns: 是否插入成功 """ try: # 构建插入SQL sql = """ INSERT INTO ai_authors (author_name, app_id, app_token, department_id, department_name, department, toutiao_cookie, channel, status, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()) """ # 参数 params = ( author_name, app_id or '', '', # app_token 暂时为空 0, # department_id 默认0 domain or '其它', # department_name 使用领域 '', # department 暂时为空 cookie_string, 1, # channel: 1=baidu 'active' # status ) self.db_manager.execute_update(sql, params) return True except Exception as e: print(f"[X] 插入新作者失败: {e}") return False def sync_cookies(self, auto_create: bool = True) -> Dict: """ 同步Cookie到数据库 Args: auto_create: 当作者不存在时是否自动创建,默认True Returns: 同步统计信息 """ stats = { 'total': 0, 'updated': 0, 'created': 0, 'failed': 0, 'skipped': 0 } # 成功和失败记录 success_records = [] failed_records = [] # 加载Cookie数据 cookies_data = self.load_cookies_from_file() if not cookies_data: print("[X] 没有可同步的Cookie数据") return stats stats['total'] = len(cookies_data) print("\n" + "="*70) print("开始同步Cookie到数据库") print("="*70) # 遍历每个账号 for idx, (account_name, account_info) in enumerate(cookies_data.items(), 1): print(f"\n[{idx}/{stats['total']}] 处理账号: {account_name}") # 提取Cookie信息 cookies = account_info.get('cookies', {}) if not cookies: print(" [!] 该账号没有Cookie,跳过") stats['skipped'] += 1 continue # 转换Cookie为字符串 cookie_string = self.cookie_dict_to_string(cookies) # 提取其他信息 app_id = account_info.get('app_id', '') nick = account_info.get('nick', '') domain = account_info.get('domain', '') level = account_info.get('level', '') # 查找作者(使用 author_name + channel 作为唯一键) channel = 1 # 百家号固定为channel=1 author = self.find_author_by_name(account_name, channel) if author: print(f" [√] 找到作者: {author['author_name']} (ID: {author['id']}, Channel: {author['channel']})") # 更新或创建 if author: # 更新现有记录 success = self.update_author_cookie( author['id'], cookie_string, app_id if app_id else None, None # app_token暂不更新 ) if success: print(f" [OK] Cookie已更新") stats['updated'] += 1 # 记录成功 success_records.append({ 'account_name': account_name, 'app_id': app_id, 'nick': nick, 'domain': domain, 'action': 'updated', 'db_author_id': author['id'], 'db_author_name': author['author_name'] }) else: print(f" [X] Cookie更新失败") stats['failed'] += 1 # 记录失败 failed_records.append({ 'account_name': account_name, 'app_id': app_id, 'nick': nick, 'reason': '数据库更新失败', 'db_author_id': author['id'] }) else: # 作者不存在 if auto_create: print(f" [*] 作者不存在,创建新记录...") success = self.insert_new_author( account_name, cookie_string, app_id, nick, domain, level ) if success: print(f" [OK] 新作者已创建") stats['created'] += 1 # 记录成功 success_records.append({ 'account_name': account_name, 'app_id': app_id, 'nick': nick, 'domain': domain, 'action': 'created' }) else: print(f" [X] 创建作者失败") stats['failed'] += 1 # 记录失败 failed_records.append({ 'account_name': account_name, 'app_id': app_id, 'nick': nick, 'reason': '数据库插入失败' }) else: print(f" [!] 作者不存在,跳过(auto_create=False)") stats['skipped'] += 1 # 记录失败(数据库中不存在) failed_records.append({ 'account_name': account_name, 'app_id': app_id, 'nick': nick, 'reason': '数据库中不存在该账号,且未开启自动创建' }) # 保存记录文件 self.save_sync_records(success_records, failed_records) return stats def run(self, auto_create: bool = True): """ 执行同步任务 Args: auto_create: 是否自动创建不存在的作者 """ logger.info("\n" + "="*70) logger.info("Cookie同步到数据库工具") logger.info("="*70) print("\n" + "="*70) print("Cookie同步到数据库工具") print("="*70) # 连接数据库 if not self.connect_db(): logger.error("数据库连接失败,退出") return try: # 同步Cookie logger.info(f"开始同步Cookie,自动创建={auto_create}") stats = self.sync_cookies(auto_create=auto_create) # 显示统计 print("\n" + "="*70) print("同步完成") print("="*70) print(f" 总账号数: {stats['total']}") print(f" 已更新: {stats['updated']}") print(f" 已创建: {stats['created']}") print(f" 失败: {stats['failed']}") print(f" 跳过: {stats['skipped']}") print("="*70 + "\n") logger.info("同步结果:") logger.info(f" 总账号数: {stats['total']}") logger.info(f" 已更新: {stats['updated']}") logger.info(f" 已创建: {stats['created']}") logger.info(f" 失败: {stats['failed']}") logger.info(f" 跳过: {stats['skipped']}") finally: # 关闭数据库连接 self.close_db() def main(): """主函数""" print("\n" + "="*70) print("Cookie同步到数据库配置") print("="*70) # 使用默认配置还是自定义配置 print("\n请选择数据库配置方式:") print(" 1. 使用默认配置 (8.149.233.36/ai_statistics_read)") print(" 2. 自定义配置") choice = input("\n请选择 (1/2, 默认1): ").strip() or '1' if choice == '2': # 自定义数据库配置 print("\n请输入数据库连接信息:\n") host = input("数据库地址: ").strip() port = input("端口 (默认: 3306): ").strip() or '3306' user = input("用户名: ").strip() password = input("密码: ").strip() database = input("数据库名: ").strip() db_config = { 'host': host, 'port': int(port), 'user': user, 'password': password, 'database': database, 'charset': 'utf8mb4' } else: # 使用默认配置 db_config = None print("\n使用默认数据库配置...") # 询问是否自动创建不存在的作者 print("\n" + "="*70) auto_create_input = input("当作者不存在时是否自动创建?(y/n, 默认y): ").strip().lower() auto_create = auto_create_input != 'n' # 创建同步器并执行 syncer = CookieSyncToDB(db_config) print("\n配置确认:") print(f" 数据库: {syncer.db_config['host']}:{syncer.db_config.get('port', 3306)}/{syncer.db_config['database']}") print(f" 用户: {syncer.db_config['user']}") print(f" 自动创建: {'是' if auto_create else '否'}") print("="*70) confirm = input("\n确认开始同步?(y/n): ").strip().lower() if confirm != 'y': print("\n已取消") return syncer.run(auto_create=auto_create) if __name__ == '__main__': main()