484 lines
17 KiB
Python
484 lines
17 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
将捕获的Cookie同步到MySQL数据库
|
||
从captured_account_cookies.json读取Cookie数据,更新到ai_authors表
|
||
"""
|
||
|
||
import json
|
||
import sys
|
||
import os
|
||
from datetime import datetime
|
||
from typing import Dict, List, Optional
|
||
|
||
# 导入统一的数据库管理器和日志配置
|
||
from database_config import DatabaseManager, DB_CONFIG
|
||
from log_config import setup_cookie_sync_logger
|
||
|
||
# Module-level logger, created by log_config.setup_cookie_sync_logger().
logger = setup_cookie_sync_logger()
|
||
|
||
def ensure_utf8_stream(stream):
    """Return *stream* switched to UTF-8 text encoding where possible.

    The stream is reconfigured in place when it offers ``reconfigure``
    (``io.TextIOWrapper`` on Python 3.7+), which keeps its existing
    buffering settings.  Otherwise its underlying binary ``buffer`` is
    wrapped in a new UTF-8 ``TextIOWrapper``.  A stream that is ``None`` or
    exposes neither facility is returned unchanged instead of raising.

    Args:
        stream: A text stream such as ``sys.stdout``; may be ``None``.

    Returns:
        The reconfigured stream, a new wrapper, or the original object.
    """
    if stream is None:
        return stream

    reconfigure = getattr(stream, 'reconfigure', None)
    if reconfigure is not None:
        reconfigure(encoding='utf-8')
        return stream

    binary = getattr(stream, 'buffer', None)
    if binary is None:
        # Replaced stream without a binary layer: nothing safe to wrap.
        return stream

    import io
    return io.TextIOWrapper(binary, encoding='utf-8')


# Force UTF-8 console output on Windows so the Chinese messages print
# regardless of the console's default code page.
if sys.platform == 'win32':
    sys.stdout = ensure_utf8_stream(sys.stdout)
    sys.stderr = ensure_utf8_stream(sys.stderr)
|
||
|
||
|
||
class CookieSyncToDB:
    """Synchronise captured account cookies into the ``ai_authors`` table.

    Cookie data is read from ``captured_account_cookies.json`` located next
    to this script.  For every account the ``ai_authors`` row matching
    ``author_name`` + ``channel`` is updated, or optionally created, and the
    per-account outcomes are written to timestamped JSON report files.
    """

    def __init__(self, db_config: Optional[Dict] = None):
        """Prepare file paths and the database manager.

        Args:
            db_config: Database configuration dict passed to
                ``DatabaseManager``.  ``None`` is passed through unchanged;
                per the original documentation the manager then uses
                ``database_config.DB_CONFIG``.
        """
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.cookies_file = os.path.join(self.script_dir, "captured_account_cookies.json")

        # Report files for successful and failed accounts; the timestamp
        # gives every run its own pair of files.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.success_file = os.path.join(self.script_dir, f"sync_success_{timestamp}.json")
        self.failed_file = os.path.join(self.script_dir, f"sync_failed_{timestamp}.json")

        # All database access goes through the shared manager.
        self.db_manager = DatabaseManager(db_config)
        self.db_config = self.db_manager.config

    def connect_db(self) -> bool:
        """Return the result of ``db_manager.test_connection()``."""
        return self.db_manager.test_connection()

    def close_db(self):
        """Report that database work is finished.

        This class holds no connection of its own, so only a completion
        message is printed.
        """
        print("[OK] 数据库操作完成")

    def load_cookies_from_file(self) -> Dict:
        """Load the captured cookie data from ``self.cookies_file``.

        Returns:
            The parsed JSON object (account name -> account info).  An empty
            dict is returned when the file is missing, cannot be read or
            parsed, or its top-level value is not a JSON object.
        """
        try:
            with open(self.cookies_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            print(f"[X] Cookie文件不存在: {self.cookies_file}")
            return {}
        except (OSError, ValueError) as e:
            # ValueError covers both invalid JSON and undecodable bytes.
            logger.error(f"加载Cookie文件失败: {e}", exc_info=True)
            print(f"[X] 加载Cookie文件失败: {e}")
            return {}

        if not isinstance(data, dict):
            # Callers iterate with .items(); anything but an object would
            # crash later, so reject it here with a clear message.
            print(f"[X] 加载Cookie文件失败: 顶层JSON必须是对象,实际为 {type(data).__name__}")
            return {}

        print(f"[OK] 从文件加载了 {len(data)} 个账号的Cookie")
        return data

    def cookie_dict_to_string(self, cookies: Dict) -> str:
        """Serialise a cookie dict as ``"key1=value1; key2=value2"``.

        Args:
            cookies: Mapping of cookie name to value.

        Returns:
            The cookie header string; empty for an empty mapping.
        """
        return '; '.join(f"{k}={v}" for k, v in cookies.items())

    def save_sync_records(self, success_records: List[Dict], failed_records: List[Dict]):
        """Write the per-account results to the success/failed report files.

        Each non-empty list is written to its own file.  The two writes are
        independent, so a failure writing one file does not prevent the
        other from being saved.  Errors are logged and printed, not raised.

        Args:
            success_records: Records of accounts that were updated/created.
            failed_records: Records of accounts that failed or were skipped
                because the author does not exist.
        """
        # (records, target path, label used in messages, leading text for print)
        outputs = (
            (success_records, self.success_file, "成功", "\n"),
            (failed_records, self.failed_file, "失败", ""),
        )
        for records, path, label, lead in outputs:
            if not records:
                continue
            try:
                with open(path, 'w', encoding='utf-8') as f:
                    json.dump(records, f, ensure_ascii=False, indent=2)
                logger.info(f"{label}记录已保存到: {path}")
                print(f"{lead}[OK] {label}记录已保存: {path}")
            except Exception as e:
                logger.error(f"保存记录文件失败: {e}", exc_info=True)
                print(f"[X] 保存记录文件失败: {e}")

    def _query_author_by_name(self, author_name: str, channel: int) -> Optional[Dict]:
        """Query one author row by name and channel; database errors propagate."""
        sql = "SELECT * FROM ai_authors WHERE author_name = %s AND channel = %s LIMIT 1"
        return self.db_manager.execute_query(sql, (author_name, channel), fetch_one=True)

    def find_author_by_name(self, author_name: str, channel: int = 1) -> Optional[Dict]:
        """Look up an author row by name and channel.

        Args:
            author_name: Author name to match.
            channel: Channel id (1 = Baijiahao, the default).

        Returns:
            The row returned by the database manager, or ``None`` when
            nothing matched or the query raised.
        """
        try:
            return self._query_author_by_name(author_name, channel)
        except Exception as e:
            logger.error(f"查询作者失败: {e}", exc_info=True)
            print(f"[X] 查询作者失败: {e}")
            return None

    def find_author_by_app_id(self, app_id: str) -> Optional[Dict]:
        """Look up an author row by its ``app_id``.

        Args:
            app_id: Baijiahao app_id.

        Returns:
            The row returned by the database manager, or ``None`` when
            nothing matched or the query raised.
        """
        try:
            sql = "SELECT * FROM ai_authors WHERE app_id = %s LIMIT 1"
            return self.db_manager.execute_query(sql, (app_id,), fetch_one=True)
        except Exception as e:
            logger.error(f"查询作者失败: {e}", exc_info=True)
            print(f"[X] 查询作者失败: {e}")
            return None

    def update_author_cookie(self, author_id: int, cookie_string: str,
                             app_id: Optional[str] = None, app_token: Optional[str] = None) -> bool:
        """Update the stored cookie (and optionally app credentials) of an author.

        Args:
            author_id: Primary key of the ``ai_authors`` row.
            cookie_string: Cookie string stored in ``toutiao_cookie``.
            app_id: Baijiahao app_id; only written when truthy.
            app_token: Baijiahao app_token; only written when truthy.

        Returns:
            ``True`` when the UPDATE ran without raising, ``False`` otherwise.
        """
        try:
            # Column list and parameter list are built side by side so the
            # placeholders always line up.
            update_fields = ["toutiao_cookie = %s", "updated_at = NOW()"]
            params = [cookie_string]

            if app_id:
                update_fields.append("app_id = %s")
                params.append(app_id)

            if app_token:
                update_fields.append("app_token = %s")
                params.append(app_token)

            params.append(author_id)

            # Only fixed column fragments are interpolated; all values are
            # bound as parameters.
            sql = f"UPDATE ai_authors SET {', '.join(update_fields)} WHERE id = %s"
            self.db_manager.execute_update(sql, tuple(params))
            return True
        except Exception as e:
            logger.error(f"更新Cookie失败: {e}", exc_info=True)
            print(f"[X] 更新Cookie失败: {e}")
            return False

    def insert_new_author(self, author_name: str, cookie_string: str,
                          app_id: Optional[str] = None, nick: Optional[str] = None,
                          domain: Optional[str] = None, level: Optional[str] = None) -> bool:
        """Insert a new ``ai_authors`` row for an account.

        Args:
            author_name: Author name.
            cookie_string: Cookie string stored in ``toutiao_cookie``.
            app_id: Baijiahao app_id; stored as ``''`` when falsy.
            nick: Nickname (accepted for interface compatibility; not
                written to the table by this method).
            domain: Content domain, stored as ``department_name``
                (``'其它'`` when falsy).
            level: Account level (accepted for interface compatibility; not
                written to the table by this method).

        Returns:
            ``True`` when the INSERT ran without raising, ``False`` otherwise.
        """
        try:
            sql = """
                INSERT INTO ai_authors
                (author_name, app_id, app_token, department_id, department_name,
                department, toutiao_cookie, channel, status, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
            """

            params = (
                author_name,
                app_id or '',
                '',                # app_token: left empty for now
                0,                 # department_id: default 0
                domain or '其它',  # department_name: taken from the domain
                '',                # department: left empty for now
                cookie_string,
                1,                 # channel: 1 = baidu
                'active',          # status
            )

            self.db_manager.execute_update(sql, params)
            return True
        except Exception as e:
            logger.error(f"插入新作者失败: {e}", exc_info=True)
            print(f"[X] 插入新作者失败: {e}")
            return False

    def _sync_account(self, account_name, account_info, auto_create: bool):
        """Sync a single account and report what happened.

        Returns:
            ``(outcome, record)`` where ``outcome`` is one of the stats keys
            ``'updated'``, ``'created'``, ``'failed'`` or ``'skipped'`` and
            ``record`` is the dict for the report files, or ``None`` when
            the account produces no report entry.
        """
        # A malformed entry must not abort the whole run.
        if not isinstance(account_info, dict):
            print(" [!] 账号数据格式无效,跳过")
            return 'skipped', None

        cookies = account_info.get('cookies', {})
        if not cookies:
            print(" [!] 该账号没有Cookie,跳过")
            return 'skipped', None
        if not isinstance(cookies, dict):
            print(" [!] Cookie格式无效,跳过")
            return 'skipped', None

        cookie_string = self.cookie_dict_to_string(cookies)

        app_id = account_info.get('app_id', '')
        nick = account_info.get('nick', '')
        domain = account_info.get('domain', '')
        level = account_info.get('level', '')

        # Fields shared by every report record of this account.
        base = {'account_name': account_name, 'app_id': app_id, 'nick': nick}

        # author_name + channel is the lookup key; Baijiahao is channel 1.
        channel = 1
        try:
            author = self._query_author_by_name(account_name, channel)
        except Exception as e:
            # A failed lookup is not the same as "author does not exist":
            # creating a row here could duplicate an existing author.
            logger.error(f"查询作者失败: {e}", exc_info=True)
            print(f"[X] 查询作者失败: {e}")
            return 'failed', {**base, 'reason': '数据库查询失败'}

        if author:
            print(f" [√] 找到作者: {author['author_name']} (ID: {author['id']}, Channel: {author['channel']})")
            updated = self.update_author_cookie(
                author['id'],
                cookie_string,
                app_id if app_id else None,
                None,  # app_token is not updated for now
            )
            if updated:
                print(f" [OK] Cookie已更新")
                return 'updated', {
                    **base,
                    'domain': domain,
                    'action': 'updated',
                    'db_author_id': author['id'],
                    'db_author_name': author['author_name'],
                }
            print(f" [X] Cookie更新失败")
            return 'failed', {
                **base,
                'reason': '数据库更新失败',
                'db_author_id': author['id'],
            }

        if not auto_create:
            print(f" [!] 作者不存在,跳过(auto_create=False)")
            # Counted as skipped, but still listed in the failed report so
            # the missing account is visible.
            return 'skipped', {**base, 'reason': '数据库中不存在该账号,且未开启自动创建'}

        print(f" [*] 作者不存在,创建新记录...")
        if self.insert_new_author(account_name, cookie_string, app_id, nick, domain, level):
            print(f" [OK] 新作者已创建")
            return 'created', {**base, 'domain': domain, 'action': 'created'}

        print(f" [X] 创建作者失败")
        return 'failed', {**base, 'reason': '数据库插入失败'}

    def sync_cookies(self, auto_create: bool = True) -> Dict:
        """Sync every account from the cookie file into the database.

        Args:
            auto_create: Create an ``ai_authors`` row when no author with
                the account's name exists on channel 1 (default ``True``).

        Returns:
            Counters with the keys ``total``, ``updated``, ``created``,
            ``failed`` and ``skipped``.  All counters are zero when no
            cookie data could be loaded.
        """
        stats = {
            'total': 0,
            'updated': 0,
            'created': 0,
            'failed': 0,
            'skipped': 0,
        }

        success_records: List[Dict] = []
        failed_records: List[Dict] = []

        cookies_data = self.load_cookies_from_file()
        if not cookies_data:
            print("[X] 没有可同步的Cookie数据")
            return stats

        stats['total'] = len(cookies_data)

        print("\n" + "="*70)
        print("开始同步Cookie到数据库")
        print("="*70)

        for idx, (account_name, account_info) in enumerate(cookies_data.items(), 1):
            print(f"\n[{idx}/{stats['total']}] 处理账号: {account_name}")

            outcome, record = self._sync_account(account_name, account_info, auto_create)
            stats[outcome] += 1

            if record is None:
                continue
            if outcome in ('updated', 'created'):
                success_records.append(record)
            else:
                failed_records.append(record)

        self.save_sync_records(success_records, failed_records)

        return stats

    def run(self, auto_create: bool = True):
        """Run the whole sync: check the connection, sync, print a summary.

        Args:
            auto_create: Whether to create authors that do not exist yet.
        """
        logger.info("\n" + "="*70)
        logger.info("Cookie同步到数据库工具")
        logger.info("="*70)

        print("\n" + "="*70)
        print("Cookie同步到数据库工具")
        print("="*70)

        if not self.connect_db():
            logger.error("数据库连接失败,退出")
            return

        # (label shown to the user, key in the stats dict)
        summary = (
            ('总账号数', 'total'),
            ('已更新', 'updated'),
            ('已创建', 'created'),
            ('失败', 'failed'),
            ('跳过', 'skipped'),
        )

        try:
            logger.info(f"开始同步Cookie,自动创建={auto_create}")
            stats = self.sync_cookies(auto_create=auto_create)

            print("\n" + "="*70)
            print("同步完成")
            print("="*70)
            for label, key in summary:
                print(f" {label}: {stats[key]}")
            print("="*70 + "\n")

            logger.info("同步结果:")
            for label, key in summary:
                logger.info(f" {label}: {stats[key]}")
        finally:
            # Always emit the completion message, even if the sync raised.
            self.close_db()
|
||
|
||
|
||
def main():
    """Interactive entry point.

    Asks which database configuration to use (default or custom), whether
    missing authors should be created, shows the effective configuration
    and, after confirmation, runs the sync.  Returns without syncing when
    the custom port is invalid or the user does not confirm with ``y``.
    """
    # Local import: only this interactive prompt needs it.
    import getpass

    print("\n" + "="*70)
    print("Cookie同步到数据库配置")
    print("="*70)

    print("\n请选择数据库配置方式:")
    print(" 1. 使用默认配置 (8.149.233.36/ai_statistics_read)")
    print(" 2. 自定义配置")

    choice = input("\n请选择 (1/2, 默认1): ").strip() or '1'

    if choice == '2':
        print("\n请输入数据库连接信息:\n")

        host = input("数据库地址: ").strip()

        port_text = input("端口 (默认: 3306): ").strip() or '3306'
        try:
            port = int(port_text)
        except ValueError:
            port = -1
        if not 1 <= port <= 65535:
            # Abort cleanly instead of crashing with a traceback.
            print(f"\n[X] 端口无效: {port_text}")
            return

        user = input("用户名: ").strip()
        # getpass keeps the password off the screen; it is not stripped
        # because whitespace can be a legitimate part of a password.
        password = getpass.getpass("密码: ")
        database = input("数据库名: ").strip()

        db_config = {
            'host': host,
            'port': port,
            'user': user,
            'password': password,
            'database': database,
            'charset': 'utf8mb4',
        }
    else:
        # None lets CookieSyncToDB / DatabaseManager pick the default.
        db_config = None
        print("\n使用默认数据库配置...")

    # Anything other than an explicit 'n' keeps auto-creation enabled.
    print("\n" + "="*70)
    auto_create_input = input("当作者不存在时是否自动创建?(y/n, 默认y): ").strip().lower()
    auto_create = auto_create_input != 'n'

    syncer = CookieSyncToDB(db_config)

    print("\n配置确认:")
    print(f" 数据库: {syncer.db_config['host']}:{syncer.db_config.get('port', 3306)}/{syncer.db_config['database']}")
    print(f" 用户: {syncer.db_config['user']}")
    print(f" 自动创建: {'是' if auto_create else '否'}")
    print("="*70)

    # Only an explicit 'y' starts the sync.
    confirm = input("\n确认开始同步?(y/n): ").strip().lower()
    if confirm != 'y':
        print("\n已取消")
        return

    syncer.run(auto_create=auto_create)
|
||
|
||
|
||
# Run the interactive entry point only when executed as a script.
if __name__ == '__main__':
    main()
|