Files
ai_mip/adspower_client.py
2026-02-24 12:46:35 +08:00

1507 lines
56 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
from typing import Dict, Optional
from loguru import logger
from playwright.sync_api import sync_playwright, Browser, Page
from config import Config
class AdsPowerClient:
"""AdsPower API客户端 - 集成 Playwright CDP
参考官方示例: localAPI-main/py-examples/example-start-profile.py
"""
# 大麦IP代理配置
DAMAI_API_URL = "https://api2.damaiip.com/index.php?s=/front/user/getIPlist&xsn=2912cb2b22d3b7ae724f045012790479&osn=TC_NO176707424165606223&tiqu=1"
DAMAI_USER = "69538fdef04e1"
DAMAI_PASSWORD = "63v0kQBr2yJXnjf"
def __init__(self, api_url: str = None, user_id: str = None, api_key: str = None):
self.api_url = api_url or Config.ADSPOWER_API_URL
self.user_id = user_id or Config.ADSPOWER_USER_ID
self.api_key = api_key or Config.ADSPOWER_API_KEY
self.playwright = None
self.browser = None
# 调试信息
if self.api_key:
logger.debug(f"AdsPowerClient 初始化 - API Key: {self.api_key[:8]}... (len={len(self.api_key)})")
else:
logger.debug("AdsPowerClient 初始化 - 未配置 API Key")
def _make_request(self, method: str, endpoint: str, json: Dict = None, params: Dict = None) -> Optional[Dict]:
"""
通用 API 请求方法
Args:
method: HTTP方法 (GET/POST/PUT/DELETE)
endpoint: API端点'/api/v2/browser-profile/create'
json: JSON请求体
params: URL查询参数
Returns:
响应JSON或None
"""
try:
import json as json_lib # 避免参数名冲突
url = f"{self.api_url}{endpoint}"
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 记录请求
logger.info("\n" + "="*70)
logger.info(f"API请求: {method} {endpoint}")
logger.info("="*70)
logger.info(f"URL: {url}")
if params:
logger.info(f"Params:\n{json_lib.dumps(params, indent=2, ensure_ascii=False)}")
if json:
logger.info(f"Body:\n{json_lib.dumps(json, indent=2, ensure_ascii=False)}")
response = requests.request(
method=method,
url=url,
json=json,
params=params,
headers=headers,
timeout=30
)
# 记录响应
logger.info("\n" + "-"*70)
logger.info("API响应")
logger.info("-"*70)
logger.info(f"Status: {response.status_code}")
try:
response_data = response.json()
logger.info(f"Body:\n{json_lib.dumps(response_data, indent=2, ensure_ascii=False)}")
except:
logger.info(f"Body (Raw):\n{response.text}")
response_data = None
logger.info("="*70 + "\n")
return response_data
except Exception as e:
logger.error(f"API请求异常 [{method} {endpoint}]: {str(e)}")
import traceback
traceback.print_exc()
return None
def update_profile_proxy(self, profile_id: str, proxy_id: str) -> bool:
"""
更新 Profile 的代理配置
使用 AdsPower API v2
Args:
profile_id: Profile ID
proxy_id: 代理ID
Returns:
是否成功
"""
try:
url = f"{self.api_url}/api/v2/browser-profile/update"
# 准备请求头
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 准备请求体
payload = {
"profile_id": profile_id,
"proxy_id": proxy_id
}
logger.info("\n" + "="*70)
logger.info("更新 Profile 代理")
logger.info("="*70)
logger.info(f"URL: {url}")
logger.info(f"Method: POST")
logger.info(f"Payload:")
logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
response = requests.post(url, json=payload, headers=headers, timeout=30)
logger.info("\n" + "-"*70)
logger.info("响应信息")
logger.info("-"*70)
logger.info(f"Status Code: {response.status_code}")
try:
response_json = response.json()
logger.info("Response Body:")
logger.info(json.dumps(response_json, indent=2, ensure_ascii=False))
except:
logger.info(f"Response Body (Raw):")
logger.info(response.text)
logger.info("="*70 + "\n")
result = response_json if 'response_json' in locals() else response.json()
if result.get('code') == 0:
logger.success(f"成功更新 Profile 代理: {profile_id} -> Proxy {proxy_id}")
# 验证代理是否真正更新
import time
time.sleep(1) # 等待配置生效
verify_result = self.get_profile_info(profile_id)
if verify_result:
actual_proxy_id = verify_result.get('data', {}).get('proxy_id', '')
logger.info(f"验证代理配置 - 期望: {proxy_id}, 实际: {actual_proxy_id}")
if actual_proxy_id == proxy_id:
logger.success("✅ 代理配置验证通过")
return True
else:
logger.warning(f"❌ 代理配置验证失败 - Profile中的proxy_id不匹配")
return False
else:
logger.warning("无法验证代理配置但API返回成功")
return True
else:
logger.error(f"更新 Profile 代理失败: {result.get('msg')}")
return False
except Exception as e:
logger.error(f"更新 Profile 代理异常: {str(e)}")
return False
def get_profile_info(self, profile_id: str) -> Optional[Dict]:
"""
查询 Profile 详细信息
使用 AdsPower API v2
Args:
profile_id: Profile ID
Returns:
Profile 信息
"""
try:
url = f"{self.api_url}/api/v2/browser-profile/list"
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
payload = {
"profile_id": profile_id
}
response = requests.post(url, json=payload, headers=headers, timeout=30)
result = response.json()
if result.get('code') == 0:
profiles = result.get('data', {}).get('list', [])
if profiles:
return {'code': 0, 'data': profiles[0]}
return None
except Exception as e:
logger.error(f"查询 Profile 信息异常: {str(e)}")
return None
def update_profile_proxy_v1(self, profile_id: str, proxy_config: Dict) -> bool:
"""
使用 API v1 方式直接更新 Profile 的代理配置
传入完整的 user_proxy_config 对象
Args:
profile_id: Profile ID (user_id)
proxy_config: 代理配置字典,格式:
{
"proxy_type": "http",
"proxy_host": "112.83.100.233",
"proxy_port": "11059",
"proxy_user": "69538fdef04e1", # 可选
"proxy_password": "63v0kQBr2yJXnjf", # 可选
"proxy_soft": "other"
}
Returns:
是否成功
"""
try:
url = f"{self.api_url}/api/v1/user/update"
# 准备请求头
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 构建 user_proxy_config
proxy_port = proxy_config.get("proxy_port")
if isinstance(proxy_port, str):
proxy_port = int(proxy_port)
user_proxy_config = {
"proxy_soft": proxy_config.get("proxy_soft", "other"),
"proxy_type": proxy_config.get("proxy_type", "http"),
"proxy_host": proxy_config["proxy_host"],
"proxy_port": proxy_port # 使用整数类型
}
# 如果有认证信息,添加到配置中
if proxy_config.get("proxy_user") and proxy_config.get("proxy_password"):
user_proxy_config["proxy_user"] = proxy_config["proxy_user"]
user_proxy_config["proxy_password"] = proxy_config["proxy_password"]
logger.info("使用认证代理模式")
else:
logger.info("使用白名单代理模式(无认证)")
# 准备请求体
payload = {
"user_id": profile_id,
"user_proxy_config": user_proxy_config
}
logger.info("\n" + "="*70)
logger.info("更新 Profile 代理 (API v1)")
logger.info("="*70)
logger.info(f"URL: {url}")
logger.info(f"Method: POST")
logger.info(f"Headers:")
for k, v in headers.items():
if k == 'Authorization' and v:
logger.info(f" {k}: Bearer {v.split()[-1][:8]}...")
else:
logger.info(f" {k}: {v}")
logger.info(f"Payload:")
logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
response = requests.post(url, json=payload, headers=headers, timeout=30)
logger.info("\n" + "-"*70)
logger.info("响应信息")
logger.info("-"*70)
logger.info(f"Status Code: {response.status_code}")
try:
response_json = response.json()
logger.info("Response Body:")
logger.info(json.dumps(response_json, indent=2, ensure_ascii=False))
except:
logger.info(f"Response Body (Raw):")
logger.info(response.text)
logger.info("="*70 + "\n")
result = response_json if 'response_json' in locals() else response.json()
if result.get('code') == 0:
logger.success(f"成功更新 Profile 代理 (API v1): {profile_id}")
# 验证代理是否真正更新
import time
time.sleep(1) # 等待配置生效
verify_result = self.get_profile_info(profile_id)
if verify_result:
actual_proxy = verify_result.get('data', {}).get('user_proxy_config', {})
actual_host = actual_proxy.get('proxy_host', '')
actual_port = str(actual_proxy.get('proxy_port', ''))
expected_host = proxy_config['proxy_host']
expected_port = str(proxy_port)
logger.info(f"验证代理配置:")
logger.info(f" 期望: {expected_host}:{expected_port}")
logger.info(f" 实际: {actual_host}:{actual_port}")
if actual_host == expected_host and actual_port == expected_port:
logger.success("✅ 代理配置验证通过")
return True
else:
logger.warning(f"❌ 代理配置验证失败 - 地址不匹配")
return False
else:
logger.warning("无法验证代理配置但API返回成功")
return True
else:
logger.error(f"更新 Profile 代理失败 (API v1): {result.get('msg')}")
return False
except Exception as e:
logger.error(f"更新 Profile 代理异常 (API v1): {str(e)}")
return False
def update_profile_proxy_v2_direct(self, profile_id: str, proxy_config: Dict) -> bool:
"""
使用 API v2 方式直接传入 user_proxy_config 更新代理
根据官方文档API v2 支持 proxy_id 和 user_proxy_config 两种方式
Args:
profile_id: Profile ID
proxy_config: 代理配置字典,格式:
{
"proxy_type": "http",
"proxy_host": "112.83.100.233",
"proxy_port": "11059", # 可以是字符串或整数
"proxy_user": "69538fdef04e1", # 可选
"proxy_password": "63v0kQBr2yJXnjf", # 可选
"proxy_soft": "other"
}
Returns:
是否成功
"""
try:
url = f"{self.api_url}/api/v2/browser-profile/update"
# 准备请求头
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 解析端口(确保是整数类型)
proxy_port = proxy_config.get("proxy_port")
if isinstance(proxy_port, str):
proxy_port = int(proxy_port)
# 构建 user_proxy_config
user_proxy_config = {
"proxy_soft": proxy_config.get("proxy_soft", "other"),
"proxy_type": proxy_config.get("proxy_type", "http"),
"proxy_host": proxy_config["proxy_host"],
"proxy_port": proxy_port # 使用整数类型!
}
# 如果有认证信息,添加到配置中
if proxy_config.get("proxy_user") and proxy_config.get("proxy_password"):
user_proxy_config["proxy_user"] = proxy_config["proxy_user"]
user_proxy_config["proxy_password"] = proxy_config["proxy_password"]
logger.info("使用认证代理模式")
else:
logger.info("使用白名单代理模式(无认证)")
# 准备请求体注意API v2 使用 profile_id 而不是 user_id
payload = {
"profile_id": profile_id,
"user_proxy_config": user_proxy_config
}
logger.info("\n" + "="*70)
logger.info("更新 Profile 代理 (API v2 - Direct user_proxy_config)")
logger.info("="*70)
logger.info(f"URL: {url}")
logger.info(f"Method: POST")
logger.info(f"Headers:")
for k, v in headers.items():
if k == 'Authorization' and v:
logger.info(f" {k}: Bearer {v.split()[-1][:8]}...")
else:
logger.info(f" {k}: {v}")
logger.info(f"Payload:")
logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
logger.info(f"注意: proxy_port类型为 {type(user_proxy_config['proxy_port']).__name__}")
response = requests.post(url, json=payload, headers=headers, timeout=30)
logger.info("\n" + "-"*70)
logger.info("响应信息")
logger.info("-"*70)
logger.info(f"Status Code: {response.status_code}")
try:
response_json = response.json()
logger.info("Response Body:")
logger.info(json.dumps(response_json, indent=2, ensure_ascii=False))
except:
logger.info(f"Response Body (Raw):")
logger.info(response.text)
logger.info("="*70 + "\n")
result = response_json if 'response_json' in locals() else response.json()
if result.get('code') == 0:
logger.success(f"成功更新 Profile 代理 (API v2 Direct): {profile_id}")
# 验证代理是否真正更新
import time
time.sleep(1)
verify_result = self.get_profile_info(profile_id)
if verify_result:
actual_proxy = verify_result.get('data', {}).get('user_proxy_config', {})
actual_host = actual_proxy.get('proxy_host', '')
actual_port = str(actual_proxy.get('proxy_port', ''))
expected_host = proxy_config['proxy_host']
expected_port = str(proxy_port)
logger.info(f"验证代理配置:")
logger.info(f" 期望: {expected_host}:{expected_port}")
logger.info(f" 实际: {actual_host}:{actual_port}")
if actual_host == expected_host and actual_port == expected_port:
logger.success("✅ 代理配置验证通过")
return True
else:
logger.warning(f"❌ 代理配置验证失败 - 地址不匹配")
return False
else:
logger.warning("无法验证代理配置但API返回成功")
return True
else:
logger.error(f"更新 Profile 代理失败 (API v2 Direct): {result.get('msg')}")
return False
except Exception as e:
logger.error(f"更新 Profile 代理异常 (API v2 Direct): {str(e)}")
return False
def setup_proxy_and_start(self, user_id: str = None, use_proxy: bool = True) -> Optional[Dict]:
"""
设置代理并启动浏览器
Args:
user_id: AdsPower用户ID
use_proxy: 是否使用代理
Returns:
浏览器信息,如果使用代理会包含 proxy_id 字段
"""
target_user_id = user_id or self.user_id
proxy_id = None
if use_proxy:
# 1. 获取大麦IP代理
proxy_info = self.get_damai_proxy()
if not proxy_info:
logger.warning("获取代理失败,将不使用代理启动浏览器")
return self.start_browser(user_id=target_user_id)
# 2. 创建 AdsPower 代理
proxy_config = {
"type": "http",
"host": proxy_info["host"],
"port": proxy_info["port"],
"user": self.DAMAI_USER,
"password": self.DAMAI_PASSWORD,
"ipchecker": "ip2location",
"remark": "Damai Auto Proxy"
}
proxy_id = self.create_proxy(proxy_config)
if not proxy_id:
logger.warning("创建代理失败,将不使用代理启动浏览器")
return self.start_browser(user_id=target_user_id)
# 3. 更新 Profile 使用新代理
if not self.update_profile_proxy(target_user_id, proxy_id):
logger.warning("更新 Profile 代理失败,将不使用代理启动浏览器")
# 删除刚创建的代理
self.delete_proxy(proxy_id)
return self.start_browser(user_id=target_user_id)
# 4. 启动浏览器
result = self.start_browser(user_id=target_user_id)
# 5. 如果启动成功且有代理,在返回结果中添加 proxy_id
if result and proxy_id:
result['proxy_id'] = proxy_id
return result
def start_browser(self, user_id: str = None) -> Optional[Dict]:
"""
启动浏览器
使用 AdsPower API v2
Args:
user_id: AdsPower用户IDProfile ID
Returns:
包含浏览器连接信息的字典
"""
target_user_id = user_id or self.user_id
if not target_user_id:
logger.error("未设置 AdsPower User ID")
return None
try:
# 使用 AdsPower API v2
url = f"{self.api_url}/api/v2/browser-profile/start"
# 准备请求头
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 准备请求体
payload = {
"profile_id": target_user_id,
"launch_args": [
"--no-sandbox", # 禁用沙箱root用户必需
"--disable-setuid-sandbox", # 禁用setuid沙箱
"--disable-dev-shm-usage" # 避免/dev/shm空间不足
],
"headless": "0",
"last_opened_tabs": "1",
"proxy_detection": "1",
"password_filling": "0",
"password_saving": "0",
"cdp_mask": "1",
"delete_cache": "0",
"device_scale": "1"
}
# 打印请求信息
logger.info("\n" + "="*70)
logger.info("📤 启动浏览器请求 (API v2)")
logger.info("="*70)
logger.info(f"URL: {url}")
logger.info(f"Method: POST")
logger.info(f"Headers:")
for key, value in headers.items():
if key == 'Authorization' and 'Bearer' in value:
display_value = f"Bearer {value.split(' ')[1][:8]}..."
else:
display_value = value
logger.info(f" {key}: {display_value}")
logger.info(f"Payload:")
logger.info(f" {json.dumps(payload, indent=2, ensure_ascii=False)}")
response = requests.post(url, json=payload, headers=headers, timeout=30)
# 打印响应信息
logger.info("\n" + "-"*70)
logger.info("📥 响应信息")
logger.info("-"*70)
logger.info(f"Status Code: {response.status_code}")
logger.info(f"Headers:")
for key, value in response.headers.items():
logger.info(f" {key}: {value}")
# 格式化 JSON 响应
try:
response_json = response.json()
logger.info(f"Response Body:")
logger.info(json.dumps(response_json, indent=2, ensure_ascii=False))
except:
logger.info(f"Response Body (Raw):")
logger.info(response.text)
logger.info("="*70 + "\n")
result = response_json if 'response_json' in locals() else response.json()
if result.get('code') == 0:
logger.info(f"成功启动浏览器User ID: {target_user_id}")
# v2 API 的响应结构可能不同,需要根据实际返回调整
if 'data' in result and 'ws' in result['data']:
logger.info(f"Selenium: {result['data']['ws'].get('selenium', 'N/A')}")
logger.info(f"Puppeteer: {result['data']['ws'].get('puppeteer', 'N/A')}")
return result
else:
logger.error(f"启动浏览器失败: {result.get('msg')}")
return None
except Exception as e:
logger.error(f"启动浏览器异常: {str(e)}")
return None
def connect_browser(self, browser_info: Dict) -> Optional[Browser]:
"""
通过 CDP 连接到 AdsPower 浏览器
Args:
browser_info: start_browser 返回的浏览器信息
Returns:
Playwright Browser 实例
"""
try:
# 获取 CDP WebSocket 端点
ws_endpoint = browser_info['data']['ws']['puppeteer']
# 检查是否在 asyncio 事件循环中
import asyncio
try:
loop = asyncio.get_running_loop()
# 如果有运行中的循环,使用线程来执行同步代码
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(self._connect_browser_sync, ws_endpoint)
return future.result(timeout=30)
except RuntimeError:
# 没有运行中的循环,直接执行
return self._connect_browser_sync(ws_endpoint)
except Exception as e:
logger.error(f"CDP 连接失败: {str(e)}")
import traceback
traceback.print_exc()
return None
def _connect_browser_sync(self, ws_endpoint: str) -> Optional[Browser]:
"""同步执行浏览器连接"""
try:
# 创建新的 Playwright 实例
playwright = sync_playwright().start()
# 通过 CDP 连接到浏览器
browser = playwright.chromium.connect_over_cdp(ws_endpoint)
logger.info("成功通过 CDP 连接到 AdsPower 浏览器")
# 保存引用
self.playwright = playwright
self.browser = browser
return browser
except Exception as e:
logger.error(f"CDP 连接失败(sync): {str(e)}")
return None
def get_page(self, browser: Browser) -> Optional[Page]:
"""
获取或创建浏览器页面
Args:
browser: Browser 实例
Returns:
Page 实例
"""
try:
# 获取默认上下文
if browser.contexts:
context = browser.contexts[0]
else:
context = browser.new_context()
# 获取或创建页面
if context.pages:
page = context.pages[0]
else:
page = context.new_page()
logger.info("成功获取浏览器页面")
return page
except Exception as e:
logger.error(f"获取页面失败: {str(e)}")
return None
def stop_browser(self, user_id: str = None) -> bool:
"""
停止浏览器
使用 AdsPower API v2
Args:
user_id: AdsPower用户ID
Returns:
是否成功停止
"""
target_user_id = user_id or self.user_id
if not target_user_id:
logger.error("未设置 AdsPower User ID")
return False
# 清理 Playwright 资源
try:
if self.browser:
self.browser.close()
self.browser = None
if self.playwright:
self.playwright.stop()
self.playwright = None
except Exception as e:
logger.warning(f"清理 Playwright 资源异常: {str(e)}")
try:
# 使用 AdsPower API v2
url = f"{self.api_url}/api/v2/browser-profile/stop"
# 准备请求头
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 准备请求体
payload = {
"profile_id": target_user_id
}
logger.info("\n" + "="*70)
logger.info("🛑 停止浏览器请求 (API v2)")
logger.info("="*70)
logger.info(f"URL: {url}")
logger.info(f"Method: POST")
logger.info(f"Headers:")
for key, value in headers.items():
if key == 'Authorization' and 'Bearer' in value:
display_value = f"Bearer {value.split(' ')[1][:8]}..."
else:
display_value = value
logger.info(f" {key}: {display_value}")
logger.info(f"Payload:")
logger.info(f" {json.dumps(payload, indent=2, ensure_ascii=False)}")
response = requests.post(url, json=payload, headers=headers, timeout=30)
logger.info("\n" + "-"*70)
logger.info("📥 响应信息")
logger.info("-"*70)
logger.info(f"Status Code: {response.status_code}")
# 格式化 JSON 响应
try:
response_json = response.json()
logger.info(f"Response Body:")
logger.info(json.dumps(response_json, indent=2, ensure_ascii=False))
except:
logger.info(f"Response Body (Raw):")
logger.info(response.text)
logger.info("="*70 + "\n")
result = response_json if 'response_json' in locals() else response.json()
if result.get('code') == 0:
logger.info(f"成功停止浏览器User ID: {target_user_id}")
return True
else:
logger.error(f"停止浏览器失败: {result.get('msg')}")
return False
except Exception as e:
logger.error(f"停止浏览器异常: {str(e)}")
return False
def close_browser(self, profile_id: str = None) -> bool:
"""
关闭浏览器stop_browser的别名
Args:
profile_id: Profile ID
Returns:
是否成功关闭
"""
return self.stop_browser(user_id=profile_id)
def get_damai_proxy(self) -> Optional[Dict]:
"""
从大麦IP代理池获取代理
Returns:
包含host和port的字典失败返回None
"""
try:
logger.info("\n" + "="*70)
logger.info("获取大麦IP代理")
logger.info("="*70)
logger.info(f"URL: {self.DAMAI_API_URL}")
# 禁用代理直接连接大麦API
proxies = {
'http': None,
'https': None
}
response = requests.get(self.DAMAI_API_URL, proxies=proxies, timeout=10)
logger.info(f"Status Code: {response.status_code}")
logger.info(f"Response: {response.text}")
if response.status_code == 200 and response.text:
# 解析返回的IP:端口格式
proxy_str = response.text.strip()
if ':' in proxy_str:
host, port = proxy_str.split(':', 1)
logger.success(f"成功获取代理: {host}:{port}")
logger.info("="*70 + "\n")
return {
"host": host,
"port": port
}
else:
logger.error(f"代理格式错误: {proxy_str}")
return None
else:
logger.error(f"获取代理失败: {response.text}")
return None
except Exception as e:
logger.error(f"获取大麦代理异常: {str(e)}")
return None
def create_proxy(self, proxy_config: Dict) -> Optional[str]:
"""
在 AdsPower 中创建代理
使用 AdsPower API v2
Args:
proxy_config: 代理配置字典
Returns:
代理ID失败返回None
"""
try:
url = f"{self.api_url}/api/v2/proxy-list/create"
# 准备请求头
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 准备请求体(数组格式)
payload = [proxy_config]
logger.info("\n" + "="*70)
logger.info("创建 AdsPower 代理")
logger.info("="*70)
logger.info(f"URL: {url}")
logger.info(f"Method: POST")
logger.info(f"Payload:")
logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
response = requests.post(url, json=payload, headers=headers, timeout=30)
logger.info("\n" + "-"*70)
logger.info("响应信息")
logger.info("-"*70)
logger.info(f"Status Code: {response.status_code}")
try:
response_json = response.json()
logger.info("Response Body:")
logger.info(json.dumps(response_json, indent=2, ensure_ascii=False))
except:
logger.info(f"Response Body (Raw):")
logger.info(response.text)
logger.info("="*70 + "\n")
result = response_json if 'response_json' in locals() else response.json()
if result.get('code') == 0:
# 获取创建的代理ID
proxy_data = result.get('data', {})
proxy_ids = proxy_data.get('proxy_id', [])
if proxy_ids and len(proxy_ids) > 0:
proxy_id = proxy_ids[0]
logger.success(f"成功创建代理ID: {proxy_id}")
return proxy_id
else:
logger.error("创建代理成功但未返回ID")
return None
else:
logger.error(f"创建代理失败: {result.get('msg')}")
return None
except Exception as e:
logger.error(f"创建代理异常: {str(e)}")
return None
def list_proxies(self, proxy_ids: list = None, page: int = 1, limit: int = 100) -> Optional[Dict]:
"""
查询代理列表
使用 AdsPower API v2
Args:
proxy_ids: 代理ID列表可选
page: 页码默认1
limit: 每页数量默认100
Returns:
代理列表数据
"""
try:
url = f"{self.api_url}/api/v2/proxy-list/list"
# 准备请求头
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 准备请求体
payload = {
"page": str(page),
"limit": str(limit)
}
if proxy_ids:
payload["proxy_id"] = proxy_ids
logger.info("\n" + "="*70)
logger.info("查询代理列表 (API v2)")
logger.info("="*70)
logger.info(f"URL: {url}")
logger.info(f"Method: POST")
logger.info(f"Headers:")
for k, v in headers.items():
if k == 'Authorization' and v:
logger.info(f" {k}: Bearer {v.split()[-1][:8]}...")
else:
logger.info(f" {k}: {v}")
logger.info(f"Payload:")
logger.info(f" {json.dumps(payload, indent=2, ensure_ascii=False)}")
response = requests.post(url, json=payload, headers=headers, timeout=30)
logger.info("\n" + "-"*70)
logger.info("响应信息")
logger.info("-"*70)
logger.info(f"Status Code: {response.status_code}")
logger.info("Response Body:")
try:
response_json = response.json()
logger.info(json.dumps(response_json, indent=2, ensure_ascii=False))
except:
logger.info(response.text)
logger.info("="*70 + "\n")
result = response_json if 'response_json' in locals() else response.json()
if result.get('code') == 0:
proxy_list = result.get('data', {}).get('list', [])
logger.success(f"查询成功,找到 {len(proxy_list)} 个代理")
return result
else:
logger.error(f"查询代理列表失败: {result.get('msg')}")
return None
except Exception as e:
logger.error(f"查询代理列表异常: {str(e)}")
return None
def delete_proxy(self, proxy_id: str) -> bool:
"""
删除代理
使用 AdsPower API v2
Args:
proxy_id: 代理ID
Returns:
是否成功删除
"""
try:
url = f"{self.api_url}/api/v2/proxy-list/delete"
# 准备请求头
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 准备请求体(数组格式)
payload = {
"proxy_id": [proxy_id]
}
logger.info("\n" + "="*70)
logger.info("删除 AdsPower 代理")
logger.info("="*70)
logger.info(f"URL: {url}")
logger.info(f"Method: POST")
logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
response = requests.post(url, json=payload, headers=headers, timeout=30)
logger.info("\n" + "-"*70)
logger.info("响应信息")
logger.info("-"*70)
logger.info(f"Status Code: {response.status_code}")
try:
response_json = response.json()
logger.info(f"Response Body: {json.dumps(response_json, indent=2, ensure_ascii=False)}")
except:
logger.info(f"Response Body (Raw): {response.text}")
logger.info("="*70 + "\n")
result = response_json if 'response_json' in locals() else response.json()
if result.get('code') == 0:
logger.success(f"成功删除代理ID: {proxy_id}")
return True
else:
logger.error(f"删除代理失败: {result.get('msg')}")
return False
except Exception as e:
logger.error(f"删除代理异常: {str(e)}")
return False
def check_browser_status(self, user_id: str = None) -> Optional[Dict]:
"""
检查浏览器状态
Args:
user_id: AdsPower用户ID
Returns:
浏览器状态信息
"""
target_user_id = user_id or self.user_id
if not target_user_id:
logger.error("未设置 AdsPower User ID")
return None
try:
url = f"{self.api_url}/api/v1/browser/active?user_id={target_user_id}"
# 准备请求头
headers = {}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
response = requests.get(url, headers=headers, timeout=10)
result = response.json()
return result
except Exception as e:
logger.error(f"检查浏览器状态异常: {str(e)}")
return None
def list_groups(self, group_name: str = None, page: int = 1, page_size: int = 2000) -> Optional[Dict]:
"""
查询分组列表
使用 AdsPower API v1
Args:
group_name: 分组名称(可选)
page: 页码
page_size: 每页数量(范围 1 ~ 2000
Returns:
分组列表信息
"""
try:
url = f"{self.api_url}/api/v1/group/list"
# 准备请求头
headers = {}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 准备请求参数
params = {
"page": page,
"page_size": page_size
}
if group_name:
params["group_name"] = group_name
logger.info("\n" + "="*70)
logger.info("📂 查询分组列表")
logger.info("="*70)
logger.info(f"URL: {url}")
logger.info(f"Method: GET")
logger.info(f"Params: {params}")
response = requests.get(url, params=params, headers=headers, timeout=30)
logger.info("\n" + "-"*70)
logger.info("📥 响应信息")
logger.info("-"*70)
logger.info(f"Status Code: {response.status_code}")
try:
response_json = response.json()
logger.info(f"Response Body:")
logger.info(json.dumps(response_json, indent=2, ensure_ascii=False))
except:
logger.info(f"Response Body (Raw):")
logger.info(response.text)
logger.info("="*70 + "\n")
result = response_json if 'response_json' in locals() else response.json()
if result.get('code') == 0:
groups = result.get('data', {}).get('list', [])
logger.success(f"查询成功,找到 {len(groups)} 个分组")
if groups:
logger.info("\n分组列表:")
for idx, group in enumerate(groups, 1):
group_id = group.get('group_id', 'N/A')
group_name = group.get('group_name', 'N/A')
remark = group.get('remark', '')
logger.info(f" {idx}. ID: {group_id} | 名称: {group_name} | 备注: {remark}")
return result
else:
logger.error(f"查询分组失败: {result.get('msg')}")
return None
except Exception as e:
logger.error(f"查询分组异常: {str(e)}")
return None
def get_group_by_env(self) -> Optional[str]:
"""
根据当前运行环境获取对应的分组ID
dev环境查询 group_name=dev
生产环境查询 group_name=prod
Returns:
分组ID失败返回None
"""
# 根据环境决定分组名
group_name = 'dev' if Config.ENV == 'development' else 'prod'
logger.info(f"当前运行环境: {Config.ENV},查询分组: {group_name}")
# 等待一下避免请求过于频繁
import time
time.sleep(1.0)
# 查询分组
result = self.list_groups(group_name=group_name)
if result and result.get('code') == 0:
groups = result.get('data', {}).get('list', [])
if groups:
# 精确匹配分组名称API返回的可能包含多个包含关键词的分组
for group in groups:
if group.get('group_name') == group_name:
group_id = group.get('group_id')
logger.success(f"获取到分组ID: {group_id} (名称: {group_name})")
return group_id
# 如果没有精确匹配,记录警告
logger.warning(f"未找到精确匹配的分组 '{group_name}',返回的分组: {[g.get('group_name') for g in groups]}")
return None
else:
logger.warning(f"未找到名为 '{group_name}' 的分组")
return None
else:
logger.error(f"查询分组失败")
return None
def list_profiles(self, group_id: str = None, page: int = 1, page_size: int = 100,
profile_id: list = None, profile_no: list = None,
limit: int = None, sort_type: str = None, sort_order: str = None) -> Optional[Dict]:
"""
查询 Profile 列表
使用 AdsPower API v2
Args:
group_id: 组ID可选
page: 页码
page_size: 每页数量当limit未指定时使用
profile_id: 环境ID数组可选
profile_no: 环境编号数组(可选)
limit: 每页大小(范围 1 ~ 200
sort_type: 排序类型 (profile_no/last_open_time/created_time)
sort_order: 排序顺序 (asc/desc)
Returns:
Profile 列表信息
"""
try:
url = f"{self.api_url}/api/v2/browser-profile/list"
# 准备请求头
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 准备请求体
payload = {
"page": page,
"page_size": limit if limit else page_size
}
if group_id:
payload["group_id"] = group_id
if profile_id:
payload["profile_id"] = profile_id
if profile_no:
payload["profile_no"] = profile_no
if sort_type:
payload["sort_type"] = sort_type
if sort_order:
payload["sort_order"] = sort_order
# 打印请求信息
logger.info("\n" + "="*70)
logger.info("📋 查询 Profile 列表 (API v2)")
logger.info("="*70)
logger.info(f"URL: {url}")
logger.info(f"Method: POST")
logger.info(f"Headers:")
for key, value in headers.items():
if key == 'Authorization' and 'Bearer' in value:
display_value = f"Bearer {value.split(' ')[1][:8]}..."
else:
display_value = value
logger.info(f" {key}: {display_value}")
logger.info(f"Payload:")
logger.info(f" {json.dumps(payload, indent=2, ensure_ascii=False)}")
response = requests.post(url, json=payload, headers=headers, timeout=30)
# 打印响应信息
logger.info("\n" + "-"*70)
logger.info("📥 响应信息")
logger.info("-"*70)
logger.info(f"Status Code: {response.status_code}")
# 格式化 JSON 响应(限制长度)
try:
response_json = response.json()
# 如果响应太长,只显示关键部分
if len(str(response_json)) > 2000:
logger.info(f"Response Body (摘要):")
summary = {
"code": response_json.get("code"),
"msg": response_json.get("msg"),
"data": {
"total": response_json.get("data", {}).get("total"),
"list_count": len(response_json.get("data", {}).get("list", []))
}
}
logger.info(json.dumps(summary, indent=2, ensure_ascii=False))
else:
logger.info(f"Response Body:")
logger.info(json.dumps(response_json, indent=2, ensure_ascii=False))
except:
logger.info(f"Response Body (Raw):")
logger.info(response.text[:500])
logger.info("="*70 + "\n")
result = response_json if 'response_json' in locals() else response.json()
if result.get('code') == 0:
profiles = result.get('data', {}).get('list', [])
logger.success(f"查询成功,找到 {len(profiles)} 个 Profile")
# 显示 Profile 信息
if profiles:
logger.info("\nProfile 列表:")
for idx, profile in enumerate(profiles[:10], 1): # 只显示前10个
profile_id = profile.get('profile_id', 'N/A')
profile_name = profile.get('name', 'N/A')
group_name = profile.get('group_name', 'N/A')
logger.info(f" {idx}. ID: {profile_id} | 名称: {profile_name} | 组: {group_name}")
if len(profiles) > 10:
logger.info(f" ...还有 {len(profiles) - 10} 个 Profile")
return result
else:
logger.error(f"查询 Profile 失败: {result.get('msg')}")
return None
except Exception as e:
logger.error(f"查询 Profile 异常: {str(e)}")
return None
def create_profile(self, group_id: str, name: str = None, proxy_id: str = None) -> Optional[str]:
"""
创建新的 Profile
使用 AdsPower API v2
Args:
group_id: 分组ID
name: Profile名称可选不填则自动生成
proxy_id: 代理ID必填
Returns:
创建的 Profile ID失败返回 None
"""
try:
url = f"{self.api_url}/api/v2/browser-profile/create"
# 准备请求头
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 准备请求体
import time
profile_name = name or f"auto_{int(time.time())}"
payload = {
"group_id": group_id,
"name": profile_name,
"platform": "health.baidu.com",
"proxyid": proxy_id,
"fingerprint_config": {
"automatic_timezone": "1",
"language": ["zh-CN", "zh"],
"ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
}
logger.info("\n" + "="*70)
logger.info("创建 Profile (API v2)")
logger.info("="*70)
logger.info(f"URL: {url}")
logger.info(f"Method: POST")
logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
response = requests.post(url, json=payload, headers=headers, timeout=30)
logger.info("\n" + "-"*70)
logger.info("响应信息")
logger.info("-"*70)
logger.info(f"Status Code: {response.status_code}")
try:
response_json = response.json()
logger.info(f"Response Body: {json.dumps(response_json, indent=2, ensure_ascii=False)}")
except:
logger.info(f"Response Body (Raw): {response.text}")
logger.info("="*70 + "\n")
result = response_json if 'response_json' in locals() else response.json()
if result.get('code') == 0:
profile_id = result.get('data', {}).get('profile_id')
if profile_id:
logger.success(f"成功创建 ProfileID: {profile_id}")
return profile_id
else:
logger.error("创建 Profile 成功但未返回ID")
return None
else:
logger.error(f"创建 Profile 失败: {result.get('msg')}")
return None
except Exception as e:
logger.error(f"创建 Profile 异常: {str(e)}")
return None
def delete_profile(self, profile_id: str) -> bool:
"""
删除 Profile
使用 AdsPower API v2
Args:
profile_id: Profile ID
Returns:
是否成功删除
"""
try:
result = self._make_request(
'POST',
'/api/v2/browser-profile/delete',
json={"profile_id": [profile_id]}
)
if result and result.get('code') == 0:
logger.success(f"✅ 成功删除 Profile: {profile_id}")
return True
else:
logger.error(f"删除 Profile 失败: {result.get('msg') if result else '请求失败'}")
return False
except Exception as e:
logger.error(f"删除 Profile 异常: {str(e)}")
return False
def __del__(self):
"""析构函数,确保资源清理"""
try:
if self.browser:
self.browser.close()
if self.playwright:
self.playwright.stop()
except:
pass
if __name__ == "__main__":
"""快速测试 AdsPower 连接"""
logger.info("开始测试 AdsPower API v2 连接")
logger.info("="*50)
# 创建客户端
client = AdsPowerClient()
# 步骤 1: 先查询 Profile 列表
logger.info("\n步骤 1: 查询 Profile 列表")
result = client.list_profiles()
if not result:
logger.error("❌ 查询 Profile 失败,测试终止")
exit(1)
profiles = result.get('data', {}).get('list', [])
if not profiles:
logger.error("❌ 没有可用的 Profile请先在 AdsPower 中创建 Profile")
exit(1)
# 使用第一个 Profile
first_profile = profiles[0]
profile_id = first_profile.get('profile_id')
profile_name = first_profile.get('name', 'N/A')
logger.success(f"\n将使用 Profile: {profile_name} (ID: {profile_id})")
# 提示用户是否继续
choice = input(f"\n是否启动此 Profile (y/n): ").strip().lower()
if choice != 'y':
logger.info("用户取消操作")
exit(0)
# 问是否使用代理
use_proxy_choice = input(f"\n是否使用大麦IP代理 (y/n): ").strip().lower()
use_proxy = use_proxy_choice == 'y'
# 步骤 2: 设置代理并启动浏览器
logger.info(f"\n步骤 2: {'[使用代理] ' if use_proxy else ''}启动浏览器 {profile_id}")
result = client.setup_proxy_and_start(user_id=profile_id, use_proxy=use_proxy)
if result:
logger.success("✅ 浏览器启动成功!")
# 等待用户确认
input("\n按 Enter 键停止浏览器...")
# 步骤 3: 停止浏览器
logger.info(f"\n步骤 3: 停止浏览器 {profile_id}")
if client.stop_browser(user_id=profile_id):
logger.success("✅ 浏览器停止成功!")
else:
logger.error("❌ 浏览器停止失败")
else:
logger.error("❌ 浏览器启动失败")
logger.info("="*50)
logger.info("测试完成")