1091 lines
43 KiB
Python
1091 lines
43 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
指纹浏览器管理模块
|
||
支持 AdsPower 指纹浏览器 + Playwright CDP 连接
|
||
用于绕过小红书风控检测
|
||
"""
|
||
|
||
import requests
|
||
import time
|
||
import random
|
||
import logging
|
||
import asyncio
|
||
import os
|
||
from typing import Dict, Any, Optional, Tuple
|
||
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
|
||
|
||
# 创建不使用代理的 Session(用于本地 AdsPower API)
|
||
def get_no_proxy_session():
|
||
"""获取不使用代理的 requests Session"""
|
||
session = requests.Session()
|
||
session.trust_env = False # 禁用环境变量中的代理
|
||
return session
|
||
|
||
# 全局无代理 Session
|
||
_no_proxy_session = None
|
||
|
||
def get_local_session():
|
||
"""获取本地API调用专用Session(无代理)"""
|
||
global _no_proxy_session
|
||
if _no_proxy_session is None:
|
||
_no_proxy_session = get_no_proxy_session()
|
||
return _no_proxy_session
|
||
|
||
from loguru import logger
|
||
|
||
# AdsPower 本地API配置(从配置文件读取)
|
||
def get_adspower_config():
|
||
"""Helper function to get AdsPower config"""
|
||
try:
|
||
from config import get_config
|
||
config = get_config()
|
||
return {
|
||
'api_base': config.get_str('adspower.api_base', 'http://local.adspower.net:50325'),
|
||
'enabled': config.get_bool('adspower.enabled', True),
|
||
'default_group_id': config.get_str('adspower.default_group_id', '0'),
|
||
'api_key': config.get_str('adspower.api_key', 'e5afd5a4cead5589247febbeabc39bcb'),
|
||
'user_id': config.get_str('adspower.user_id', 'user_h235l72'),
|
||
'fingerprint': config.get_dict('adspower.fingerprint') or {
|
||
'automatic_timezone': '1',
|
||
'language': ['zh-CN', 'zh'],
|
||
'ua': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
|
||
}
|
||
}
|
||
except Exception as e:
|
||
logger.warning(f"[AdsPower] 无法加载配置,使用默认值: {e}")
|
||
return {
|
||
'api_base': 'http://local.adspower.net:50325',
|
||
'enabled': True,
|
||
'default_group_id': '0',
|
||
'api_key': 'e5afd5a4cead5589247febbeabc39bcb',
|
||
'user_id': 'user_h235l72'
|
||
}
|
||
|
||
ADSPOWER_CONFIG = get_adspower_config()
|
||
|
||
|
||
class FingerprintBrowserManager:
|
||
"""
|
||
指纹浏览器管理器
|
||
支持 AdsPower 指纹浏览器的启动、连接和管理
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.api_base = ADSPOWER_CONFIG['api_base']
|
||
self.api_key = ADSPOWER_CONFIG.get('api_key', '')
|
||
self.enabled = ADSPOWER_CONFIG['enabled']
|
||
self.current_browser = None
|
||
self.current_context = None
|
||
self.current_page = None
|
||
self.current_profile_id = None
|
||
self.playwright = None
|
||
|
||
def _get_headers(self):
|
||
"""获取API请求头(使用Bearer Token认证)"""
|
||
headers = {'Content-Type': 'application/json'}
|
||
if self.api_key:
|
||
headers['Authorization'] = f'Bearer {self.api_key}'
|
||
return headers
|
||
|
||
def _add_api_key(self, params: dict) -> dict:
|
||
"""添加API Key到请求参数(备用方法)"""
|
||
# 现在主要使用 Authorization header,这个方法作为备用
|
||
return params
|
||
|
||
async def check_adspower_status(self) -> bool:
|
||
"""
|
||
检查 AdsPower 是否运行中
|
||
|
||
Returns:
|
||
bool: AdsPower 是否可用
|
||
"""
|
||
try:
|
||
import json as json_module
|
||
|
||
logger.info("\n" + "="*70)
|
||
logger.info("[AdsPower API] 检查运行状态")
|
||
logger.info("="*70)
|
||
logger.info(f"URL: {self.api_base}/status")
|
||
logger.info(f"Method: GET")
|
||
logger.info(f"Timeout: 5s")
|
||
|
||
session = get_local_session()
|
||
response = session.get(f"{self.api_base}/status", timeout=5)
|
||
|
||
logger.info("\n" + "-"*70)
|
||
logger.info("[API响应]")
|
||
logger.info("-"*70)
|
||
logger.info(f"Status Code: {response.status_code}")
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
logger.info("Response Body:")
|
||
logger.info(json_module.dumps(data, indent=2, ensure_ascii=False))
|
||
logger.info("="*70 + "\n")
|
||
|
||
if data.get('code') == 0:
|
||
logger.info("[AdsPower] 状态正常")
|
||
return True
|
||
else:
|
||
logger.info(f"Response Body (Raw): {response.text}")
|
||
logger.info("="*70 + "\n")
|
||
|
||
logger.warning(f"[AdsPower] 状态异常: {response.text}")
|
||
return False
|
||
except requests.exceptions.ConnectionError:
|
||
logger.warning("[AdsPower] 未运行,请先启动 AdsPower")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"[AdsPower] 检查状态失败: {e}")
|
||
return False
|
||
|
||
async def find_profile_by_name(self, name: str) -> Optional[str]:
|
||
"""
|
||
根据名称查找配置
|
||
|
||
Args:
|
||
name: 配置名称
|
||
|
||
Returns:
|
||
str: 配置文件ID,找不到返回None
|
||
"""
|
||
try:
|
||
profiles = await self.get_browser_profiles()
|
||
for profile in profiles:
|
||
if profile.get('name') == name:
|
||
profile_id = profile.get('user_id')
|
||
logger.info(f"[指纹浏览器] 找到同名配置: {profile_id} ({name})")
|
||
return profile_id
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"[指纹浏览器] 查找配置异常: {e}")
|
||
return None
|
||
|
||
async def get_browser_profiles(self) -> list:
|
||
"""
|
||
获取所有浏览器配置文件列表
|
||
|
||
Returns:
|
||
list: 配置文件列表
|
||
"""
|
||
try:
|
||
session = get_local_session()
|
||
response = session.get(
|
||
f"{self.api_base}/api/v1/user/list",
|
||
params={'page_size': 100},
|
||
headers=self._get_headers(),
|
||
timeout=10
|
||
)
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get('code') == 0:
|
||
profiles = data.get('data', {}).get('list', [])
|
||
logger.info(f"[指纹浏览器] 获取到 {len(profiles)} 个浏览器配置")
|
||
return profiles
|
||
logger.warning(f"[指纹浏览器] 获取配置列表失败: {response.text}")
|
||
return []
|
||
except Exception as e:
|
||
logger.error(f"[指纹浏览器] 获取配置列表异常: {e}")
|
||
return []
|
||
|
||
async def query_profile_proxy(self, profile_id: str) -> dict:
|
||
"""
|
||
查询指定配置的代理信息
|
||
|
||
Args:
|
||
profile_id: 配置文件ID
|
||
|
||
Returns:
|
||
dict: 代理配置信息
|
||
"""
|
||
try:
|
||
session = get_local_session()
|
||
response = session.get(
|
||
f"{self.api_base}/api/v1/user/list",
|
||
params={'user_id': profile_id},
|
||
headers=self._get_headers(),
|
||
timeout=10
|
||
)
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get('code') == 0:
|
||
profiles = data.get('data', {}).get('list', [])
|
||
if profiles:
|
||
profile = profiles[0]
|
||
proxy_config = profile.get('user_proxy_config', {})
|
||
logger.info(f"[指纹浏览器] 查询到配置 {profile_id} 的代理: {proxy_config}")
|
||
return proxy_config
|
||
return {}
|
||
except Exception as e:
|
||
logger.error(f"[指纹浏览器] 查询配置代理异常: {e}")
|
||
return {}
|
||
|
||
async def create_browser_profile(self, name: str = None, proxy_config: dict = None, cookies: list = None) -> Optional[str]:
|
||
"""
|
||
创建新的浏览器配置文件
|
||
|
||
Args:
|
||
name: 配置文件名称
|
||
proxy_config: 代理配置 {'server': 'http://ip:port', 'username': '...', 'password': '...'}
|
||
cookies: Cookie列表(Playwright格式),创建时直接注入
|
||
|
||
Returns:
|
||
str: 配置文件ID,失败返回None
|
||
"""
|
||
try:
|
||
if not name:
|
||
name = f"xhs_profile_{int(time.time())}"
|
||
|
||
# 从配置获取指纹信息
|
||
fingerprint = ADSPOWER_CONFIG.get('fingerprint', {})
|
||
|
||
# 构建创建参数(使用API v2)
|
||
create_params = {
|
||
'name': name,
|
||
'group_id': ADSPOWER_CONFIG['default_group_id'],
|
||
'fingerprint_config': {
|
||
'automatic_timezone': '1' if fingerprint.get('automatic_timezone', True) else '0',
|
||
'language': fingerprint.get('language', ['zh-CN', 'zh']),
|
||
'ua': fingerprint.get('user_agent') or fingerprint.get('ua', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
|
||
}
|
||
}
|
||
|
||
# 如果有代理配置,添加代理
|
||
if proxy_config:
|
||
# 解析代理服务器地址
|
||
server = proxy_config.get('server', '')
|
||
if server.startswith('http://'):
|
||
server = server[7:]
|
||
parts = server.split(':')
|
||
if len(parts) == 2:
|
||
create_params['user_proxy_config'] = {
|
||
'proxy_soft': 'other',
|
||
'proxy_type': 'http',
|
||
'proxy_host': parts[0],
|
||
'proxy_port': parts[1],
|
||
'proxy_user': proxy_config.get('username', ''),
|
||
'proxy_password': proxy_config.get('password', '')
|
||
}
|
||
logger.info(f"[指纹浏览器] 配置代理: {parts[0]}:{parts[1]}")
|
||
|
||
# 如果有Cookie,转换为AdsPower格式并添加
|
||
if cookies:
|
||
logger.info(f"[指纹浏览器] 准备注入 {len(cookies)} 个Cookie到新环境")
|
||
# Playwright Cookie格式转换为AdsPower格式
|
||
adspower_cookies = []
|
||
for cookie in cookies:
|
||
adspower_cookie = {
|
||
'domain': cookie.get('domain', ''),
|
||
'name': cookie.get('name', ''),
|
||
'value': cookie.get('value', ''),
|
||
'path': cookie.get('path', '/'),
|
||
'secure': cookie.get('secure', False),
|
||
'sameSite': cookie.get('sameSite', 'unspecified')
|
||
}
|
||
# 如果有过期时间
|
||
if 'expires' in cookie and cookie['expires'] != -1:
|
||
adspower_cookie['expirationDate'] = cookie['expires']
|
||
adspower_cookies.append(adspower_cookie)
|
||
|
||
# 转换为JSON字符串(AdsPower要求)
|
||
import json as json_module
|
||
create_params['cookie'] = json_module.dumps(adspower_cookies, ensure_ascii=False)
|
||
logger.info(f"[指纹浏览器] Cookie已转换为AdsPower格式(JSON字符串)")
|
||
|
||
session = get_local_session()
|
||
response = session.post(
|
||
f"{self.api_base}/api/v2/browser-profile/create", # 使用v2接口
|
||
json=create_params,
|
||
headers=self._get_headers(),
|
||
timeout=30
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get('code') == 0:
|
||
profile_id = data.get('data', {}).get('profile_id') # v2返回profile_id
|
||
if cookies:
|
||
logger.success(f"[指纹浏览器] 创建配置成功(已注入Cookie): {profile_id}")
|
||
else:
|
||
logger.success(f"[指纹浏览器] 创建配置成功: {profile_id}")
|
||
return profile_id
|
||
|
||
logger.warning(f"[指纹浏览器] 创建配置失败: {response.text}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"[指纹浏览器] 创建配置异常: {e}")
|
||
return None
|
||
|
||
async def update_browser_proxy(self, profile_id: str, proxy_config: dict) -> bool:
|
||
"""
|
||
更新指定配置的代理IP(使用AdsPower API动态更新)
|
||
|
||
Args:
|
||
profile_id: 配置文件ID
|
||
proxy_config: 代理配置 {'server': 'http://ip:port', 'username': '...', 'password': '...'}
|
||
|
||
Returns:
|
||
bool: 是否更新成功
|
||
"""
|
||
try:
|
||
import json as json_module
|
||
|
||
if not proxy_config:
|
||
logger.warning("[指纹浏览器] 没有代理配置,跳过更新")
|
||
return False
|
||
|
||
# 解析代理服务器地址
|
||
server = proxy_config.get('server', '')
|
||
if server.startswith('http://'):
|
||
server = server[7:]
|
||
elif server.startswith('https://'):
|
||
server = server[8:]
|
||
|
||
parts = server.split(':')
|
||
if len(parts) != 2:
|
||
logger.warning(f"[指纹浏览器] 代理地址格式错误: {server}")
|
||
return False
|
||
|
||
proxy_host = parts[0]
|
||
proxy_port = int(parts[1]) # 端口必须是整数
|
||
|
||
logger.info(f"[指纹浏览器] 更新代理配置: {proxy_host}:{proxy_port}")
|
||
|
||
session = get_local_session()
|
||
|
||
# 注意:不再清除旧代理配置,直接覆盖更新
|
||
# 因为清除后再设置可能导致配置不一致
|
||
|
||
# 第二步:设置新的代理配置
|
||
# 检查是否有认证信息(白名单模式不需要认证)
|
||
proxy_user = proxy_config.get('username', '')
|
||
proxy_password = proxy_config.get('password', '')
|
||
|
||
user_proxy_config = {
|
||
'proxy_soft': 'other',
|
||
'proxy_type': 'http',
|
||
'proxy_host': proxy_host,
|
||
'proxy_port': proxy_port
|
||
}
|
||
|
||
# 只有在有认证信息时才添加用户名密码
|
||
if proxy_user and proxy_password:
|
||
user_proxy_config['proxy_user'] = proxy_user
|
||
user_proxy_config['proxy_password'] = proxy_password
|
||
logger.info(f"[指纹浏览器] 使用认证代理: {proxy_host}:{proxy_port}")
|
||
else:
|
||
logger.info(f"[指纹浏览器] 使用白名单代理(无认证): {proxy_host}:{proxy_port}")
|
||
|
||
update_params = {
|
||
'user_id': profile_id,
|
||
'user_proxy_config': user_proxy_config
|
||
}
|
||
|
||
# 打印完整的请求参数用于调试
|
||
logger.info("\n" + "="*70)
|
||
logger.info("[AdsPower API] 更新代理配置")
|
||
logger.info("="*70)
|
||
logger.info(f"URL: {self.api_base}/api/v1/user/update")
|
||
logger.info(f"Method: POST")
|
||
logger.info(f"Headers:")
|
||
headers = self._get_headers()
|
||
for k, v in headers.items():
|
||
if k == 'Authorization' and v:
|
||
logger.info(f" {k}: Bearer {v.split()[-1][:8]}...")
|
||
else:
|
||
logger.info(f" {k}: {v}")
|
||
logger.info("Request Body:")
|
||
logger.info(json_module.dumps(update_params, indent=2, ensure_ascii=False))
|
||
|
||
response = session.post(
|
||
f"{self.api_base}/api/v1/user/update",
|
||
json=update_params,
|
||
headers=headers,
|
||
timeout=30
|
||
)
|
||
|
||
# 打印完整的响应用于调试
|
||
logger.info("\n" + "-"*70)
|
||
logger.info("[API响应]")
|
||
logger.info("-"*70)
|
||
logger.info(f"Status Code: {response.status_code}")
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
logger.info("Response Body:")
|
||
logger.info(json_module.dumps(data, indent=2, ensure_ascii=False))
|
||
logger.info("="*70 + "\n")
|
||
|
||
if data.get('code') == 0:
|
||
logger.info(f"[指纹浏览器] 代理配置API返回成功: {profile_id}")
|
||
|
||
# 验证代理是否真正写入
|
||
await asyncio.sleep(0.5) # 等待配置生效
|
||
verify_config = await self.query_profile_proxy(profile_id)
|
||
actual_host = verify_config.get('proxy_host', '')
|
||
actual_port = verify_config.get('proxy_port', '')
|
||
|
||
if actual_host == proxy_host and str(actual_port) == str(proxy_port):
|
||
logger.info(f"[指纹浏览器] >> 代理配置验证通过: {actual_host}:{actual_port}")
|
||
return True
|
||
else:
|
||
logger.warning(f"[指纹浏览器] !! 代理配置验证失败! 期望: {proxy_host}:{proxy_port}, 实际: {actual_host}:{actual_port}")
|
||
logger.warning(f"[指纹浏览器] 完整配置: {verify_config}")
|
||
return False
|
||
else:
|
||
logger.warning(f"[指纹浏览器] 更新代理失败: {data.get('msg', '未知错误')}")
|
||
return False
|
||
else:
|
||
logger.info(f"Response Body (Raw): {response.text}")
|
||
logger.info("="*70 + "\n")
|
||
|
||
logger.warning(f"[指纹浏览器] 更新代理请求失败: {response.text}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"[指纹浏览器] 更新代理异常: {e}")
|
||
return False
|
||
|
||
async def start_browser(self, profile_id: str, proxy_config: dict = None) -> Optional[str]:
|
||
"""
|
||
启动指定配置的浏览器,返回 CDP 调试地址
|
||
|
||
Args:
|
||
profile_id: 配置文件ID
|
||
proxy_config: 可选的代理配置,在启动前更新
|
||
|
||
Returns:
|
||
str: CDP WebSocket URL,失败返回None
|
||
"""
|
||
try:
|
||
logger.info("\n" + "="*70)
|
||
logger.info("[指纹浏览器] 启动浏览器")
|
||
logger.info("="*70)
|
||
logger.info(f"配置ID: {profile_id}")
|
||
|
||
# 先停止可能正在运行的旧浏览器,避免状态混乱
|
||
logger.info(f"\n步骤1: 检查并清理旧浏览器实例...")
|
||
logger.info(f" 尝试停止: {profile_id}")
|
||
await self.stop_browser(profile_id)
|
||
await asyncio.sleep(1) # 等待浏览器完全关闭
|
||
logger.success(">> 旧实例清理完成")
|
||
|
||
# 如果有代理配置,在启动前更新代理(关键:必须在stop之后、start之前)
|
||
if proxy_config:
|
||
logger.info(f"\n步骤2: 更新代理配置...")
|
||
logger.info(f" 代理服务器: {proxy_config.get('server', 'N/A')}")
|
||
if proxy_config.get('username') and proxy_config.get('password'):
|
||
logger.info(f" 认证模式: 是")
|
||
logger.info(f" 用户名: {proxy_config['username']}")
|
||
else:
|
||
logger.info(f" 认证模式: 否 (白名单)")
|
||
|
||
update_result = await self.update_browser_proxy(profile_id, proxy_config)
|
||
if update_result:
|
||
logger.success(">> 代理配置更新成功")
|
||
else:
|
||
logger.warning("!! 代理配置更新失败")
|
||
await asyncio.sleep(0.5) # 等待配置生效
|
||
else:
|
||
logger.info(f"\n步骤2: 跳过代理配置 (未提供代理)")
|
||
|
||
logger.info(f"\n步骤3: 调用AdsPower API启动浏览器...")
|
||
|
||
import json as json_module
|
||
|
||
logger.info("\n" + "="*70)
|
||
logger.info("[AdsPower API] 启动浏览器")
|
||
logger.info("="*70)
|
||
logger.info(f"URL: {self.api_base}/api/v1/browser/start")
|
||
logger.info(f"Method: GET")
|
||
logger.info(f"Params:")
|
||
logger.info(f" user_id: {profile_id}")
|
||
logger.info(f"Headers:")
|
||
headers = self._get_headers()
|
||
for k, v in headers.items():
|
||
if k == 'Authorization' and v:
|
||
logger.info(f" {k}: Bearer {v.split()[-1][:8]}...")
|
||
else:
|
||
logger.info(f" {k}: {v}")
|
||
logger.info(f"Timeout: 60s")
|
||
|
||
session = get_local_session()
|
||
response = session.get(
|
||
f"{self.api_base}/api/v1/browser/start",
|
||
params={'user_id': profile_id},
|
||
headers=headers,
|
||
timeout=60
|
||
)
|
||
|
||
logger.info("\n" + "-"*70)
|
||
logger.info("[API响应]")
|
||
logger.info("-"*70)
|
||
logger.info(f"Status Code: {response.status_code}")
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
logger.info("Response Body:")
|
||
logger.info(json_module.dumps(data, indent=2, ensure_ascii=False))
|
||
logger.info("="*70 + "\n")
|
||
|
||
if data.get('code') == 0:
|
||
ws_url = data.get('data', {}).get('ws', {}).get('puppeteer')
|
||
if ws_url:
|
||
logger.success(f">> 浏览器启动成功")
|
||
logger.success(f" CDP地址: {ws_url}")
|
||
logger.success(f" 配置ID: {profile_id}")
|
||
self.current_profile_id = profile_id
|
||
return ws_url
|
||
else:
|
||
logger.error("!! 响应中未CDP地址")
|
||
else:
|
||
logger.error(f"!! 启动失败: {data.get('msg', '未知错误')}")
|
||
else:
|
||
logger.error(f"!! HTTP请求失败: {response.status_code}")
|
||
logger.info(f"Response Body (Raw): {response.text}")
|
||
logger.info("="*70 + "\n")
|
||
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error("\n" + "="*70)
|
||
logger.error("!! [指纹浏览器] 启动浏览器异常")
|
||
logger.error("="*70)
|
||
logger.error(f"错误信息: {str(e)}")
|
||
logger.error("="*70 + "\n")
|
||
return None
|
||
|
||
async def stop_browser(self, profile_id: str = None) -> bool:
|
||
"""
|
||
停止指定配置的浏览器
|
||
|
||
Args:
|
||
profile_id: 配置文件ID,不传则使用当前配置
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
try:
|
||
pid = profile_id or self.current_profile_id
|
||
if not pid:
|
||
logger.warning("[指纹浏览器] 没有需要停止的浏览器")
|
||
return False
|
||
|
||
session = get_local_session()
|
||
response = session.get(
|
||
f"{self.api_base}/api/v1/browser/stop",
|
||
params={'user_id': pid},
|
||
headers=self._get_headers(),
|
||
timeout=10
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get('code') == 0:
|
||
logger.info(f"[指纹浏览器] 浏览器已停止: {pid}")
|
||
if pid == self.current_profile_id:
|
||
self.current_profile_id = None
|
||
return True
|
||
|
||
logger.warning(f"[指纹浏览器] 停止浏览器失败: {response.text}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"[指纹浏览器] 停止浏览器异常: {e}")
|
||
return False
|
||
|
||
async def delete_profile(self, profile_id: str) -> bool:
|
||
"""
|
||
删除指定的浏览器配置(使用API v2)
|
||
|
||
Args:
|
||
profile_id: 配置文件ID
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
try:
|
||
logger.info(f"[指纹浏览器] 开始删除配置Profile: {profile_id}")
|
||
|
||
session = get_local_session()
|
||
response = session.post(
|
||
f"{self.api_base}/api/v2/browser-profile/delete",
|
||
json={"profile_id": [profile_id]},
|
||
headers=self._get_headers(),
|
||
timeout=10
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get('code') == 0:
|
||
logger.success(f"[指纹浏览器] 成功删除Profile: {profile_id}")
|
||
return True
|
||
else:
|
||
logger.error(f"[指纹浏览器] 删除Profile失败: {data.get('msg')}")
|
||
return False
|
||
else:
|
||
logger.error(f"[指纹浏览器] 删除Profile HTTP请求失败: {response.status_code}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"[指纹浏览器] 删除Profile异常: {str(e)}")
|
||
return False
|
||
|
||
async def get_profile_proxy_id(self, profile_id: str) -> Optional[str]:
|
||
"""
|
||
获取Profile关联的代理ID(如果使用了API v2代理池)
|
||
|
||
Args:
|
||
profile_id: 配置文件ID
|
||
|
||
Returns:
|
||
str: 代理ID,如果没有或使用的是直接配置则返回None
|
||
"""
|
||
try:
|
||
logger.info(f"[指纹浏览器] 查询Profile的代理ID: {profile_id}")
|
||
|
||
session = get_local_session()
|
||
response = session.get(
|
||
f"{self.api_base}/api/v2/browser-profile/detail",
|
||
params={'profile_id': profile_id},
|
||
headers=self._get_headers(),
|
||
timeout=10
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get('code') == 0:
|
||
# 检查是否使用了代理池中的代理
|
||
proxy_id = data.get('data', {}).get('user_proxy_config', {}).get('proxy_id')
|
||
if proxy_id:
|
||
logger.info(f"[指纹浏览器] Profile使用了代理池代理ID: {proxy_id}")
|
||
return proxy_id
|
||
else:
|
||
logger.info(f"[指纹浏览器] Profile使用直接配置的代理,无需删除代理池记录")
|
||
return None
|
||
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"[指纹浏览器] 查询Profile代理ID异常: {str(e)}")
|
||
return None
|
||
|
||
async def delete_proxy(self, proxy_id: str) -> bool:
|
||
"""
|
||
删除AdsPower代理池中的代理(使用API v2)
|
||
|
||
Args:
|
||
proxy_id: 代理ID
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
try:
|
||
logger.info(f"[指纹浏览器] 开始删除代理: {proxy_id}")
|
||
|
||
session = get_local_session()
|
||
response = session.post(
|
||
f"{self.api_base}/api/v2/proxy-list/delete",
|
||
json={"proxy_id": [proxy_id]},
|
||
headers=self._get_headers(),
|
||
timeout=10
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get('code') == 0:
|
||
logger.success(f"[指纹浏览器] 成功删除代理: {proxy_id}")
|
||
return True
|
||
else:
|
||
logger.error(f"[指纹浏览器] 删除代理失败: {data.get('msg')}")
|
||
return False
|
||
else:
|
||
logger.error(f"[指纹浏览器] 删除代理HTTP请求失败: {response.status_code}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"[指纹浏览器] 删除代理异常: {str(e)}")
|
||
return False
|
||
|
||
async def connect_browser(self, cdp_url: str) -> Tuple[Optional[Browser], Optional[BrowserContext], Optional[Page]]:
|
||
"""
|
||
通过 CDP 连接到指纹浏览器
|
||
|
||
Args:
|
||
cdp_url: CDP WebSocket URL
|
||
|
||
Returns:
|
||
Tuple[Browser, BrowserContext, Page]: 浏览器、上下文、页面对象
|
||
"""
|
||
try:
|
||
logger.info("\n" + "="*70)
|
||
logger.info("[指纹浏览器] 通过CDP连接浏览器")
|
||
logger.info("="*70)
|
||
logger.info(f"CDP地址: {cdp_url}")
|
||
|
||
logger.info("\n步骤1: 启动Playwright...")
|
||
self.playwright = await async_playwright().start()
|
||
logger.success(">> Playwright启动成功")
|
||
|
||
logger.info("\n步骤2: 连接到指纹浏览器...")
|
||
logger.info(f" 使用协议: Chromium CDP")
|
||
logger.info(f" WebSocket URL: {cdp_url}")
|
||
|
||
# 连接到指纹浏览器
|
||
browser = await self.playwright.chromium.connect_over_cdp(cdp_url)
|
||
logger.success(">> 浏览器连接成功")
|
||
|
||
logger.info("\n步骤3: 获取浏览器上下文...")
|
||
# 获取上下文(指纹浏览器通常只有一个context)
|
||
contexts = browser.contexts
|
||
logger.info(f" 发现上下文数: {len(contexts)}")
|
||
|
||
if contexts:
|
||
context = contexts[0]
|
||
logger.success(f">> 使用现有上下文: Context[0]")
|
||
else:
|
||
logger.info(" 未找到现有上下文,创建新上下文...")
|
||
context = await browser.new_context()
|
||
logger.success(">> 新上下文创建成功")
|
||
|
||
logger.info("\n步骤4: 获取或创建页面...")
|
||
# 获取或创建页面
|
||
pages = context.pages
|
||
logger.info(f" 发现页面数: {len(pages)}")
|
||
|
||
if pages:
|
||
page = pages[0]
|
||
logger.success(f">> 使用现有页面: Page[0]")
|
||
logger.info(f" 当前URL: {page.url}")
|
||
else:
|
||
logger.info(" 未找到现有页面,创建新页面...")
|
||
page = await context.new_page()
|
||
logger.success(">> 新页面创建成功")
|
||
|
||
self.current_browser = browser
|
||
self.current_context = context
|
||
self.current_page = page
|
||
|
||
logger.info("\n" + "-"*70)
|
||
logger.info("[连接完成摘要]")
|
||
logger.info("-"*70)
|
||
logger.success(f">> 浏览器实例: {type(browser).__name__}")
|
||
logger.success(f">> 上下文数: {len(browser.contexts)}")
|
||
logger.success(f">> 页面数: {len(context.pages)}")
|
||
logger.success(f">> 当前页面URL: {page.url}")
|
||
logger.info("="*70 + "\n")
|
||
|
||
return browser, context, page
|
||
|
||
except Exception as e:
|
||
logger.error("\n" + "="*70)
|
||
logger.error("!! [指纹浏览器] CDP连接失败")
|
||
logger.error("="*70)
|
||
logger.error(f"错误信息: {str(e)}")
|
||
logger.error("提示: 请检查CDP地址是否有效")
|
||
logger.error("="*70 + "\n")
|
||
return None, None, None
|
||
|
||
async def disconnect(self):
|
||
"""断开浏览器连接(不关闭浏览器)"""
|
||
try:
|
||
if self.current_browser:
|
||
# 注意:connect_over_cdp 模式下不要 close,只 disconnect
|
||
await self.current_browser.close()
|
||
self.current_browser = None
|
||
self.current_context = None
|
||
self.current_page = None
|
||
logger.info("[指纹浏览器] 已断开连接")
|
||
|
||
if self.playwright:
|
||
await self.playwright.stop()
|
||
self.playwright = None
|
||
except Exception as e:
|
||
logger.error(f"[指纹浏览器] 断开连接异常: {e}")
|
||
|
||
async def get_all_profiles(self) -> list:
|
||
"""
|
||
获取所有可用的浏览器配置文件,随机排序
|
||
|
||
Returns:
|
||
list: 配置文件ID列表
|
||
"""
|
||
profiles = await self.get_browser_profiles()
|
||
profile_ids = []
|
||
|
||
for profile in profiles:
|
||
user_id = profile.get('user_id', '')
|
||
name = profile.get('name', '')
|
||
if user_id:
|
||
profile_ids.append({'id': user_id, 'name': name})
|
||
|
||
# 随机打乱顺序,支持多配置轮换使用
|
||
random.shuffle(profile_ids)
|
||
logger.info(f"[指纹浏览器] 获取到 {len(profile_ids)} 个配置,已随机排序")
|
||
return profile_ids
|
||
|
||
async def get_or_create_profile(self, proxy_config: dict = None, phone: str = None, force_create: bool = False, cookies: list = None) -> Optional[str]:
|
||
"""
|
||
获取或创建浏览器配置文件
|
||
如果提供了phone参数:
|
||
1. 先查找是否已存在同名配置
|
||
2. 如果存在,直接返回该配置ID(会在start_browser中更新代理)
|
||
3. 如果不存在,创建新配置
|
||
否则随机选择已有配置
|
||
|
||
Args:
|
||
proxy_config: 代理配置
|
||
phone: 手机号(用作配置名称)
|
||
force_create: 强制创建新配置(用于临时发布环境)
|
||
cookies: Cookie列表,创建时直接注入
|
||
|
||
Returns:
|
||
str: 配置文件ID
|
||
"""
|
||
# 如果强制创建,直接创建临时配置
|
||
if force_create:
|
||
logger.info("[指纹浏览器] 强制创建临时发布环境...")
|
||
profile_name = f"XHS_TEMP_{int(time.time())}"
|
||
profile_id = await self.create_browser_profile(
|
||
name=profile_name,
|
||
proxy_config=proxy_config,
|
||
cookies=cookies # 传递cookies
|
||
)
|
||
if profile_id:
|
||
logger.success(f"[指纹浏览器] 临时环境创建成功: {profile_id} ({profile_name})")
|
||
return profile_id
|
||
else:
|
||
logger.error("[指纹浏览器] 临时环境创建失败")
|
||
return None
|
||
|
||
# 如果提供了手机号
|
||
if phone:
|
||
profile_name = f"XHS_{phone}"
|
||
logger.info(f"[指纹浏览器] 处理手机号 {phone} 的配置...")
|
||
|
||
# 先查找是否已存在同名配置
|
||
existing_profile_id = await self.find_profile_by_name(profile_name)
|
||
|
||
if existing_profile_id:
|
||
logger.success(f"[指纹浏览器] 复用现有配置: {existing_profile_id} ({profile_name})")
|
||
logger.info(" 注意:代理会在start_browser中更新")
|
||
return existing_profile_id
|
||
|
||
# 不存在,创建新配置
|
||
logger.info(f"[指纹浏览器] 未找到现有配置,为手机号 {phone} 创建新配置...")
|
||
profile_id = await self.create_browser_profile(
|
||
name=profile_name,
|
||
proxy_config=proxy_config,
|
||
cookies=cookies # 传递cookies
|
||
)
|
||
if profile_id:
|
||
logger.success(f"[指纹浏览器] 创建配置成功: {profile_id} ({profile_name})")
|
||
return profile_id
|
||
else:
|
||
logger.error("[指纹浏览器] 创建配置失败")
|
||
return None
|
||
|
||
# 没有phone参数,使用旧逻辑:随机选择配置
|
||
# 获取所有配置(已随机排序)
|
||
profiles = await self.get_all_profiles()
|
||
|
||
if profiles:
|
||
# 返回第一个(随机选择的)
|
||
profile = profiles[0]
|
||
logger.info(f"[指纹浏览器] 随机选择配置: {profile['id']} ({profile['name']})")
|
||
return profile['id']
|
||
|
||
# 没有可用配置,创建新的
|
||
logger.info("[指纹浏览器] 没有可用配置,创建新配置...")
|
||
return await self.create_browser_profile(
|
||
proxy_config=proxy_config,
|
||
cookies=cookies # 传递cookies
|
||
)
|
||
|
||
async def get_profile_cookies(self, profile_id: str) -> Optional[list]:
|
||
"""
|
||
查询指定配置的Cookie
|
||
|
||
Args:
|
||
profile_id: 配置文件ID
|
||
|
||
Returns:
|
||
list: Cookie列表 (Playwright完整格式):
|
||
[
|
||
{
|
||
"name": "cookie_name",
|
||
"value": "cookie_value",
|
||
"domain": ".xiaohongshu.com",
|
||
"path": "/",
|
||
"httpOnly": false,
|
||
"secure": true,
|
||
"sameSite": "Lax",
|
||
"expires": 1234567890
|
||
}
|
||
]
|
||
"""
|
||
try:
|
||
logger.info(f"[查询Cookie] 开始查询配置ID: {profile_id}")
|
||
|
||
session = get_local_session()
|
||
response = session.get(
|
||
f"{self.api_base}/api/v2/browser-profile/cookies",
|
||
params={'profile_id': profile_id},
|
||
headers=self._get_headers(),
|
||
timeout=10
|
||
)
|
||
|
||
logger.info(f"[查询Cookie] API响应状态: {response.status_code}")
|
||
logger.info(f"[查询Cookie] 响应内容: {response.text[:500]}")
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
logger.info(f"[查询Cookie] JSON解析结果: code={data.get('code')}, msg={data.get('msg')}")
|
||
|
||
if data.get('code') == 0:
|
||
cookies_str = data.get('data', {}).get('cookies', '[]')
|
||
logger.info(f"[查询Cookie] cookies字符串长度: {len(cookies_str)}")
|
||
logger.info(f"[查询Cookie] cookies字符串前100字符: {cookies_str[:100]}")
|
||
|
||
# 如果是空字符串,返回空列表
|
||
if not cookies_str or cookies_str.strip() == '':
|
||
logger.warning("[查询Cookie] cookies字符串为空,返回空列表")
|
||
return []
|
||
|
||
# 解析JSON字符串
|
||
import json
|
||
cookies = json.loads(cookies_str)
|
||
|
||
logger.success(f"[查询Cookie] 成功获取 {len(cookies)} 个Cookie")
|
||
return cookies
|
||
else:
|
||
logger.error(f"[查询Cookie] API返回错误: {data.get('msg', 'unknown')}")
|
||
return None
|
||
else:
|
||
logger.error(f"[查询Cookie] HTTP错误: {response.status_code}")
|
||
logger.error(f"[查询Cookie] 响应内容: {response.text}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"[查询Cookie] 异常: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
|
||
async def human_type(page: Page, selector: str, text: str, clear_first: bool = True):
|
||
"""
|
||
模拟人类打字速度输入文本
|
||
|
||
Args:
|
||
page: Playwright Page 对象
|
||
selector: 输入框选择器
|
||
text: 要输入的文本
|
||
clear_first: 是否先清空输入框
|
||
"""
|
||
try:
|
||
# 聚焦输入框
|
||
await page.focus(selector)
|
||
|
||
# 先清空
|
||
if clear_first:
|
||
await page.fill(selector, '')
|
||
await asyncio.sleep(random.uniform(0.1, 0.3))
|
||
|
||
# 模拟人类打字
|
||
for char in text:
|
||
await page.keyboard.type(char)
|
||
# 随机延迟 50ms - 150ms
|
||
await asyncio.sleep(random.uniform(0.05, 0.15))
|
||
|
||
logger.info(f"[人类输入] 已输入 {len(text)} 个字符")
|
||
except Exception as e:
|
||
logger.error(f"[人类输入] 输入失败: {e}")
|
||
raise
|
||
|
||
|
||
async def human_click(page: Page, selector: str, wait_after: float = 0.5):
|
||
"""
|
||
模拟人类点击行为
|
||
|
||
Args:
|
||
page: Playwright Page 对象
|
||
selector: 元素选择器
|
||
wait_after: 点击后等待时间
|
||
"""
|
||
try:
|
||
# 先移动到元素位置
|
||
element = await page.query_selector(selector)
|
||
if element:
|
||
box = await element.bounding_box()
|
||
if box:
|
||
# 在元素范围内随机一个点击位置
|
||
x = box['x'] + random.uniform(box['width'] * 0.3, box['width'] * 0.7)
|
||
y = box['y'] + random.uniform(box['height'] * 0.3, box['height'] * 0.7)
|
||
|
||
# 移动鼠标
|
||
await page.mouse.move(x, y)
|
||
await asyncio.sleep(random.uniform(0.1, 0.3))
|
||
|
||
# 点击
|
||
await page.mouse.click(x, y)
|
||
logger.info(f"[人类点击] 点击位置: ({x:.0f}, {y:.0f})")
|
||
else:
|
||
await page.click(selector)
|
||
else:
|
||
await page.click(selector)
|
||
|
||
await asyncio.sleep(wait_after)
|
||
except Exception as e:
|
||
logger.error(f"[人类点击] 点击失败: {e}")
|
||
raise
|
||
|
||
|
||
# 全局单例
|
||
_fingerprint_manager = None
|
||
|
||
def get_fingerprint_manager() -> FingerprintBrowserManager:
|
||
"""获取指纹浏览器管理器单例"""
|
||
global _fingerprint_manager
|
||
if _fingerprint_manager is None:
|
||
_fingerprint_manager = FingerprintBrowserManager()
|
||
return _fingerprint_manager
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 测试代码
|
||
async def test():
|
||
manager = get_fingerprint_manager()
|
||
|
||
# 检查 AdsPower 状态
|
||
if await manager.check_adspower_status():
|
||
print("AdsPower 运行正常")
|
||
|
||
# 使用默认配置(不使用代理)
|
||
proxy = None # 或者手动指定: {'server': 'http://ip:port', 'username': 'user', 'password': 'pass'}
|
||
print(f"代理IP: {proxy or '未配置'}")
|
||
|
||
# 获取或创建配置
|
||
profile_id = await manager.get_or_create_profile(proxy_config=proxy)
|
||
if profile_id:
|
||
print(f"配置ID: {profile_id}")
|
||
|
||
# 启动浏览器
|
||
cdp_url = await manager.start_browser(profile_id)
|
||
if cdp_url:
|
||
print(f"CDP URL: {cdp_url}")
|
||
|
||
# 连接浏览器
|
||
browser, context, page = await manager.connect_browser(cdp_url)
|
||
if page:
|
||
# 访问测试页面
|
||
await page.goto("https://httpbin.org/ip")
|
||
content = await page.content()
|
||
print(f"页面内容: {content[:200]}")
|
||
|
||
# 断开连接
|
||
await manager.disconnect()
|
||
|
||
# 停止浏览器
|
||
await manager.stop_browser(profile_id)
|
||
else:
|
||
print("AdsPower 未运行")
|
||
|
||
asyncio.run(test())
|