#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Benchmark the article API endpoints and write two CSV files.

The script logs in, calls each article endpoint once (the list endpoint is
called with several query combinations), records status code and latency for
every call, prints a summary, and finally writes:

* a performance report (one row per request), and
* a short API reference (one row per endpoint).
"""

import csv
import json
import time
from datetime import datetime

import requests

# API configuration
API_BASE_URL = "http://127.0.0.1:8216"
USERNAME = "13621242430"
PASSWORD = "admin123"

# Timeout, in seconds, applied to every HTTP request made by this script.
_REQUEST_TIMEOUT = 30

# Message recorded when the list endpoint yields no article to work with.
_NO_ARTICLE_MESSAGE = '没有找到文章'

# Rows accumulated during the run and written out by the save_* functions.
test_results = []
api_docs = []


def _auth_headers(token):
    """Return the bearer-token ``Authorization`` header for *token*."""
    return {"Authorization": f"Bearer {token}"}


def _record_result(endpoint, method, status_code, elapsed_time_ms, success, error):
    """Append one row to ``test_results`` using the CSV column names."""
    test_results.append({
        'endpoint': endpoint,
        'method': method,
        'status_code': status_code,
        'elapsed_time_ms': elapsed_time_ms,
        'success': success,
        'error': error,
    })


def _record_failure(endpoint, method, error):
    """Record a call that produced no HTTP response (status 0, elapsed '0')."""
    _record_result(endpoint, method, 0, '0', False, error)


def _document_api(endpoint, method, description, auth, params, response):
    """Append one row to ``api_docs`` using the CSV column names."""
    api_docs.append({
        'endpoint': endpoint,
        'method': method,
        'description': description,
        'auth': auth,
        'params': params,
        'response': response,
    })


def _timed_request(method, url, **request_kwargs):
    """Send one HTTP request and return ``(response, elapsed_ms)``.

    Exceptions raised by ``requests`` propagate to the caller.
    """
    # perf_counter is monotonic, so the interval cannot be distorted by
    # wall-clock adjustments the way time.time() can.
    started = time.perf_counter()
    response = requests.request(
        method, url, timeout=_REQUEST_TIMEOUT, **request_kwargs
    )
    elapsed_ms = (time.perf_counter() - started) * 1000
    return response, elapsed_ms


def _timed_call(endpoint, method, url, **request_kwargs):
    """Time one request, record its outcome, and return it for display.

    Args:
        endpoint: Label stored in the ``endpoint`` column of the report.
        method: HTTP method, also stored in the ``method`` column.
        url: Full URL to call.
        **request_kwargs: Passed through to ``requests.request``.

    Returns:
        ``(status_code, elapsed_ms, error)``. ``status_code`` is ``None`` when
        the request raised instead of returning a response; ``error`` is then
        the exception text. For a non-200 response ``error`` is the response
        body, and for a 200 response it is the empty string.
    """
    try:
        response, elapsed_ms = _timed_request(method, url, **request_kwargs)
        succeeded = response.status_code == 200
        error = '' if succeeded else response.text
    except Exception as exc:  # best effort: one bad endpoint must not stop the run
        _record_failure(endpoint, method, str(exc))
        return None, 0.0, str(exc)

    _record_result(
        endpoint, method, response.status_code, f"{elapsed_ms:.2f}", succeeded, error
    )
    return response.status_code, elapsed_ms, error


def _print_outcome(status_code, elapsed_ms, error, context=""):
    """Print the one-line result of a ``_timed_call``.

    *context* is inserted before the status code (used for the article id).
    """
    if status_code is None:
        print(f" 失败: {error}")
    else:
        print(f" {context}状态码: {status_code} - 耗时: {elapsed_ms:.2f}ms")


def _first_article_id(headers):
    """Fetch the id of the first article returned by the list endpoint.

    Returns:
        ``(article_id, reason)``. ``article_id`` is ``None`` when no article
        could be obtained; ``reason`` is the text to record for the skipped
        test in that case.
    """
    try:
        response = requests.get(
            f"{API_BASE_URL}/api/articles/list",
            headers=headers,
            params={"page": 1, "pageSize": 1},
            timeout=_REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            return None, _NO_ARTICLE_MESSAGE
        articles = response.json().get('data', {}).get('list', [])
        if not articles:
            return None, _NO_ARTICLE_MESSAGE
        return articles[0]['id'], _NO_ARTICLE_MESSAGE
    except (requests.RequestException, ValueError, AttributeError,
            KeyError, TypeError, IndexError) as exc:
        # A failed or malformed lookup is reported as a skipped test rather
        # than aborting the remaining endpoints.
        return None, f"获取文章ID失败: {exc}"


def login():
    """Log in with the configured credentials and return the token.

    The attempt is recorded in ``test_results`` whether or not it succeeds.

    Returns:
        The token taken from ``data.token`` of the login response, or ``None``
        when the request fails, the status is not 200, or the body does not
        contain a token.
    """
    print("\n" + "="*80)
    print("正在登录...")

    endpoint = '/api/auth/login'
    url = f"{API_BASE_URL}{endpoint}"
    credentials = {"username": USERNAME, "password": PASSWORD}

    try:
        response, elapsed_ms = _timed_request('POST', url, json=credentials)
    except requests.RequestException as exc:
        print(f"✗ 登录失败: {exc}")
        _record_failure(endpoint, 'POST', str(exc))
        return None

    elapsed_text = f"{elapsed_ms:.2f}"

    if response.status_code != 200:
        print(f"✗ 登录失败: {response.status_code}")
        _record_result(
            endpoint, 'POST', response.status_code, elapsed_text, False, response.text
        )
        return None

    try:
        token = response.json()['data']['token']
    except (ValueError, KeyError, TypeError) as exc:
        # A 200 response without a usable token is still a failed login.
        message = f"登录响应格式异常: {exc!r}"
        print(f"✗ 登录失败: {message}")
        _record_result(
            endpoint, 'POST', response.status_code, elapsed_text, False, message
        )
        return None

    print(f"✓ 登录成功,耗时: {elapsed_ms:.2f}ms")
    _record_result(endpoint, 'POST', response.status_code, elapsed_text, True, '')
    return token


def test_get_articles_list(token):
    """Time the article list endpoint with several query combinations."""
    print("\n" + "-"*80)
    print("[1/7] 测试获取文章列表...")

    url = f"{API_BASE_URL}/api/articles/list"
    headers = _auth_headers(token)

    # (description, query parameters) for each variation to measure.
    test_cases = [
        ("基本查询", {"page": 1, "pageSize": 20}),
        ("关键词搜索", {"page": 1, "pageSize": 10, "keyword": "文章"}),
        ("第二页", {"page": 2, "pageSize": 20}),
    ]

    for i, (desc, params) in enumerate(test_cases, 1):
        status_code, elapsed_ms, error = _timed_call(
            f'/api/articles/list ({desc})', 'GET', url,
            headers=headers, params=params,
        )
        if status_code is None:
            print(f" [{i}] {desc}: 失败 - {error}")
            continue
        print(f" [{i}] {desc}: {status_code} - {elapsed_ms:.2f}ms")
        # Short pause between successive list queries.
        time.sleep(0.1)

    _document_api(
        '/api/articles/list',
        'GET',
        '获取文章列表(聚合图片和标签)',
        '需要认证',
        'page, pageSize, keyword, product_id, status',
        '返回文章列表,包含图片和标签信息',
    )


def test_get_articles_dashboard(token):
    """Time the article dashboard statistics endpoint."""
    print("\n" + "-"*80)
    print("[2/7] 测试获取文章仪表盘统计...")

    endpoint = '/api/articles/list_dashboard'
    outcome = _timed_call(
        endpoint, 'GET', f"{API_BASE_URL}{endpoint}", headers=_auth_headers(token)
    )
    _print_outcome(*outcome)

    _document_api(
        endpoint,
        'GET',
        '获取文章仪表盘统计(总数、可发、已发)',
        '需要认证',
        '无',
        'articles_total, articles_available, articles_published',
    )


def test_generate_article(token):
    """Time the article generation endpoint."""
    print("\n" + "-"*80)
    print("[3/7] 测试生成文案...")

    endpoint = '/api/articles/generate'
    # Sample payload; adjust the ids to match the actual database.
    payload = {
        "product_id": 1,
        "prompt_workflow_id": 1,
        "topics": ["测试主题1", "测试主题2"],
        "count": 2,
    }

    outcome = _timed_call(
        endpoint, 'POST', f"{API_BASE_URL}{endpoint}",
        headers=_auth_headers(token), json=payload,
    )
    _print_outcome(*outcome)

    _document_api(
        endpoint,
        'POST',
        '生成文案(批量生成文章)',
        '需要认证',
        'product_id, prompt_workflow_id, topics, count',
        '返回生成的文章ID和标题列表',
    )


def test_get_article_detail(token):
    """Time the article detail endpoint for the first listed article.

    When no article id can be obtained the test is recorded as failed and
    skipped.
    """
    print("\n" + "-"*80)
    print("[4/7] 测试获取文章详情...")

    headers = _auth_headers(token)
    article_id, skip_reason = _first_article_id(headers)

    if article_id:
        endpoint = f'/api/articles/{article_id}'
        outcome = _timed_call(
            endpoint, 'GET', f"{API_BASE_URL}/api/articles/{article_id}",
            headers=headers,
        )
        _print_outcome(*outcome, context=f"文章ID: {article_id}, ")
    else:
        print(" 跳过:没有找到文章")
        _record_failure('/api/articles/', 'GET', skip_reason)

    _document_api(
        '/api/articles/',
        'GET',
        '获取文章详情(包含图片、标签、发布记录)',
        '需要认证',
        'article_id (路径参数)',
        '完整的文章信息,包括images、tags、publish_records',
    )


def test_update_article(token):
    """Time the article update endpoint against the first listed article.

    When no article id can be obtained the test is recorded as failed and
    skipped.
    """
    print("\n" + "-"*80)
    print("[5/7] 测试更新文章...")

    headers = _auth_headers(token)
    article_id, skip_reason = _first_article_id(headers)

    if article_id:
        payload = {
            # Timestamp suffix makes the new title differ between runs.
            "title": f"测试更新标题_{int(time.time())}",
            "status": "draft",
        }
        outcome = _timed_call(
            f'/api/articles/{article_id} (PUT)', 'PUT',
            f"{API_BASE_URL}/api/articles/{article_id}",
            headers=headers, json=payload,
        )
        _print_outcome(*outcome, context=f"文章ID: {article_id}, ")
    else:
        print(" 跳过:没有找到文章")
        _record_failure('/api/articles/ (PUT)', 'PUT', skip_reason)

    _document_api(
        '/api/articles/',
        'PUT',
        '更新文章信息',
        '需要认证',
        'article_id (路径), title, content, status等字段',
        '更新成功消息',
    )


def test_batch_published_account_cycle(token):
    """Time the batch publish endpoint that cycles through author accounts."""
    print("\n" + "-"*80)
    print("[6/7] 测试批量发布文章(账号循环)...")

    endpoint = '/api/articles/batch-published-account-cycle'
    payload = {
        "article_ids": [1, 2],
        "author_ids": [1, 2],
    }

    outcome = _timed_call(
        endpoint, 'POST', f"{API_BASE_URL}{endpoint}",
        headers=_auth_headers(token), json=payload,
    )
    _print_outcome(*outcome)

    _document_api(
        endpoint,
        'POST',
        '批量发布文章 - 账号循环分配',
        '需要认证+角色(admin/editor/reviewer/publisher/enterprise)',
        'article_ids (数组), author_ids (数组)',
        '发布结果统计和详细列表',
    )


def test_batch_published_review(token):
    """Time the batch publish endpoint that targets a single author."""
    print("\n" + "-"*80)
    print("[7/7] 测试批量发布文章...")

    endpoint = '/api/articles/batch-published-review'
    payload = {
        "article_ids": [1, 2],
        "author_id": 1,
    }

    outcome = _timed_call(
        endpoint, 'POST', f"{API_BASE_URL}{endpoint}",
        headers=_auth_headers(token), json=payload,
    )
    _print_outcome(*outcome)

    _document_api(
        endpoint,
        'POST',
        '批量发布文章到指定作者',
        '需要认证+角色(admin/editor/reviewer/publisher/enterprise)',
        'article_ids (数组), author_id',
        '发布结果统计和详细列表',
    )


def _write_csv(filename, fieldnames, rows):
    """Write *rows* (dicts) to *filename* with a header row.

    ``utf-8-sig`` adds a byte-order mark at the start of the file.
    """
    with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


def save_to_csv():
    """Write ``test_results`` to a dated CSV file and return its filename."""
    timestamp = datetime.now().strftime('%Y%m%d')
    filename = f'文章接口性能测试_{timestamp}.csv'

    _write_csv(
        filename,
        ['endpoint', 'method', 'status_code', 'elapsed_time_ms', 'success', 'error'],
        test_results,
    )

    print(f"\n✓ 性能测试结果已保存到: {filename}")
    return filename


def save_api_docs():
    """Write ``api_docs`` to a dated CSV file and return its filename."""
    timestamp = datetime.now().strftime('%Y%m%d')
    filename = f'文章接口文档_{timestamp}.csv'

    _write_csv(
        filename,
        ['endpoint', 'method', 'description', 'auth', 'params', 'response'],
        api_docs,
    )

    print(f"✓ 接口文档已保存到: {filename}")
    return filename


def print_summary():
    """Print totals, success/failure rates and latency statistics.

    With no recorded results every figure is reported as zero instead of
    dividing by zero.
    """
    print("\n" + "="*80)
    print("测试摘要")
    print("="*80)

    total = len(test_results)
    success = sum(1 for r in test_results if r['success'])
    failed = total - success

    def percent(count):
        # Guard the empty case so the rate lines never divide by zero.
        return count / total * 100 if total else 0.0

    timings = [float(r['elapsed_time_ms']) for r in test_results]
    avg_time = sum(timings) / total if total else 0
    max_time = max(timings, default=0)
    min_time = min(timings, default=0)

    print(f"总测试数: {total}")
    print(f"成功: {success} ({percent(success):.1f}%)")
    print(f"失败: {failed} ({percent(failed):.1f}%)")
    print(f"平均耗时: {avg_time:.2f}ms")
    print(f"最大耗时: {max_time:.2f}ms")
    print(f"最小耗时: {min_time:.2f}ms")
    print("="*80)


def main():
    """Run the full benchmark: log in, exercise every endpoint, report, save."""
    print("="*80)
    print("文章接口性能测试工具")
    print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"API地址: {API_BASE_URL}")
    print("="*80)

    # Every endpoint below requires the token, so stop if login fails.
    token = login()
    if not token:
        print("\n✗ 登录失败,无法继续测试")
        return

    # Run all endpoint tests in their numbered order.
    test_get_articles_list(token)
    test_get_articles_dashboard(token)
    test_generate_article(token)
    test_get_article_detail(token)
    test_update_article(token)
    test_batch_published_account_cycle(token)
    test_batch_published_review(token)

    print_summary()

    csv_file = save_to_csv()
    doc_file = save_api_docs()

    print("\n✓ 测试完成!")
    print(f" - 性能数据: {csv_file}")
    print(f" - 接口文档: {doc_file}")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n✗ 测试被用户中断")
    except Exception as e:
        print(f"\n✗ 测试异常: {e}")
        import traceback
        traceback.print_exc()