166 lines
5.0 KiB
Python
166 lines
5.0 KiB
Python
"""
|
||
MIP广告点击服务使用示例
|
||
展示如何通过 API 与服务交互
|
||
"""
|
||
|
||
import requests
|
||
import json
|
||
from typing import List
|
||
|
||
|
||
class MIPAdClient:
|
||
"""MIP广告点击服务客户端"""
|
||
|
||
def __init__(self, base_url: str = "http://localhost:5000"):
|
||
self.base_url = base_url
|
||
|
||
def check_health(self):
|
||
"""检查服务健康状态"""
|
||
response = requests.get(f"{self.base_url}/health")
|
||
return response.json()
|
||
|
||
def add_url(self, url: str):
|
||
"""添加单个URL"""
|
||
response = requests.post(
|
||
f"{self.base_url}/api/urls",
|
||
json={"url": url}
|
||
)
|
||
return response.json()
|
||
|
||
def add_urls_batch(self, urls: List[str]):
|
||
"""批量添加URL"""
|
||
response = requests.post(
|
||
f"{self.base_url}/api/urls",
|
||
json={"urls": urls}
|
||
)
|
||
return response.json()
|
||
|
||
def get_all_urls(self):
|
||
"""获取所有URL列表"""
|
||
response = requests.get(f"{self.base_url}/api/urls")
|
||
return response.json()
|
||
|
||
def get_url_detail(self, url: str):
|
||
"""获取URL详细信息"""
|
||
response = requests.get(f"{self.base_url}/api/urls/{url}")
|
||
return response.json()
|
||
|
||
def delete_url(self, url: str):
|
||
"""删除URL"""
|
||
response = requests.delete(f"{self.base_url}/api/urls/{url}")
|
||
return response.json()
|
||
|
||
def reset_url(self, url: str):
|
||
"""重置URL(重新开始点击)"""
|
||
response = requests.post(f"{self.base_url}/api/urls/{url}/reset")
|
||
return response.json()
|
||
|
||
def get_statistics(self):
|
||
"""获取统计数据"""
|
||
response = requests.get(f"{self.base_url}/api/statistics")
|
||
return response.json()
|
||
|
||
def start_scheduler(self):
|
||
"""启动调度器"""
|
||
response = requests.post(f"{self.base_url}/api/scheduler/start")
|
||
return response.json()
|
||
|
||
def stop_scheduler(self):
|
||
"""停止调度器"""
|
||
response = requests.post(f"{self.base_url}/api/scheduler/stop")
|
||
return response.json()
|
||
|
||
def get_scheduler_status(self):
|
||
"""获取调度器状态"""
|
||
response = requests.get(f"{self.base_url}/api/scheduler/status")
|
||
return response.json()
|
||
|
||
|
||
def example_usage():
|
||
"""使用示例"""
|
||
|
||
# 创建客户端
|
||
client = MIPAdClient()
|
||
|
||
print("=" * 60)
|
||
print("MIP广告点击服务使用示例")
|
||
print("=" * 60)
|
||
|
||
# 1. 检查服务状态
|
||
print("\n1. 检查服务健康状态")
|
||
health = client.check_health()
|
||
print(f"服务状态: {health}")
|
||
|
||
# 2. 添加单个URL
|
||
print("\n2. 添加单个URL")
|
||
test_url = "https://example.com/mip-page-1"
|
||
result = client.add_url(test_url)
|
||
print(f"添加结果: {result}")
|
||
|
||
# 3. 批量添加URL
|
||
print("\n3. 批量添加URL")
|
||
test_urls = [
|
||
"https://example.com/mip-page-2",
|
||
"https://example.com/mip-page-3",
|
||
"https://example.com/mip-page-4"
|
||
]
|
||
result = client.add_urls_batch(test_urls)
|
||
print(f"批量添加结果: {result}")
|
||
|
||
# 4. 获取所有URL
|
||
print("\n4. 获取所有URL列表")
|
||
urls = client.get_all_urls()
|
||
print(f"URL总数: {len(urls.get('data', []))}")
|
||
if urls.get('data'):
|
||
print("URL列表:")
|
||
for url_data in urls['data'][:3]: # 只显示前3个
|
||
print(f" - {url_data['url']}")
|
||
print(f" 状态: {url_data['status']}")
|
||
print(f" 点击进度: {url_data['click_count']}/{url_data['target_clicks']}")
|
||
print(f" 回复数: {url_data['reply_count']}")
|
||
|
||
# 5. 获取URL详细信息
|
||
print("\n5. 获取URL详细信息")
|
||
detail = client.get_url_detail(test_url)
|
||
if detail.get('success'):
|
||
print(f"URL详情: {json.dumps(detail['data'], indent=2, ensure_ascii=False)}")
|
||
|
||
# 6. 获取统计数据
|
||
print("\n6. 获取统计数据")
|
||
stats = client.get_statistics()
|
||
if stats.get('success'):
|
||
print("统计信息:")
|
||
for key, value in stats['data'].items():
|
||
print(f" {key}: {value}")
|
||
|
||
# 7. 查看调度器状态
|
||
print("\n7. 查看调度器状态")
|
||
status = client.get_scheduler_status()
|
||
print(f"调度器状态: {status}")
|
||
|
||
# 8. 重置URL(如果需要)
|
||
print("\n8. 重置URL示例")
|
||
# reset_result = client.reset_url(test_url)
|
||
# print(f"重置结果: {reset_result}")
|
||
print("(跳过实际重置操作)")
|
||
|
||
# 9. 删除URL(如果需要)
|
||
print("\n9. 删除URL示例")
|
||
# delete_result = client.delete_url(test_url)
|
||
# print(f"删除结果: {delete_result}")
|
||
print("(跳过实际删除操作)")
|
||
|
||
print("\n" + "=" * 60)
|
||
print("示例完成")
|
||
print("=" * 60)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
example_usage()
|
||
except requests.exceptions.ConnectionError:
|
||
print("错误: 无法连接到服务,请确保服务已启动")
|
||
print("启动命令: python app.py")
|
||
except Exception as e:
|
||
print(f"错误: {str(e)}")
|