201 lines
6.0 KiB
Python
201 lines
6.0 KiB
Python
|
|
"""
|
|||
|
|
大麦固定代理使用示例
|
|||
|
|
演示如何在实际项目中使用固定代理IP
|
|||
|
|
"""
|
|||
|
|
import asyncio
|
|||
|
|
import sys
|
|||
|
|
from browser_pool import get_browser_pool
|
|||
|
|
from damai_proxy_config import get_proxy_1, get_proxy_2, get_random_proxy
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def example1_use_specific_proxy():
|
|||
|
|
"""示例1: 使用指定的代理IP"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("示例1: 使用指定的代理IP(代理1)")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
# 获取代理1的配置
|
|||
|
|
proxy_config = get_proxy_1()
|
|||
|
|
print(f"📌 使用代理: {proxy_config['server']}")
|
|||
|
|
|
|||
|
|
# 获取浏览器池
|
|||
|
|
pool = get_browser_pool()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 获取浏览器实例(带代理)
|
|||
|
|
# 注意:需要修改browser_pool以支持带认证的代理
|
|||
|
|
browser, context, page = await pool.get_browser(
|
|||
|
|
proxy=proxy_config["server"]
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 访问测试页面
|
|||
|
|
print("🌐 访问IP检测页面...")
|
|||
|
|
await page.goto("http://httpbin.org/ip", timeout=30000)
|
|||
|
|
|
|||
|
|
# 获取IP信息
|
|||
|
|
ip_info = await page.evaluate("() => document.body.innerText")
|
|||
|
|
print(f"✅ 当前IP:\n{ip_info}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"❌ 错误: {str(e)}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def example2_use_random_proxy():
|
|||
|
|
"""示例2: 随机使用一个代理IP"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("示例2: 随机使用一个代理IP")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
# 随机获取一个代理
|
|||
|
|
proxy_config = get_random_proxy()
|
|||
|
|
print(f"📌 随机选择代理: {proxy_config['name']}")
|
|||
|
|
print(f" 服务器: {proxy_config['server']}")
|
|||
|
|
|
|||
|
|
# 后续操作类似示例1
|
|||
|
|
print("✅ 代理配置已获取,可以用于浏览器实例化")
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def example3_use_with_playwright_directly():
|
|||
|
|
"""示例3: 直接在Playwright中使用代理(带认证)"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("示例3: 直接在Playwright中使用代理(完整认证)")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
from playwright.async_api import async_playwright
|
|||
|
|
|
|||
|
|
# 获取代理配置
|
|||
|
|
proxy_config = get_proxy_2()
|
|||
|
|
print(f"📌 使用代理2: {proxy_config['server']}")
|
|||
|
|
|
|||
|
|
playwright = None
|
|||
|
|
browser = None
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 启动Playwright
|
|||
|
|
playwright = await async_playwright().start()
|
|||
|
|
|
|||
|
|
# 配置代理(完整配置,包含认证信息)
|
|||
|
|
proxy_settings = {
|
|||
|
|
"server": proxy_config["server"],
|
|||
|
|
"username": proxy_config["username"],
|
|||
|
|
"password": proxy_config["password"]
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 启动浏览器
|
|||
|
|
browser = await playwright.chromium.launch(
|
|||
|
|
headless=True,
|
|||
|
|
proxy=proxy_settings,
|
|||
|
|
args=['--disable-blink-features=AutomationControlled']
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 创建上下文和页面
|
|||
|
|
context = await browser.new_context()
|
|||
|
|
page = await context.new_page()
|
|||
|
|
|
|||
|
|
# 访问测试页面
|
|||
|
|
print("🌐 访问大麦网...")
|
|||
|
|
await page.goto("https://www.damai.cn/", timeout=30000)
|
|||
|
|
|
|||
|
|
title = await page.title()
|
|||
|
|
print(f"✅ 页面标题: {title}")
|
|||
|
|
print(f" 当前URL: {page.url}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"❌ 错误: {str(e)}")
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
if browser:
|
|||
|
|
await browser.close()
|
|||
|
|
if playwright:
|
|||
|
|
await playwright.stop()
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def example4_switch_proxy_on_error():
|
|||
|
|
"""示例4: 代理失败时自动切换"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("示例4: 代理失败时自动切换到另一个代理")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
from damai_proxy_config import get_all_enabled_proxies
|
|||
|
|
from playwright.async_api import async_playwright
|
|||
|
|
|
|||
|
|
proxies = get_all_enabled_proxies()
|
|||
|
|
print(f"📊 可用代理数: {len(proxies)}")
|
|||
|
|
|
|||
|
|
for i, proxy_config in enumerate(proxies):
|
|||
|
|
print(f"\n🔄 尝试代理 {i+1}/{len(proxies)}: {proxy_config['name']}")
|
|||
|
|
|
|||
|
|
playwright = None
|
|||
|
|
browser = None
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 启动Playwright
|
|||
|
|
playwright = await async_playwright().start()
|
|||
|
|
|
|||
|
|
# 配置代理
|
|||
|
|
proxy_settings = {
|
|||
|
|
"server": proxy_config["server"],
|
|||
|
|
"username": proxy_config["username"],
|
|||
|
|
"password": proxy_config["password"]
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 启动浏览器
|
|||
|
|
browser = await playwright.chromium.launch(
|
|||
|
|
headless=True,
|
|||
|
|
proxy=proxy_settings
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
context = await browser.new_context()
|
|||
|
|
page = await context.new_page()
|
|||
|
|
|
|||
|
|
# 测试访问
|
|||
|
|
await page.goto("http://httpbin.org/ip", timeout=15000)
|
|||
|
|
ip_info = await page.evaluate("() => document.body.innerText")
|
|||
|
|
|
|||
|
|
print(f"✅ {proxy_config['name']} 可用")
|
|||
|
|
print(f" IP信息: {ip_info.strip()}")
|
|||
|
|
|
|||
|
|
# 成功则退出循环
|
|||
|
|
await browser.close()
|
|||
|
|
await playwright.stop()
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"⚠️ {proxy_config['name']} 不可用: {str(e)}")
|
|||
|
|
if browser:
|
|||
|
|
await browser.close()
|
|||
|
|
if playwright:
|
|||
|
|
await playwright.stop()
|
|||
|
|
|
|||
|
|
# 如果是最后一个代理也失败,则报错
|
|||
|
|
if i == len(proxies) - 1:
|
|||
|
|
print("❌ 所有代理都不可用!")
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def main():
|
|||
|
|
"""运行所有示例"""
|
|||
|
|
# Windows环境下设置事件循环策略
|
|||
|
|
if sys.platform == 'win32':
|
|||
|
|
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
|
|||
|
|
|
|||
|
|
print("\n" + "🎯"*30)
|
|||
|
|
print("大麦固定代理IP使用示例集")
|
|||
|
|
print("🎯"*30)
|
|||
|
|
|
|||
|
|
# 示例2: 随机代理
|
|||
|
|
await example2_use_random_proxy()
|
|||
|
|
|
|||
|
|
# 示例3: 完整的Playwright代理使用
|
|||
|
|
await example3_use_with_playwright_directly()
|
|||
|
|
|
|||
|
|
# 示例4: 代理容错切换
|
|||
|
|
await example4_switch_proxy_on_error()
|
|||
|
|
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("🎉 所有示例运行完成!")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
asyncio.run(main())
|