Files
baijiahao_data_crawl/sync_cookies_to_db.py

521 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
将捕获的Cookie同步到MySQL数据库
从captured_account_cookies.json读取Cookie数据更新到ai_authors表
"""
import json
import sys
import os
from datetime import datetime
from typing import Dict, List, Optional
# 导入统一的数据库管理器和日志配置
from database_config import DatabaseManager, DB_CONFIG
from log_config import setup_cookie_sync_logger
# 初始化日志记录器
logger = setup_cookie_sync_logger()
# Force UTF-8 on the standard output streams on Windows so the Chinese
# status messages printed by this script are encoded consistently.
if sys.platform == 'win32':
    import io

    for _stream_name in ('stdout', 'stderr'):
        _stream = getattr(sys, _stream_name)
        if hasattr(_stream, 'reconfigure'):
            # Switch the encoding in place: keeps the existing stream object
            # (and its buffering / error-handler settings) instead of
            # stacking a second wrapper on the same underlying buffer.
            _stream.reconfigure(encoding='utf-8')
        elif hasattr(_stream, 'buffer'):
            # Fallback for stream objects without reconfigure().
            setattr(sys, _stream_name,
                    io.TextIOWrapper(_stream.buffer, encoding='utf-8'))
        # Streams that are None or expose neither API are left untouched
        # rather than crashing at import time.
class CookieSyncToDB:
    """Sync captured account cookies from a JSON file into the ``ai_authors`` table.

    The input file ``captured_account_cookies.json`` (next to this script) is
    expected to be a JSON object mapping an account name to an object with a
    ``cookies`` dict plus optional ``username``, ``nick``, ``app_id``,
    ``domain`` and ``level`` fields. For every account the matching author row
    is looked up by ``author_name`` (first ``username``, then ``nick``) and its
    cookie is updated; unknown authors are optionally inserted.

    Per-run result files ``sync_success_<timestamp>.json`` and
    ``sync_failed_<timestamp>.json`` are written next to this script.
    """

    # Channel value used for every lookup and insert done by the sync.
    _BAIJIAHAO_CHANNEL = 1

    def __init__(self, db_config: Optional[Dict] = None):
        """Resolve file paths and create the database manager.

        Args:
            db_config: Database configuration dict passed straight to
                ``DatabaseManager``; ``None`` lets the manager pick its own
                default.
        """
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.cookies_file = os.path.join(self.script_dir, "captured_account_cookies.json")

        # Result files are timestamped so successive runs never overwrite
        # each other's records.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.success_file = os.path.join(self.script_dir, f"sync_success_{timestamp}.json")
        self.failed_file = os.path.join(self.script_dir, f"sync_failed_{timestamp}.json")

        self.db_manager = DatabaseManager(db_config)
        self.db_config = self.db_manager.config

    def connect_db(self) -> bool:
        """Return the result of the database manager's connection test."""
        return self.db_manager.test_connection()

    def close_db(self):
        """Report that database work is finished.

        This method only prints a message; it does not close anything itself.
        """
        print("[OK] 数据库操作完成")

    def load_cookies_from_file(self) -> Dict:
        """Load the captured cookie data from ``self.cookies_file``.

        Returns:
            The parsed JSON object, or ``{}`` when the file is missing,
            cannot be read/parsed, or its top level is not a JSON object.
        """
        try:
            with open(self.cookies_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            print(f"[X] Cookie文件不存在: {self.cookies_file}")
            return {}
        except Exception as e:
            print(f"[X] 加载Cookie文件失败: {e}")
            return {}

        # The sync iterates with .items(); anything but an object (e.g. a
        # JSON array) would otherwise crash the whole run later on.
        if not isinstance(data, dict):
            print(f"[X] Cookie文件格式错误: 顶层应为JSON对象, 实际为 {type(data).__name__}")
            return {}

        print(f"[OK] 从文件加载了 {len(data)} 个账号的Cookie")
        return data

    def cookie_dict_to_string(self, cookies: Dict) -> str:
        """Serialize a cookie dict as ``"key1=value1; key2=value2"``.

        Args:
            cookies: Mapping of cookie name to cookie value.

        Returns:
            The cookie pairs joined with ``"; "`` (empty string for ``{}``).
        """
        return '; '.join(f"{k}={v}" for k, v in cookies.items())

    def _write_records(self, path: str, records: List[Dict], label: str, prefix: str = "") -> None:
        """Write one non-empty record list to ``path`` as pretty-printed JSON."""
        if not records:
            return
        try:
            with open(path, 'w', encoding='utf-8') as f:
                json.dump(records, f, ensure_ascii=False, indent=2)
        except Exception as e:
            logger.error(f"保存记录文件失败: {e}", exc_info=True)
            print(f"[X] 保存记录文件失败: {e}")
            return
        logger.info(f"{label}已保存到: {path}")
        print(f"{prefix}[OK] {label}已保存: {path}")

    def save_sync_records(self, success_records: List[Dict], failed_records: List[Dict]):
        """Persist the per-account sync results.

        Each list is written to its own file only when it is non-empty. The
        two writes are independent, so a failure to write the success file
        does not prevent the failure file from being written.

        Args:
            success_records: Records of updated/created accounts.
            failed_records: Records of accounts that could not be synced.
        """
        self._write_records(self.success_file, success_records, "成功记录", prefix="\n")
        self._write_records(self.failed_file, failed_records, "失败记录")

    def _query_author(self, author_name: str, channel: int) -> Optional[Dict]:
        """Look up one author row by name and channel; errors propagate to the caller."""
        sql = "SELECT * FROM ai_authors WHERE author_name = %s AND channel = %s LIMIT 1"
        return self.db_manager.execute_query(sql, (author_name, channel), fetch_one=True)

    def find_author_by_name(self, author_name: str, channel: int = 1) -> Optional[Dict]:
        """Find an author row by ``author_name`` and ``channel``.

        Args:
            author_name: Author name to match exactly.
            channel: Channel value to match (defaults to 1).

        Returns:
            The row returned by the database manager, or ``None`` when nothing
            matched or the query raised (the error is printed, not re-raised).
        """
        try:
            return self._query_author(author_name, channel)
        except Exception as e:
            print(f"[X] 查询作者失败: {e}")
            return None

    def find_author_by_app_id(self, app_id: str) -> Optional[Dict]:
        """Find an author row by ``app_id``.

        Args:
            app_id: The app_id to match exactly.

        Returns:
            The row returned by the database manager, or ``None`` when nothing
            matched or the query raised (the error is printed, not re-raised).
        """
        try:
            sql = "SELECT * FROM ai_authors WHERE app_id = %s LIMIT 1"
            return self.db_manager.execute_query(sql, (app_id,), fetch_one=True)
        except Exception as e:
            print(f"[X] 查询作者失败: {e}")
            return None

    def update_author_cookie(self, author_id: int, cookie_string: str,
                             app_id: Optional[str] = None, app_token: Optional[str] = None) -> bool:
        """Update the stored cookie (and optionally app_id/app_token) of one author.

        Args:
            author_id: Primary key (``id``) of the row to update.
            cookie_string: Cookie string written to the ``toutiao_cookie`` column.
            app_id: Written to ``app_id`` only when truthy.
            app_token: Written to ``app_token`` only when truthy.

        Returns:
            ``True`` when the UPDATE ran without raising, ``False`` otherwise.
        """
        try:
            # Column names are fixed literals; all values are bound parameters.
            update_fields = ["toutiao_cookie = %s", "updated_at = NOW()"]
            params = [cookie_string]
            if app_id:
                update_fields.append("app_id = %s")
                params.append(app_id)
            if app_token:
                update_fields.append("app_token = %s")
                params.append(app_token)
            params.append(author_id)

            sql = f"UPDATE ai_authors SET {', '.join(update_fields)} WHERE id = %s"
            self.db_manager.execute_update(sql, tuple(params))
            return True
        except Exception as e:
            print(f"[X] 更新Cookie失败: {e}")
            return False

    def insert_new_author(self, author_name: str, cookie_string: str,
                          app_id: Optional[str] = None, nick: Optional[str] = None,
                          domain: Optional[str] = None, level: Optional[str] = None) -> bool:
        """Insert a new ``ai_authors`` row for an account that had no match.

        Args:
            author_name: Value for the ``author_name`` column.
            cookie_string: Cookie string written to ``toutiao_cookie``.
            app_id: Value for ``app_id`` (empty string when falsy).
            nick: Accepted for interface compatibility; not stored.
            domain: Stored as ``department_name`` (``'其它'`` when falsy).
            level: Accepted for interface compatibility; not stored.

        Returns:
            ``True`` when the INSERT ran without raising, ``False`` otherwise.
        """
        try:
            sql = """
                INSERT INTO ai_authors
                (author_name, app_id, app_token, department_id, department_name,
                 department, toutiao_cookie, channel, status, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
            """
            params = (
                author_name,
                app_id or '',
                '',                        # app_token: left empty on creation
                0,                         # department_id: default 0
                domain or '其它',           # department_name: taken from the domain
                '',                        # department: left empty on creation
                cookie_string,
                self._BAIJIAHAO_CHANNEL,   # channel
                'active',                  # status
            )
            self.db_manager.execute_update(sql, params)
            return True
        except Exception as e:
            print(f"[X] 插入新作者失败: {e}")
            return False

    def _match_author(self, username: str, nick: str, channel: int) -> tuple:
        """Return ``(author_row, matched_field)``, trying ``username`` before ``nick``.

        Returns ``(None, None)`` when neither name matches. Query errors
        propagate so the caller can tell "lookup failed" from "not found".
        """
        for field, candidate in (('username', username), ('nick', nick)):
            if not candidate:
                continue
            author = self._query_author(candidate, channel)
            if author:
                return author, field
        return None, None

    def _sync_account(self, account_name: str, account_info: Dict, auto_create: bool,
                      stats: Dict, success_records: List[Dict],
                      failed_records: List[Dict]) -> None:
        """Sync a single account, updating ``stats`` and the record lists in place."""
        if not isinstance(account_info, dict):
            print(" [X] 账号数据格式错误 (应为JSON对象), 跳过")
            stats['failed'] += 1
            failed_records.append({
                'account_name': account_name,
                'reason': '账号数据格式错误 (应为JSON对象)'
            })
            return

        cookies = account_info.get('cookies', {})
        if not cookies:
            print(" [!] 该账号没有Cookie跳过")
            stats['skipped'] += 1
            return

        username = account_info.get('username', '')  # preferred match key
        nick = account_info.get('nick', '')          # fallback match key
        app_id = account_info.get('app_id', '')
        domain = account_info.get('domain', '')
        level = account_info.get('level', '')

        if not isinstance(cookies, dict):
            print(" [X] Cookie格式错误 (应为键值对字典), 跳过")
            stats['failed'] += 1
            failed_records.append({
                'account_name': account_name,
                'username': username,
                'nick': nick,
                'app_id': app_id,
                'reason': 'Cookie格式错误 (应为键值对字典)'
            })
            return

        cookie_string = self.cookie_dict_to_string(cookies)

        if not username and not nick:
            print(" [!] 该账号没有username和nick字段跳过")
            stats['skipped'] += 1
            return

        print(f" Username: {username}")
        print(f" 昵称: {nick}")

        try:
            author, matched_field = self._match_author(username, nick, self._BAIJIAHAO_CHANNEL)
        except Exception as e:
            # A failed lookup must not be mistaken for "author does not
            # exist", otherwise auto_create would insert a duplicate row.
            print(f" [X] 查询作者失败: {e}")
            stats['failed'] += 1
            failed_records.append({
                'account_name': account_name,
                'username': username,
                'nick': nick,
                'app_id': app_id,
                'reason': f'数据库查询失败: {e}'
            })
            return

        if author:
            print(f" [√] 通过{matched_field}匹配到作者: {author['author_name']} "
                  f"(ID: {author['id']}, Channel: {author['channel']})")
            # app_token is intentionally not updated here.
            if self.update_author_cookie(author['id'], cookie_string, app_id or None, None):
                print(f" [OK] Cookie已更新匹配字段: {matched_field}")
                stats['updated'] += 1
                success_records.append({
                    'account_name': account_name,
                    'username': username,
                    'nick': nick,
                    'app_id': app_id,
                    'domain': domain,
                    'action': 'updated',
                    'matched_field': matched_field,
                    'db_author_id': author['id'],
                    'db_author_name': author['author_name']
                })
            else:
                print(" [X] Cookie更新失败")
                stats['failed'] += 1
                failed_records.append({
                    'account_name': account_name,
                    'username': username,
                    'nick': nick,
                    'app_id': app_id,
                    'reason': '数据库更新失败',
                    'matched_field': matched_field,
                    'db_author_id': author['id']
                })
            return

        print(" [!] 未找到匹配的作者已尝试username和nick")

        if not auto_create:
            print(" [!] 作者不存在跳过auto_create=False")
            stats['skipped'] += 1
            # Counted as skipped, but still listed in the failure file so the
            # missing account can be followed up.
            failed_records.append({
                'account_name': account_name,
                'username': username,
                'nick': nick,
                'app_id': app_id,
                'reason': '数据库中不存在该账号已尝试username和nick且未开启自动创建'
            })
            return

        # Prefer username as the new author_name; fall back to nick.
        author_name_to_create = username if username else nick
        print(f" [*] 作者不存在创建新记录author_name: {author_name_to_create}...")
        if self.insert_new_author(author_name_to_create, cookie_string,
                                  app_id, nick, domain, level):
            print(f" [OK] 新作者已创建 (author_name: {author_name_to_create})")
            stats['created'] += 1
            success_records.append({
                'account_name': account_name,
                'username': username,
                'nick': nick,
                'app_id': app_id,
                'domain': domain,
                'action': 'created',
                'created_with': 'username' if username else 'nick'
            })
        else:
            print(" [X] 创建作者失败")
            stats['failed'] += 1
            failed_records.append({
                'account_name': account_name,
                'username': username,
                'nick': nick,
                'app_id': app_id,
                'reason': '数据库插入失败'
            })

    def sync_cookies(self, auto_create: bool = True) -> Dict:
        """Sync every account in the cookie file into the database.

        Args:
            auto_create: Insert a new author row when no existing row matches.

        Returns:
            Counters with the keys ``total``, ``updated``, ``created``,
            ``failed`` and ``skipped``.
        """
        stats = {
            'total': 0,
            'updated': 0,
            'created': 0,
            'failed': 0,
            'skipped': 0
        }
        success_records: List[Dict] = []
        failed_records: List[Dict] = []

        cookies_data = self.load_cookies_from_file()
        if not cookies_data:
            print("[X] 没有可同步的Cookie数据")
            return stats

        stats['total'] = len(cookies_data)
        print("\n" + "="*70)
        print("开始同步Cookie到数据库")
        print("="*70)

        try:
            for idx, (account_name, account_info) in enumerate(cookies_data.items(), 1):
                print(f"\n[{idx}/{stats['total']}] 处理账号: {account_name}")
                self._sync_account(account_name, account_info, auto_create,
                                   stats, success_records, failed_records)
        finally:
            # Persist whatever was processed even if an unexpected error
            # aborts the loop part-way through.
            self.save_sync_records(success_records, failed_records)

        return stats

    def run(self, auto_create: bool = True):
        """Run the full sync: test the connection, sync, then print and log totals.

        Args:
            auto_create: Insert a new author row when no existing row matches.
        """
        logger.info("\n" + "="*70)
        logger.info("Cookie同步到数据库工具")
        logger.info("="*70)
        print("\n" + "="*70)
        print("Cookie同步到数据库工具")
        print("="*70)

        if not self.connect_db():
            logger.error("数据库连接失败,退出")
            return

        try:
            logger.info(f"开始同步Cookie自动创建={auto_create}")
            stats = self.sync_cookies(auto_create=auto_create)

            print("\n" + "="*70)
            print("同步完成")
            print("="*70)
            print(f" 总账号数: {stats['total']}")
            print(f" 已更新: {stats['updated']}")
            print(f" 已创建: {stats['created']}")
            print(f" 失败: {stats['failed']}")
            print(f" 跳过: {stats['skipped']}")
            print("="*70 + "\n")

            logger.info("同步结果:")
            logger.info(f" 总账号数: {stats['total']}")
            logger.info(f" 已更新: {stats['updated']}")
            logger.info(f" 已创建: {stats['created']}")
            logger.info(f" 失败: {stats['failed']}")
            logger.info(f" 跳过: {stats['skipped']}")
        finally:
            self.close_db()
def main():
    """Interactive entry point: collect settings, confirm, then run the sync.

    Prompts for the database configuration (default or custom), whether
    missing authors should be created automatically, and a final
    confirmation before calling ``CookieSyncToDB.run``.
    """
    print("\n" + "="*70)
    print("Cookie同步到数据库配置")
    print("="*70)

    print("\n请选择数据库配置方式:")
    print(" 1. 使用默认配置 (8.149.233.36/ai_statistics_read)")
    print(" 2. 自定义配置")
    choice = input("\n请选择 (1/2, 默认1): ").strip() or '1'

    if choice == '2':
        import getpass

        print("\n请输入数据库连接信息:\n")
        host = input("数据库地址: ").strip()
        port_text = input("端口 (默认: 3306): ").strip() or '3306'
        try:
            port = int(port_text)
        except ValueError:
            # Abort cleanly instead of crashing with a traceback on a typo.
            print(f"\n[X] 端口无效: {port_text}")
            return
        user = input("用户名: ").strip()
        # Do not echo the password on an interactive terminal; keep plain
        # input() when stdin is piped so scripted use still works.
        if sys.stdin is not None and sys.stdin.isatty():
            password = getpass.getpass("密码: ").strip()
        else:
            password = input("密码: ").strip()
        database = input("数据库名: ").strip()
        db_config = {
            'host': host,
            'port': port,
            'user': user,
            'password': password,
            'database': database,
            'charset': 'utf8mb4'
        }
    else:
        # None lets CookieSyncToDB / DatabaseManager use the default config.
        db_config = None
        print("\n使用默认数据库配置...")

    print("\n" + "="*70)
    auto_create_input = input("当作者不存在时是否自动创建?(y/n, 默认y): ").strip().lower()
    # Anything other than an explicit 'n' keeps auto-creation enabled.
    auto_create = auto_create_input != 'n'

    syncer = CookieSyncToDB(db_config)

    print("\n配置确认:")
    print(f" 数据库: {syncer.db_config['host']}:{syncer.db_config.get('port', 3306)}/{syncer.db_config['database']}")
    print(f" 用户: {syncer.db_config['user']}")
    # Both branches used to be empty strings, so the choice was never shown.
    print(f" 自动创建: {'是' if auto_create else '否'}")
    print("="*70)

    confirm = input("\n确认开始同步?(y/n): ").strip().lower()
    if confirm != 'y':
        print("\n已取消")
        return

    syncer.run(auto_create=auto_create)


if __name__ == '__main__':
    main()