201 lines
6.0 KiB
Python
201 lines
6.0 KiB
Python
"""
|
||
大麦固定代理使用示例
|
||
演示如何在实际项目中使用固定代理IP
|
||
"""
|
||
import asyncio
|
||
import sys
|
||
from browser_pool import get_browser_pool
|
||
from damai_proxy_config import get_proxy_1, get_proxy_2, get_random_proxy
|
||
|
||
|
||
async def example1_use_specific_proxy():
|
||
"""示例1: 使用指定的代理IP"""
|
||
print("\n" + "="*60)
|
||
print("示例1: 使用指定的代理IP(代理1)")
|
||
print("="*60)
|
||
|
||
# 获取代理1的配置
|
||
proxy_config = get_proxy_1()
|
||
print(f"📌 使用代理: {proxy_config['server']}")
|
||
|
||
# 获取浏览器池
|
||
pool = get_browser_pool()
|
||
|
||
try:
|
||
# 获取浏览器实例(带代理)
|
||
# 注意:需要修改browser_pool以支持带认证的代理
|
||
browser, context, page = await pool.get_browser(
|
||
proxy=proxy_config["server"]
|
||
)
|
||
|
||
# 访问测试页面
|
||
print("🌐 访问IP检测页面...")
|
||
await page.goto("http://httpbin.org/ip", timeout=30000)
|
||
|
||
# 获取IP信息
|
||
ip_info = await page.evaluate("() => document.body.innerText")
|
||
print(f"✅ 当前IP:\n{ip_info}")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 错误: {str(e)}")
|
||
|
||
|
||
async def example2_use_random_proxy():
|
||
"""示例2: 随机使用一个代理IP"""
|
||
print("\n" + "="*60)
|
||
print("示例2: 随机使用一个代理IP")
|
||
print("="*60)
|
||
|
||
# 随机获取一个代理
|
||
proxy_config = get_random_proxy()
|
||
print(f"📌 随机选择代理: {proxy_config['name']}")
|
||
print(f" 服务器: {proxy_config['server']}")
|
||
|
||
# 后续操作类似示例1
|
||
print("✅ 代理配置已获取,可以用于浏览器实例化")
|
||
|
||
|
||
async def example3_use_with_playwright_directly():
|
||
"""示例3: 直接在Playwright中使用代理(带认证)"""
|
||
print("\n" + "="*60)
|
||
print("示例3: 直接在Playwright中使用代理(完整认证)")
|
||
print("="*60)
|
||
|
||
from playwright.async_api import async_playwright
|
||
|
||
# 获取代理配置
|
||
proxy_config = get_proxy_2()
|
||
print(f"📌 使用代理2: {proxy_config['server']}")
|
||
|
||
playwright = None
|
||
browser = None
|
||
|
||
try:
|
||
# 启动Playwright
|
||
playwright = await async_playwright().start()
|
||
|
||
# 配置代理(完整配置,包含认证信息)
|
||
proxy_settings = {
|
||
"server": proxy_config["server"],
|
||
"username": proxy_config["username"],
|
||
"password": proxy_config["password"]
|
||
}
|
||
|
||
# 启动浏览器
|
||
browser = await playwright.chromium.launch(
|
||
headless=True,
|
||
proxy=proxy_settings,
|
||
args=['--disable-blink-features=AutomationControlled']
|
||
)
|
||
|
||
# 创建上下文和页面
|
||
context = await browser.new_context()
|
||
page = await context.new_page()
|
||
|
||
# 访问测试页面
|
||
print("🌐 访问大麦网...")
|
||
await page.goto("https://www.damai.cn/", timeout=30000)
|
||
|
||
title = await page.title()
|
||
print(f"✅ 页面标题: {title}")
|
||
print(f" 当前URL: {page.url}")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 错误: {str(e)}")
|
||
|
||
finally:
|
||
if browser:
|
||
await browser.close()
|
||
if playwright:
|
||
await playwright.stop()
|
||
|
||
|
||
async def example4_switch_proxy_on_error():
|
||
"""示例4: 代理失败时自动切换"""
|
||
print("\n" + "="*60)
|
||
print("示例4: 代理失败时自动切换到另一个代理")
|
||
print("="*60)
|
||
|
||
from damai_proxy_config import get_all_enabled_proxies
|
||
from playwright.async_api import async_playwright
|
||
|
||
proxies = get_all_enabled_proxies()
|
||
print(f"📊 可用代理数: {len(proxies)}")
|
||
|
||
for i, proxy_config in enumerate(proxies):
|
||
print(f"\n🔄 尝试代理 {i+1}/{len(proxies)}: {proxy_config['name']}")
|
||
|
||
playwright = None
|
||
browser = None
|
||
|
||
try:
|
||
# 启动Playwright
|
||
playwright = await async_playwright().start()
|
||
|
||
# 配置代理
|
||
proxy_settings = {
|
||
"server": proxy_config["server"],
|
||
"username": proxy_config["username"],
|
||
"password": proxy_config["password"]
|
||
}
|
||
|
||
# 启动浏览器
|
||
browser = await playwright.chromium.launch(
|
||
headless=True,
|
||
proxy=proxy_settings
|
||
)
|
||
|
||
context = await browser.new_context()
|
||
page = await context.new_page()
|
||
|
||
# 测试访问
|
||
await page.goto("http://httpbin.org/ip", timeout=15000)
|
||
ip_info = await page.evaluate("() => document.body.innerText")
|
||
|
||
print(f"✅ {proxy_config['name']} 可用")
|
||
print(f" IP信息: {ip_info.strip()}")
|
||
|
||
# 成功则退出循环
|
||
await browser.close()
|
||
await playwright.stop()
|
||
break
|
||
|
||
except Exception as e:
|
||
print(f"⚠️ {proxy_config['name']} 不可用: {str(e)}")
|
||
if browser:
|
||
await browser.close()
|
||
if playwright:
|
||
await playwright.stop()
|
||
|
||
# 如果是最后一个代理也失败,则报错
|
||
if i == len(proxies) - 1:
|
||
print("❌ 所有代理都不可用!")
|
||
|
||
|
||
async def main():
|
||
"""运行所有示例"""
|
||
# Windows环境下设置事件循环策略
|
||
if sys.platform == 'win32':
|
||
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
|
||
|
||
print("\n" + "🎯"*30)
|
||
print("大麦固定代理IP使用示例集")
|
||
print("🎯"*30)
|
||
|
||
# 示例2: 随机代理
|
||
await example2_use_random_proxy()
|
||
|
||
# 示例3: 完整的Playwright代理使用
|
||
await example3_use_with_playwright_directly()
|
||
|
||
# 示例4: 代理容错切换
|
||
await example4_switch_proxy_on_error()
|
||
|
||
print("\n" + "="*60)
|
||
print("🎉 所有示例运行完成!")
|
||
print("="*60)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|