Files
ai_wht_wechat/backend/example_use_damai_proxy.py

201 lines
6.0 KiB
Python
Raw Permalink Normal View History

2026-01-06 19:36:42 +08:00
"""
大麦固定代理使用示例
演示如何在实际项目中使用固定代理IP
"""
import asyncio
import sys
from browser_pool import get_browser_pool
from damai_proxy_config import get_proxy_1, get_proxy_2, get_random_proxy
async def example1_use_specific_proxy():
"""示例1: 使用指定的代理IP"""
print("\n" + "="*60)
print("示例1: 使用指定的代理IP代理1")
print("="*60)
# 获取代理1的配置
proxy_config = get_proxy_1()
print(f"📌 使用代理: {proxy_config['server']}")
# 获取浏览器池
pool = get_browser_pool()
try:
# 获取浏览器实例(带代理)
# 注意需要修改browser_pool以支持带认证的代理
browser, context, page = await pool.get_browser(
proxy=proxy_config["server"]
)
# 访问测试页面
print("🌐 访问IP检测页面...")
await page.goto("http://httpbin.org/ip", timeout=30000)
# 获取IP信息
ip_info = await page.evaluate("() => document.body.innerText")
print(f"✅ 当前IP:\n{ip_info}")
except Exception as e:
print(f"❌ 错误: {str(e)}")
async def example2_use_random_proxy():
"""示例2: 随机使用一个代理IP"""
print("\n" + "="*60)
print("示例2: 随机使用一个代理IP")
print("="*60)
# 随机获取一个代理
proxy_config = get_random_proxy()
print(f"📌 随机选择代理: {proxy_config['name']}")
print(f" 服务器: {proxy_config['server']}")
# 后续操作类似示例1
print("✅ 代理配置已获取,可以用于浏览器实例化")
async def example3_use_with_playwright_directly():
"""示例3: 直接在Playwright中使用代理带认证"""
print("\n" + "="*60)
print("示例3: 直接在Playwright中使用代理完整认证")
print("="*60)
from playwright.async_api import async_playwright
# 获取代理配置
proxy_config = get_proxy_2()
print(f"📌 使用代理2: {proxy_config['server']}")
playwright = None
browser = None
try:
# 启动Playwright
playwright = await async_playwright().start()
# 配置代理(完整配置,包含认证信息)
proxy_settings = {
"server": proxy_config["server"],
"username": proxy_config["username"],
"password": proxy_config["password"]
}
# 启动浏览器
browser = await playwright.chromium.launch(
headless=True,
proxy=proxy_settings,
args=['--disable-blink-features=AutomationControlled']
)
# 创建上下文和页面
context = await browser.new_context()
page = await context.new_page()
# 访问测试页面
print("🌐 访问大麦网...")
await page.goto("https://www.damai.cn/", timeout=30000)
title = await page.title()
print(f"✅ 页面标题: {title}")
print(f" 当前URL: {page.url}")
except Exception as e:
print(f"❌ 错误: {str(e)}")
finally:
if browser:
await browser.close()
if playwright:
await playwright.stop()
async def example4_switch_proxy_on_error():
"""示例4: 代理失败时自动切换"""
print("\n" + "="*60)
print("示例4: 代理失败时自动切换到另一个代理")
print("="*60)
from damai_proxy_config import get_all_enabled_proxies
from playwright.async_api import async_playwright
proxies = get_all_enabled_proxies()
print(f"📊 可用代理数: {len(proxies)}")
for i, proxy_config in enumerate(proxies):
print(f"\n🔄 尝试代理 {i+1}/{len(proxies)}: {proxy_config['name']}")
playwright = None
browser = None
try:
# 启动Playwright
playwright = await async_playwright().start()
# 配置代理
proxy_settings = {
"server": proxy_config["server"],
"username": proxy_config["username"],
"password": proxy_config["password"]
}
# 启动浏览器
browser = await playwright.chromium.launch(
headless=True,
proxy=proxy_settings
)
context = await browser.new_context()
page = await context.new_page()
# 测试访问
await page.goto("http://httpbin.org/ip", timeout=15000)
ip_info = await page.evaluate("() => document.body.innerText")
print(f"{proxy_config['name']} 可用")
print(f" IP信息: {ip_info.strip()}")
# 成功则退出循环
await browser.close()
await playwright.stop()
break
except Exception as e:
print(f"⚠️ {proxy_config['name']} 不可用: {str(e)}")
if browser:
await browser.close()
if playwright:
await playwright.stop()
# 如果是最后一个代理也失败,则报错
if i == len(proxies) - 1:
print("❌ 所有代理都不可用!")
async def main():
"""运行所有示例"""
# Windows环境下设置事件循环策略
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
print("\n" + "🎯"*30)
print("大麦固定代理IP使用示例集")
print("🎯"*30)
# 示例2: 随机代理
await example2_use_random_proxy()
# 示例3: 完整的Playwright代理使用
await example3_use_with_playwright_directly()
# 示例4: 代理容错切换
await example4_switch_proxy_on_error()
print("\n" + "="*60)
print("🎉 所有示例运行完成!")
print("="*60)
if __name__ == "__main__":
asyncio.run(main())