Files
baijiahao_data_crawl/sync_cookies_to_db.py

484 lines
17 KiB
Python
Raw Permalink Normal View History

2025-12-25 11:16:59 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Sync captured cookies to the MySQL database.

Reads cookie data from captured_account_cookies.json and updates the
ai_authors table.
"""
import json
import sys
import os
from datetime import datetime
from typing import Dict, List, Optional
# 导入统一的数据库管理器和日志配置
from database_config import DatabaseManager, DB_CONFIG
from log_config import setup_cookie_sync_logger
# Initialise the module-level logger shared by everything in this file.
logger = setup_cookie_sync_logger()

# On Windows, re-wrap stdout/stderr so the Chinese console messages printed by
# this tool are encoded as UTF-8.
if sys.platform == 'win32':
    import io

    for _stream_name in ('stdout', 'stderr'):
        _stream = getattr(sys, _stream_name, None)
        _buffer = getattr(_stream, 'buffer', None)
        # A stream can be None (pythonw) or lack a binary ``buffer`` (some
        # IDE / redirected consoles); leave those untouched instead of
        # crashing at import time.
        if _buffer is not None:
            setattr(sys, _stream_name, io.TextIOWrapper(_buffer, encoding='utf-8'))
class CookieSyncToDB:
    """Sync captured account cookies from a JSON file into ``ai_authors``.

    The cookie file is read from the directory containing this script. Each
    run writes its per-account outcomes to timestamped ``sync_success_*.json``
    and ``sync_failed_*.json`` files in the same directory.
    """

    def __init__(self, db_config: Optional[Dict] = None):
        """Set up file paths and the database manager.

        Args:
            db_config: Database configuration dict passed straight to
                ``DatabaseManager``; ``None`` lets the manager pick its own
                configuration.
        """
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.cookies_file = os.path.join(self.script_dir, "captured_account_cookies.json")

        # Result files are timestamped so repeated runs never overwrite each other.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.success_file = os.path.join(self.script_dir, f"sync_success_{timestamp}.json")
        self.failed_file = os.path.join(self.script_dir, f"sync_failed_{timestamp}.json")

        self.db_manager = DatabaseManager(db_config)
        self.db_config = self.db_manager.config

    def connect_db(self) -> bool:
        """Return the result of the database manager's connection test."""
        return self.db_manager.test_connection()

    def close_db(self):
        """Print the end-of-run message; no connection is held by this class."""
        print("[OK] 数据库操作完成")

    def load_cookies_from_file(self) -> Dict:
        """Load the captured cookie data from ``self.cookies_file``.

        Returns:
            The parsed JSON object keyed by account name, or ``{}`` when the
            file is missing, unreadable, not valid JSON, or its top-level
            value is not an object.
        """
        try:
            if not os.path.exists(self.cookies_file):
                print(f"[X] Cookie文件不存在: {self.cookies_file}")
                return {}
            with open(self.cookies_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception as e:
            print(f"[X] 加载Cookie文件失败: {e}")
            return {}

        # Callers iterate ``.items()``; anything but a JSON object would crash them.
        if not isinstance(data, dict):
            print(f"[X] Cookie文件格式错误: 顶层应为对象，实际为 {type(data).__name__}")
            return {}

        print(f"[OK] 从文件加载了 {len(data)} 个账号的Cookie")
        return data

    def cookie_dict_to_string(self, cookies: Dict) -> str:
        """Serialise a cookie mapping as ``"key1=value1; key2=value2"``.

        Args:
            cookies: Mapping of cookie name to cookie value.

        Returns:
            The pairs joined with ``"; "`` in the mapping's iteration order;
            an empty string for an empty mapping.
        """
        return '; '.join(f"{k}={v}" for k, v in cookies.items())

    def save_sync_records(self, success_records: List[Dict], failed_records: List[Dict]):
        """Write the success and failure records to their JSON files.

        Each non-empty list is written independently, so a failure writing
        one file does not prevent the other from being saved. Errors are
        logged and printed, never raised.

        Args:
            success_records: Records for accounts that were updated or created.
            failed_records: Records for accounts that failed or were not synced.
        """
        # (records, target path, message label, leading text of the console line)
        outputs = (
            (success_records, self.success_file, "成功", "\n"),
            (failed_records, self.failed_file, "失败", ""),
        )
        for records, path, label, lead in outputs:
            if not records:
                continue
            try:
                with open(path, 'w', encoding='utf-8') as f:
                    json.dump(records, f, ensure_ascii=False, indent=2)
            except Exception as e:
                logger.error(f"保存记录文件失败: {e}", exc_info=True)
                print(f"[X] 保存记录文件失败: {e}")
                continue
            logger.info(f"{label}记录已保存到: {path}")
            print(f"{lead}[OK] {label}记录已保存: {path}")

    def _fetch_author_by_name(self, author_name: str, channel: int = 1) -> Optional[Dict]:
        """Run the name/channel lookup and let any database error propagate."""
        sql = "SELECT * FROM ai_authors WHERE author_name = %s AND channel = %s LIMIT 1"
        return self.db_manager.execute_query(sql, (author_name, channel), fetch_one=True)

    def find_author_by_name(self, author_name: str, channel: int = 1) -> Optional[Dict]:
        """Look up an author row by name and channel.

        Args:
            author_name: Author name to match.
            channel: Channel value to match (1 is used for Baijiahao here).

        Returns:
            Whatever the database manager returns for a single-row query, or
            ``None`` when the query raises. A ``None`` result therefore does
            not distinguish "not found" from "query failed".
        """
        try:
            return self._fetch_author_by_name(author_name, channel)
        except Exception as e:
            logger.error(f"查询作者失败: {e}", exc_info=True)
            print(f"[X] 查询作者失败: {e}")
            return None

    def find_author_by_app_id(self, app_id: str) -> Optional[Dict]:
        """Look up an author row by ``app_id``.

        Args:
            app_id: Baijiahao app_id to match.

        Returns:
            Whatever the database manager returns for a single-row query, or
            ``None`` when the query raises.
        """
        try:
            sql = "SELECT * FROM ai_authors WHERE app_id = %s LIMIT 1"
            return self.db_manager.execute_query(sql, (app_id,), fetch_one=True)
        except Exception as e:
            logger.error(f"查询作者失败: {e}", exc_info=True)
            print(f"[X] 查询作者失败: {e}")
            return None

    def update_author_cookie(self, author_id: int, cookie_string: str,
                             app_id: Optional[str] = None, app_token: Optional[str] = None) -> bool:
        """Store a new cookie string on an existing author row.

        Args:
            author_id: Primary key (``id``) of the row to update.
            cookie_string: Cookie string written to the ``toutiao_cookie`` column.
            app_id: When truthy, also written to ``app_id``.
            app_token: When truthy, also written to ``app_token``.

        Returns:
            ``True`` if the update statement ran without raising, else ``False``.
        """
        assignments = ["toutiao_cookie = %s", "updated_at = NOW()"]
        params = [cookie_string]

        # Optional columns are only touched when a value was supplied.
        for column, value in (("app_id", app_id), ("app_token", app_token)):
            if value:
                assignments.append(f"{column} = %s")
                params.append(value)

        params.append(author_id)
        sql = f"UPDATE ai_authors SET {', '.join(assignments)} WHERE id = %s"
        try:
            self.db_manager.execute_update(sql, tuple(params))
            return True
        except Exception as e:
            logger.error(f"更新Cookie失败: {e}", exc_info=True)
            print(f"[X] 更新Cookie失败: {e}")
            return False

    def insert_new_author(self, author_name: str, cookie_string: str,
                          app_id: Optional[str] = None, nick: Optional[str] = None,
                          domain: Optional[str] = None, level: Optional[str] = None,
                          channel: int = 1) -> bool:
        """Insert a new author row carrying the given cookie.

        Args:
            author_name: Author name.
            cookie_string: Cookie string written to ``toutiao_cookie``.
            app_id: Baijiahao app_id; stored as ``''`` when falsy.
            nick: Accepted for interface compatibility; not stored.
            domain: Stored as ``department_name``; ``'其它'`` when falsy.
            level: Accepted for interface compatibility; not stored.
            channel: Channel value for the new row (default 1, the value this
                tool uses for Baijiahao).

        Returns:
            ``True`` if the insert statement ran without raising, else ``False``.
        """
        sql = """
            INSERT INTO ai_authors
            (author_name, app_id, app_token, department_id, department_name,
            department, toutiao_cookie, channel, status, created_at, updated_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
        """
        params = (
            author_name,
            app_id or '',
            '',                 # app_token: left empty for now
            0,                  # department_id: default 0
            domain or '其它',    # department_name: taken from the account's domain
            '',                 # department: left empty for now
            cookie_string,
            channel,
            'active',           # status
        )
        try:
            self.db_manager.execute_update(sql, params)
            return True
        except Exception as e:
            logger.error(f"插入新作者失败: {e}", exc_info=True)
            print(f"[X] 插入新作者失败: {e}")
            return False

    def _sync_one_account(self, account_name, account_info, auto_create: bool):
        """Sync one account and report the outcome.

        Returns:
            A ``(stat_key, record)`` pair. ``stat_key`` is one of
            ``'updated'``, ``'created'``, ``'failed'`` or ``'skipped'``;
            ``record`` is the dict to persist in the result files, or ``None``
            when nothing should be recorded.
        """
        # A malformed entry must not abort the whole run.
        if not isinstance(account_info, dict):
            print(" [X] 账号数据格式错误")
            return 'failed', {
                'account_name': account_name,
                'app_id': '',
                'nick': '',
                'reason': '账号数据格式错误'
            }

        app_id = account_info.get('app_id', '')
        nick = account_info.get('nick', '')
        domain = account_info.get('domain', '')
        level = account_info.get('level', '')

        cookies = account_info.get('cookies', {})
        if not cookies:
            print(" [!] 该账号没有Cookie跳过")
            return 'skipped', None
        if not isinstance(cookies, dict):
            print(" [X] Cookie格式错误")
            return 'failed', {
                'account_name': account_name,
                'app_id': app_id,
                'nick': nick,
                'reason': 'Cookie格式错误'
            }

        cookie_string = self.cookie_dict_to_string(cookies)

        # author_name + channel is the lookup key; this tool always uses channel 1.
        channel = 1
        try:
            author = self._fetch_author_by_name(account_name, channel)
        except Exception as e:
            # A failed lookup is NOT "author missing": creating a row here
            # could duplicate an existing author, so the account is failed.
            logger.error(f"查询作者失败: {e}", exc_info=True)
            print(f" [X] 查询作者失败: {e}")
            return 'failed', {
                'account_name': account_name,
                'app_id': app_id,
                'nick': nick,
                'reason': '数据库查询失败'
            }

        if author:
            print(f" [√] 找到作者: {author['author_name']} (ID: {author['id']}, Channel: {author['channel']})")
            updated = self.update_author_cookie(
                author['id'],
                cookie_string,
                app_id if app_id else None,
                None  # app_token is not updated for now
            )
            if updated:
                print(" [OK] Cookie已更新")
                return 'updated', {
                    'account_name': account_name,
                    'app_id': app_id,
                    'nick': nick,
                    'domain': domain,
                    'action': 'updated',
                    'db_author_id': author['id'],
                    'db_author_name': author['author_name']
                }
            print(" [X] Cookie更新失败")
            return 'failed', {
                'account_name': account_name,
                'app_id': app_id,
                'nick': nick,
                'reason': '数据库更新失败',
                'db_author_id': author['id']
            }

        if not auto_create:
            print(" [!] 作者不存在跳过auto_create=False")
            # Counted as skipped, but still listed in the failed-records file.
            return 'skipped', {
                'account_name': account_name,
                'app_id': app_id,
                'nick': nick,
                'reason': '数据库中不存在该账号，且未开启自动创建'
            }

        print(" [*] 作者不存在，创建新记录...")
        created = self.insert_new_author(
            account_name, cookie_string, app_id, nick, domain, level, channel
        )
        if created:
            print(" [OK] 新作者已创建")
            return 'created', {
                'account_name': account_name,
                'app_id': app_id,
                'nick': nick,
                'domain': domain,
                'action': 'created'
            }
        print(" [X] 创建作者失败")
        return 'failed', {
            'account_name': account_name,
            'app_id': app_id,
            'nick': nick,
            'reason': '数据库插入失败'
        }

    def sync_cookies(self, auto_create: bool = True) -> Dict:
        """Sync every account in the cookie file to the database.

        Args:
            auto_create: Insert a new author row when no row matches the
                account name; otherwise such accounts are skipped.

        Returns:
            Counters with keys ``total``, ``updated``, ``created``,
            ``failed`` and ``skipped``.
        """
        stats = {
            'total': 0,
            'updated': 0,
            'created': 0,
            'failed': 0,
            'skipped': 0
        }
        success_records = []
        failed_records = []

        cookies_data = self.load_cookies_from_file()
        if not cookies_data:
            print("[X] 没有可同步的Cookie数据")
            return stats

        stats['total'] = len(cookies_data)
        print("\n" + "="*70)
        print("开始同步Cookie到数据库")
        print("="*70)

        for idx, (account_name, account_info) in enumerate(cookies_data.items(), 1):
            print(f"\n[{idx}/{stats['total']}] 处理账号: {account_name}")
            outcome, record = self._sync_one_account(account_name, account_info, auto_create)
            stats[outcome] += 1
            if record is None:
                continue
            if outcome in ('updated', 'created'):
                success_records.append(record)
            else:
                failed_records.append(record)

        self.save_sync_records(success_records, failed_records)
        return stats

    def run(self, auto_create: bool = True):
        """Run the full sync: test the connection, sync, report statistics.

        Args:
            auto_create: Passed through to ``sync_cookies``.
        """
        logger.info("\n" + "="*70)
        logger.info("Cookie同步到数据库工具")
        logger.info("="*70)
        print("\n" + "="*70)
        print("Cookie同步到数据库工具")
        print("="*70)

        if not self.connect_db():
            logger.error("数据库连接失败，退出")
            return

        try:
            logger.info(f"开始同步Cookie自动创建={auto_create}")
            stats = self.sync_cookies(auto_create=auto_create)

            # One formatted line per counter, shared by console and log output.
            summary = [
                f" 总账号数: {stats['total']}",
                f" 已更新: {stats['updated']}",
                f" 已创建: {stats['created']}",
                f" 失败: {stats['failed']}",
                f" 跳过: {stats['skipped']}",
            ]

            print("\n" + "="*70)
            print("同步完成")
            print("="*70)
            for line in summary:
                print(line)
            print("="*70 + "\n")

            logger.info("同步结果:")
            for line in summary:
                logger.info(line)
        finally:
            self.close_db()
def main():
    """Interactive entry point.

    Prompts for the database configuration (default or custom), asks whether
    missing authors should be created, shows the effective settings, and
    starts the sync once the user confirms with ``y``.
    """
    # Local import: only this interactive entry point needs it.
    import getpass

    print("\n" + "="*70)
    print("Cookie同步到数据库配置")
    print("="*70)

    print("\n请选择数据库配置方式:")
    print(" 1. 使用默认配置 (8.149.233.36/ai_statistics_read)")
    print(" 2. 自定义配置")
    choice = input("\n请选择 (1/2, 默认1): ").strip() or '1'

    if choice == '2':
        print("\n请输入数据库连接信息:\n")
        host = input("数据库地址: ").strip()
        port_text = input("端口 (默认: 3306): ").strip() or '3306'
        # Reject a non-numeric port with a message instead of a traceback.
        try:
            port = int(port_text)
        except ValueError:
            print(f"\n[X] 端口无效: {port_text}")
            return
        user = input("用户名: ").strip()
        # getpass keeps the password from being echoed to the terminal.
        password = getpass.getpass("密码: ").strip()
        database = input("数据库名: ").strip()
        db_config = {
            'host': host,
            'port': port,
            'user': user,
            'password': password,
            'database': database,
            'charset': 'utf8mb4'
        }
    else:
        # None makes CookieSyncToDB fall back to the manager's own configuration.
        db_config = None
        print("\n使用默认数据库配置...")

    print("\n" + "="*70)
    auto_create_input = input("当作者不存在时是否自动创建？(y/n, 默认y): ").strip().lower()
    # Anything other than an explicit "n" keeps auto-creation enabled.
    auto_create = auto_create_input != 'n'

    syncer = CookieSyncToDB(db_config)

    print("\n配置确认:")
    print(f" 数据库: {syncer.db_config['host']}:{syncer.db_config.get('port', 3306)}/{syncer.db_config['database']}")
    print(f" 用户: {syncer.db_config['user']}")
    print(f" 自动创建: {'是' if auto_create else '否'}")
    print("="*70)

    confirm = input("\n确认开始同步？(y/n): ").strip().lower()
    if confirm != 'y':
        print("\n已取消")
        return

    syncer.run(auto_create=auto_create)


if __name__ == '__main__':
    main()