Files
baijiahao_data_crawl/add_single_cookie_to_db.py

441 lines
15 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
将单个账号的Cookie输入到MySQL数据库
支持手动输入Cookie信息或从剪贴板粘贴
"""
import json
import sys
import os
from datetime import datetime
from typing import Dict, Optional
# 导入统一的数据库管理器和日志配置
from database_config import DatabaseManager, DB_CONFIG
from log_config import setup_cookie_sync_logger
# 初始化日志记录器
logger = setup_cookie_sync_logger()
# 设置UTF-8编码
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
class SingleCookieToDB:
"""单个Cookie同步到数据库"""
def __init__(self, db_config: Optional[Dict] = None):
"""
初始化数据库连接
Args:
db_config: 数据库配置字典默认使用database_config.DB_CONFIG
"""
self.script_dir = os.path.dirname(os.path.abspath(__file__))
# 使用统一的数据库管理器
self.db_manager = DatabaseManager(db_config)
self.db_config = self.db_manager.config
def connect_db(self) -> bool:
"""连接数据库"""
return self.db_manager.test_connection()
def close_db(self):
"""关闭数据库连接"""
print("[OK] 数据库操作完成")
def cookie_dict_to_string(self, cookies: Dict) -> str:
"""
将Cookie字典转换为字符串格式
Args:
cookies: Cookie字典
Returns:
Cookie字符串格式: "key1=value1; key2=value2"
"""
return '; '.join([f"{k}={v}" for k, v in cookies.items()])
def cookie_string_to_dict(self, cookie_string: str) -> Dict:
"""
将Cookie字符串转换为字典格式
Args:
cookie_string: Cookie字符串格式: "key1=value1; key2=value2"
Returns:
Cookie字典
"""
cookies = {}
for item in cookie_string.split(';'):
item = item.strip()
if '=' in item:
key, value = item.split('=', 1)
cookies[key.strip()] = value.strip()
return cookies
def find_author_by_name(self, author_name: str, channel: int = 1) -> Optional[Dict]:
"""
根据作者名称和渠道查找数据库记录
Args:
author_name: 作者名称
channel: 渠道1=百家号默认1
Returns:
作者记录字典未找到返回None
"""
try:
sql = "SELECT * FROM ai_authors WHERE author_name = %s AND channel = %s LIMIT 1"
result = self.db_manager.execute_query(sql, (author_name, channel), fetch_one=True)
return result
except Exception as e:
print(f"[X] 查询作者失败: {e}")
return None
def update_author_cookie(self, author_id: int, cookie_string: str,
app_id: Optional[str] = None) -> bool:
"""
更新作者的Cookie信息
Args:
author_id: 作者ID
cookie_string: Cookie字符串
app_id: 百家号app_id可选
Returns:
是否更新成功
"""
try:
# 构建更新SQL
update_fields = ["toutiao_cookie = %s", "updated_at = NOW()"]
params = [cookie_string]
# 如果提供了app_id也一并更新
if app_id:
update_fields.append("app_id = %s")
params.append(app_id)
params.append(author_id)
sql = f"UPDATE ai_authors SET {', '.join(update_fields)} WHERE id = %s"
self.db_manager.execute_update(sql, tuple(params))
logger.info(f"成功更新作者ID={author_id}的Cookie")
return True
except Exception as e:
logger.error(f"更新Cookie失败: {e}", exc_info=True)
print(f"[X] 更新Cookie失败: {e}")
return False
def insert_new_author(self, author_name: str, cookie_string: str,
app_id: Optional[str] = None, nick: Optional[str] = None,
domain: Optional[str] = None) -> bool:
"""
插入新作者记录
Args:
author_name: 作者名称用于数据库author_name字段
cookie_string: Cookie字符串
app_id: 百家号app_id
nick: 昵称
domain: 领域
Returns:
是否插入成功
"""
try:
# 构建插入SQL
sql = """
INSERT INTO ai_authors
(author_name, app_id, app_token, department_id, department_name,
department, toutiao_cookie, channel, status, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
"""
# 参数
params = (
author_name,
app_id or '',
'', # app_token 暂时为空
0, # department_id 默认0
domain or '其它', # department_name 使用领域
'', # department 暂时为空
cookie_string,
1, # channel: 1=baidu
'active' # status
)
self.db_manager.execute_update(sql, params)
logger.info(f"成功创建新作者: {author_name}")
return True
except Exception as e:
logger.error(f"插入新作者失败: {e}", exc_info=True)
print(f"[X] 插入新作者失败: {e}")
return False
def add_cookie(self, account_info: Dict, auto_create: bool = True) -> bool:
"""
添加单个账号的Cookie到数据库
Args:
account_info: 账号信息字典包含cookiesusernamenick等字段
auto_create: 当作者不存在时是否自动创建默认True
Returns:
是否添加成功
"""
# 提取Cookie信息
cookies = account_info.get('cookies', {})
if not cookies:
print("[X] Cookie信息为空")
return False
# 转换Cookie为字符串如果是字典格式
if isinstance(cookies, dict):
cookie_string = self.cookie_dict_to_string(cookies)
else:
cookie_string = str(cookies)
# 提取其他信息使用username和nick作为author_name进行匹配
username = account_info.get('username', '').strip()
nick = account_info.get('nick', '').strip()
app_id = account_info.get('app_id', '').strip()
domain = account_info.get('domain', '').strip()
# 验证username或nick至少有一个存在
if not username and not nick:
print("[X] username和nick至少需要提供一个")
return False
print(f"\n账号信息:")
print(f" Username: {username}")
print(f" 昵称: {nick}")
print(f" App ID: {app_id}")
print(f" 领域: {domain}")
# 查找作者使用双重匹配机制先username后nick
channel = 1 # 百家号固定为channel=1
author = None
matched_field = None
# 1. 首先尝试使用username匹配
if username:
author = self.find_author_by_name(username, channel)
if author:
matched_field = 'username'
print(f"\n[√] 通过username匹配到作者: {author['author_name']} (ID: {author['id']}, Channel: {author['channel']})")
# 2. 如果username匹配失败尝试使用nick匹配
if not author and nick:
author = self.find_author_by_name(nick, channel)
if author:
matched_field = 'nick'
print(f"\n[√] 通过nick匹配到作者: {author['author_name']} (ID: {author['id']}, Channel: {author['channel']})")
# 3. 如果都没匹配到
if not author:
print(f"\n[!] 未找到匹配的作者已尝试username和nick")
# 更新或创建
if author:
# 更新现有记录
print(f"\n正在更新作者Cookie...")
success = self.update_author_cookie(
author['id'],
cookie_string,
app_id if app_id else None
)
if success:
print(f"[OK] Cookie已更新匹配字段: {matched_field}")
return True
else:
print(f"[X] Cookie更新失败")
return False
else:
# 作者不存在,考虑创建
if auto_create:
# 优先使用username如果没有则使用nick
author_name_to_create = username if username else nick
print(f"\n正在创建新作者author_name: {author_name_to_create}...")
success = self.insert_new_author(
author_name_to_create,
cookie_string,
app_id,
nick,
domain
)
if success:
print(f"[OK] 新作者已创建 (author_name: {author_name_to_create})")
return True
else:
print(f"[X] 创建作者失败")
return False
else:
print(f"[X] 作者不存在,且未开启自动创建")
return False
def run_interactive(self):
"""交互式运行模式"""
print("\n" + "="*70)
print("添加单个账号Cookie到数据库")
print("="*70)
# 连接数据库
if not self.connect_db():
logger.error("数据库连接失败,退出")
return
try:
# 询问是否自动创建不存在的作者
print("\n当作者不存在时是否自动创建?")
auto_create_input = input("(y/n, 默认y): ").strip().lower()
auto_create = auto_create_input != 'n'
# 输入账号信息
print("\n" + "="*70)
print("请输入账号信息:")
print("="*70)
username = input("\n1. Username (用于匹配数据库author_name): ").strip()
nick = input("2. 昵称 (备用匹配字段): ").strip()
app_id = input("3. App ID (可选): ").strip()
domain = input("4. 领域 (可选): ").strip()
# 输入Cookie
print("\n" + "="*70)
print("请输入Cookie信息:")
print("提示: 可以输入以下任意格式:")
print(" 1. Cookie字符串: key1=value1; key2=value2")
print(" 2. JSON格式: {\"key1\": \"value1\", \"key2\": \"value2\"}")
print(" 3. 多行输入,输入完成后输入 END 结束")
print("="*70)
cookie_lines = []
while True:
line = input().strip()
if line.upper() == 'END':
break
if line:
cookie_lines.append(line)
cookie_input = ' '.join(cookie_lines)
# 解析Cookie
cookies = {}
if cookie_input.startswith('{'):
# JSON格式
try:
cookies = json.loads(cookie_input)
except json.JSONDecodeError:
print("[X] Cookie JSON格式解析失败")
return
else:
# 字符串格式
cookies = self.cookie_string_to_dict(cookie_input)
if not cookies:
print("[X] Cookie为空操作取消")
return
# 构建账号信息
account_info = {
'username': username,
'nick': nick,
'app_id': app_id,
'domain': domain,
'cookies': cookies
}
# 确认信息
print("\n" + "="*70)
print("确认账号信息:")
print("="*70)
print(f" Username: {username}")
print(f" 昵称: {nick}")
print(f" App ID: {app_id}")
print(f" 领域: {domain}")
print(f" Cookie条目数: {len(cookies)}")
print(f" 自动创建: {'' if auto_create else ''}")
print("="*70)
confirm = input("\n确认添加到数据库?(y/n): ").strip().lower()
if confirm != 'y':
print("\n已取消")
return
# 添加Cookie
success = self.add_cookie(account_info, auto_create)
if success:
print("\n" + "="*70)
print("添加成功!")
print("="*70)
else:
print("\n" + "="*70)
print("添加失败,请查看错误信息")
print("="*70)
finally:
# 关闭数据库连接
self.close_db()
def main():
"""主函数"""
print("\n" + "="*70)
print("单个账号Cookie同步工具")
print("="*70)
# 使用默认配置还是自定义配置
print("\n请选择数据库配置方式:")
print(" 1. 使用默认配置 (8.149.233.36/ai_statistics_read)")
print(" 2. 自定义配置")
choice = input("\n请选择 (1/2, 默认1): ").strip() or '1'
if choice == '2':
# 自定义数据库配置
print("\n请输入数据库连接信息:\n")
host = input("数据库地址: ").strip()
port = input("端口 (默认: 3306): ").strip() or '3306'
user = input("用户名: ").strip()
password = input("密码: ").strip()
database = input("数据库名: ").strip()
db_config = {
'host': host,
'port': int(port),
'user': user,
'password': password,
'database': database,
'charset': 'utf8mb4'
}
else:
# 使用默认配置
db_config = None
print("\n使用默认数据库配置...")
# 创建同步器并执行
syncer = SingleCookieToDB(db_config)
print("\n配置确认:")
print(f" 数据库: {syncer.db_config['host']}:{syncer.db_config.get('port', 3306)}/{syncer.db_config['database']}")
print(f" 用户: {syncer.db_config['user']}")
print("="*70)
# 运行交互式模式
syncer.run_interactive()
if __name__ == '__main__':
main()