Files
baijiahao_data_crawl/add_single_cookie_to_db.py

441 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
将单个账号的Cookie输入到MySQL数据库
支持手动输入Cookie信息或从剪贴板粘贴
"""
import json
import sys
import os
from datetime import datetime
from typing import Dict, Optional
# 导入统一的数据库管理器和日志配置
from database_config import DatabaseManager, DB_CONFIG
from log_config import setup_cookie_sync_logger
# 初始化日志记录器
logger = setup_cookie_sync_logger()
# 设置UTF-8编码
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
class SingleCookieToDB:
"""单个Cookie同步到数据库"""
def __init__(self, db_config: Optional[Dict] = None):
"""
初始化数据库连接
Args:
db_config: 数据库配置字典默认使用database_config.DB_CONFIG
"""
self.script_dir = os.path.dirname(os.path.abspath(__file__))
# 使用统一的数据库管理器
self.db_manager = DatabaseManager(db_config)
self.db_config = self.db_manager.config
def connect_db(self) -> bool:
"""连接数据库"""
return self.db_manager.test_connection()
def close_db(self):
"""关闭数据库连接"""
print("[OK] 数据库操作完成")
def cookie_dict_to_string(self, cookies: Dict) -> str:
"""
将Cookie字典转换为字符串格式
Args:
cookies: Cookie字典
Returns:
Cookie字符串格式: "key1=value1; key2=value2"
"""
return '; '.join([f"{k}={v}" for k, v in cookies.items()])
def cookie_string_to_dict(self, cookie_string: str) -> Dict:
"""
将Cookie字符串转换为字典格式
Args:
cookie_string: Cookie字符串格式: "key1=value1; key2=value2"
Returns:
Cookie字典
"""
cookies = {}
for item in cookie_string.split(';'):
item = item.strip()
if '=' in item:
key, value = item.split('=', 1)
cookies[key.strip()] = value.strip()
return cookies
def find_author_by_name(self, author_name: str, channel: int = 1) -> Optional[Dict]:
"""
根据作者名称和渠道查找数据库记录
Args:
author_name: 作者名称
channel: 渠道1=百家号默认1
Returns:
作者记录字典未找到返回None
"""
try:
sql = "SELECT * FROM ai_authors WHERE author_name = %s AND channel = %s LIMIT 1"
result = self.db_manager.execute_query(sql, (author_name, channel), fetch_one=True)
return result
except Exception as e:
print(f"[X] 查询作者失败: {e}")
return None
def update_author_cookie(self, author_id: int, cookie_string: str,
app_id: Optional[str] = None) -> bool:
"""
更新作者的Cookie信息
Args:
author_id: 作者ID
cookie_string: Cookie字符串
app_id: 百家号app_id可选
Returns:
是否更新成功
"""
try:
# 构建更新SQL
update_fields = ["toutiao_cookie = %s", "updated_at = NOW()"]
params = [cookie_string]
# 如果提供了app_id也一并更新
if app_id:
update_fields.append("app_id = %s")
params.append(app_id)
params.append(author_id)
sql = f"UPDATE ai_authors SET {', '.join(update_fields)} WHERE id = %s"
self.db_manager.execute_update(sql, tuple(params))
logger.info(f"成功更新作者ID={author_id}的Cookie")
return True
except Exception as e:
logger.error(f"更新Cookie失败: {e}", exc_info=True)
print(f"[X] 更新Cookie失败: {e}")
return False
def insert_new_author(self, author_name: str, cookie_string: str,
app_id: Optional[str] = None, nick: Optional[str] = None,
domain: Optional[str] = None) -> bool:
"""
插入新作者记录
Args:
author_name: 作者名称用于数据库author_name字段
cookie_string: Cookie字符串
app_id: 百家号app_id
nick: 昵称
domain: 领域
Returns:
是否插入成功
"""
try:
# 构建插入SQL
sql = """
INSERT INTO ai_authors
(author_name, app_id, app_token, department_id, department_name,
department, toutiao_cookie, channel, status, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
"""
# 参数
params = (
author_name,
app_id or '',
'', # app_token 暂时为空
0, # department_id 默认0
domain or '其它', # department_name 使用领域
'', # department 暂时为空
cookie_string,
1, # channel: 1=baidu
'active' # status
)
self.db_manager.execute_update(sql, params)
logger.info(f"成功创建新作者: {author_name}")
return True
except Exception as e:
logger.error(f"插入新作者失败: {e}", exc_info=True)
print(f"[X] 插入新作者失败: {e}")
return False
def add_cookie(self, account_info: Dict, auto_create: bool = True) -> bool:
"""
添加单个账号的Cookie到数据库
Args:
account_info: 账号信息字典包含cookies、username、nick等字段
auto_create: 当作者不存在时是否自动创建默认True
Returns:
是否添加成功
"""
# 提取Cookie信息
cookies = account_info.get('cookies', {})
if not cookies:
print("[X] Cookie信息为空")
return False
# 转换Cookie为字符串如果是字典格式
if isinstance(cookies, dict):
cookie_string = self.cookie_dict_to_string(cookies)
else:
cookie_string = str(cookies)
# 提取其他信息使用username和nick作为author_name进行匹配
username = account_info.get('username', '').strip()
nick = account_info.get('nick', '').strip()
app_id = account_info.get('app_id', '').strip()
domain = account_info.get('domain', '').strip()
# 验证username或nick至少有一个存在
if not username and not nick:
print("[X] username和nick至少需要提供一个")
return False
print(f"\n账号信息:")
print(f" Username: {username}")
print(f" 昵称: {nick}")
print(f" App ID: {app_id}")
print(f" 领域: {domain}")
# 查找作者使用双重匹配机制先username后nick
channel = 1 # 百家号固定为channel=1
author = None
matched_field = None
# 1. 首先尝试使用username匹配
if username:
author = self.find_author_by_name(username, channel)
if author:
matched_field = 'username'
print(f"\n[√] 通过username匹配到作者: {author['author_name']} (ID: {author['id']}, Channel: {author['channel']})")
# 2. 如果username匹配失败尝试使用nick匹配
if not author and nick:
author = self.find_author_by_name(nick, channel)
if author:
matched_field = 'nick'
print(f"\n[√] 通过nick匹配到作者: {author['author_name']} (ID: {author['id']}, Channel: {author['channel']})")
# 3. 如果都没匹配到
if not author:
print(f"\n[!] 未找到匹配的作者已尝试username和nick")
# 更新或创建
if author:
# 更新现有记录
print(f"\n正在更新作者Cookie...")
success = self.update_author_cookie(
author['id'],
cookie_string,
app_id if app_id else None
)
if success:
print(f"[OK] Cookie已更新匹配字段: {matched_field}")
return True
else:
print(f"[X] Cookie更新失败")
return False
else:
# 作者不存在,考虑创建
if auto_create:
# 优先使用username如果没有则使用nick
author_name_to_create = username if username else nick
print(f"\n正在创建新作者author_name: {author_name_to_create}...")
success = self.insert_new_author(
author_name_to_create,
cookie_string,
app_id,
nick,
domain
)
if success:
print(f"[OK] 新作者已创建 (author_name: {author_name_to_create})")
return True
else:
print(f"[X] 创建作者失败")
return False
else:
print(f"[X] 作者不存在,且未开启自动创建")
return False
def run_interactive(self):
"""交互式运行模式"""
print("\n" + "="*70)
print("添加单个账号Cookie到数据库")
print("="*70)
# 连接数据库
if not self.connect_db():
logger.error("数据库连接失败,退出")
return
try:
# 询问是否自动创建不存在的作者
print("\n当作者不存在时是否自动创建?")
auto_create_input = input("(y/n, 默认y): ").strip().lower()
auto_create = auto_create_input != 'n'
# 输入账号信息
print("\n" + "="*70)
print("请输入账号信息:")
print("="*70)
username = input("\n1. Username (用于匹配数据库author_name): ").strip()
nick = input("2. 昵称 (备用匹配字段): ").strip()
app_id = input("3. App ID (可选): ").strip()
domain = input("4. 领域 (可选): ").strip()
# 输入Cookie
print("\n" + "="*70)
print("请输入Cookie信息:")
print("提示: 可以输入以下任意格式:")
print(" 1. Cookie字符串: key1=value1; key2=value2")
print(" 2. JSON格式: {\"key1\": \"value1\", \"key2\": \"value2\"}")
print(" 3. 多行输入,输入完成后输入 END 结束")
print("="*70)
cookie_lines = []
while True:
line = input().strip()
if line.upper() == 'END':
break
if line:
cookie_lines.append(line)
cookie_input = ' '.join(cookie_lines)
# 解析Cookie
cookies = {}
if cookie_input.startswith('{'):
# JSON格式
try:
cookies = json.loads(cookie_input)
except json.JSONDecodeError:
print("[X] Cookie JSON格式解析失败")
return
else:
# 字符串格式
cookies = self.cookie_string_to_dict(cookie_input)
if not cookies:
print("[X] Cookie为空操作取消")
return
# 构建账号信息
account_info = {
'username': username,
'nick': nick,
'app_id': app_id,
'domain': domain,
'cookies': cookies
}
# 确认信息
print("\n" + "="*70)
print("确认账号信息:")
print("="*70)
print(f" Username: {username}")
print(f" 昵称: {nick}")
print(f" App ID: {app_id}")
print(f" 领域: {domain}")
print(f" Cookie条目数: {len(cookies)}")
print(f" 自动创建: {'' if auto_create else ''}")
print("="*70)
confirm = input("\n确认添加到数据库?(y/n): ").strip().lower()
if confirm != 'y':
print("\n已取消")
return
# 添加Cookie
success = self.add_cookie(account_info, auto_create)
if success:
print("\n" + "="*70)
print("添加成功!")
print("="*70)
else:
print("\n" + "="*70)
print("添加失败,请查看错误信息")
print("="*70)
finally:
# 关闭数据库连接
self.close_db()
def main():
"""主函数"""
print("\n" + "="*70)
print("单个账号Cookie同步工具")
print("="*70)
# 使用默认配置还是自定义配置
print("\n请选择数据库配置方式:")
print(" 1. 使用默认配置 (8.149.233.36/ai_statistics_read)")
print(" 2. 自定义配置")
choice = input("\n请选择 (1/2, 默认1): ").strip() or '1'
if choice == '2':
# 自定义数据库配置
print("\n请输入数据库连接信息:\n")
host = input("数据库地址: ").strip()
port = input("端口 (默认: 3306): ").strip() or '3306'
user = input("用户名: ").strip()
password = input("密码: ").strip()
database = input("数据库名: ").strip()
db_config = {
'host': host,
'port': int(port),
'user': user,
'password': password,
'database': database,
'charset': 'utf8mb4'
}
else:
# 使用默认配置
db_config = None
print("\n使用默认数据库配置...")
# 创建同步器并执行
syncer = SingleCookieToDB(db_config)
print("\n配置确认:")
print(f" 数据库: {syncer.db_config['host']}:{syncer.db_config.get('port', 3306)}/{syncer.db_config['database']}")
print(f" 用户: {syncer.db_config['user']}")
print("="*70)
# 运行交互式模式
syncer.run_interactive()
if __name__ == '__main__':
main()