""" MIP广告点击服务使用示例 展示如何通过 API 与服务交互 """ import requests import json from typing import List class MIPAdClient: """MIP广告点击服务客户端""" def __init__(self, base_url: str = "http://localhost:5000"): self.base_url = base_url def check_health(self): """检查服务健康状态""" response = requests.get(f"{self.base_url}/health") return response.json() def add_url(self, url: str): """添加单个URL""" response = requests.post( f"{self.base_url}/api/urls", json={"url": url} ) return response.json() def add_urls_batch(self, urls: List[str]): """批量添加URL""" response = requests.post( f"{self.base_url}/api/urls", json={"urls": urls} ) return response.json() def get_all_urls(self): """获取所有URL列表""" response = requests.get(f"{self.base_url}/api/urls") return response.json() def get_url_detail(self, url: str): """获取URL详细信息""" response = requests.get(f"{self.base_url}/api/urls/{url}") return response.json() def delete_url(self, url: str): """删除URL""" response = requests.delete(f"{self.base_url}/api/urls/{url}") return response.json() def reset_url(self, url: str): """重置URL(重新开始点击)""" response = requests.post(f"{self.base_url}/api/urls/{url}/reset") return response.json() def get_statistics(self): """获取统计数据""" response = requests.get(f"{self.base_url}/api/statistics") return response.json() def start_scheduler(self): """启动调度器""" response = requests.post(f"{self.base_url}/api/scheduler/start") return response.json() def stop_scheduler(self): """停止调度器""" response = requests.post(f"{self.base_url}/api/scheduler/stop") return response.json() def get_scheduler_status(self): """获取调度器状态""" response = requests.get(f"{self.base_url}/api/scheduler/status") return response.json() def example_usage(): """使用示例""" # 创建客户端 client = MIPAdClient() print("=" * 60) print("MIP广告点击服务使用示例") print("=" * 60) # 1. 检查服务状态 print("\n1. 检查服务健康状态") health = client.check_health() print(f"服务状态: {health}") # 2. 添加单个URL print("\n2. 添加单个URL") test_url = "https://example.com/mip-page-1" result = client.add_url(test_url) print(f"添加结果: {result}") # 3. 批量添加URL print("\n3. 批量添加URL") test_urls = [ "https://example.com/mip-page-2", "https://example.com/mip-page-3", "https://example.com/mip-page-4" ] result = client.add_urls_batch(test_urls) print(f"批量添加结果: {result}") # 4. 获取所有URL print("\n4. 获取所有URL列表") urls = client.get_all_urls() print(f"URL总数: {len(urls.get('data', []))}") if urls.get('data'): print("URL列表:") for url_data in urls['data'][:3]: # 只显示前3个 print(f" - {url_data['url']}") print(f" 状态: {url_data['status']}") print(f" 点击进度: {url_data['click_count']}/{url_data['target_clicks']}") print(f" 回复数: {url_data['reply_count']}") # 5. 获取URL详细信息 print("\n5. 获取URL详细信息") detail = client.get_url_detail(test_url) if detail.get('success'): print(f"URL详情: {json.dumps(detail['data'], indent=2, ensure_ascii=False)}") # 6. 获取统计数据 print("\n6. 获取统计数据") stats = client.get_statistics() if stats.get('success'): print("统计信息:") for key, value in stats['data'].items(): print(f" {key}: {value}") # 7. 查看调度器状态 print("\n7. 查看调度器状态") status = client.get_scheduler_status() print(f"调度器状态: {status}") # 8. 重置URL(如果需要) print("\n8. 重置URL示例") # reset_result = client.reset_url(test_url) # print(f"重置结果: {reset_result}") print("(跳过实际重置操作)") # 9. 删除URL(如果需要) print("\n9. 删除URL示例") # delete_result = client.delete_url(test_url) # print(f"删除结果: {delete_result}") print("(跳过实际删除操作)") print("\n" + "=" * 60) print("示例完成") print("=" * 60) if __name__ == "__main__": try: example_usage() except requests.exceptions.ConnectionError: print("错误: 无法连接到服务,请确保服务已启动") print("启动命令: python app.py") except Exception as e: print(f"错误: {str(e)}")