Files
ai_wht_B/测试工作台接口性能.py

302 lines
9.8 KiB
Python
Raw Permalink Normal View History

2026-01-06 14:18:39 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试工作台接口性能并生成CSV报告和接口文档
"""
import requests
import time
import csv
from datetime import datetime
# API配置
API_BASE_URL = "http://127.0.0.1:8216"
USERNAME = "13621242430"
PASSWORD = "admin123"
# 测试结果存储
test_results = []
api_docs = []
def login():
"""登录获取token"""
print("\n" + "="*80)
print("正在登录...")
url = f"{API_BASE_URL}/api/auth/login"
data = {"username": USERNAME, "password": PASSWORD}
start_time = time.time()
response = requests.post(url, json=data, timeout=30)
elapsed_time = (time.time() - start_time) * 1000
if response.status_code == 200:
token = response.json()['data']['token']
print(f"✓ 登录成功,耗时: {elapsed_time:.2f}ms")
return token
else:
print(f"✗ 登录失败: {response.status_code}")
return None
def test_get_dashboard_overview(token):
"""测试获取工作台概览"""
print("\n" + "-"*80)
print("[1/3] 测试获取工作台概览...")
url = f"{API_BASE_URL}/api/dashboard/overview"
headers = {"Authorization": f"Bearer {token}"}
# 测试多次以获取稳定的性能数据
test_count = 5
print(f" 执行{test_count}次测试以获取平均性能...")
times = []
for i in range(test_count):
try:
start_time = time.time()
response = requests.get(url, headers=headers, timeout=30)
elapsed_time = (time.time() - start_time) * 1000
times.append(elapsed_time)
success = response.status_code == 200
error = '' if success else response.text
print(f" [{i+1}] 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
time.sleep(0.1)
except Exception as e:
print(f" [{i+1}] 失败: {str(e)}")
times.append(0)
# 计算平均耗时
valid_times = [t for t in times if t > 0]
avg_time = sum(valid_times) / len(valid_times) if valid_times else 0
max_time = max(valid_times) if valid_times else 0
min_time = min(valid_times) if valid_times else 0
print(f" 平均耗时: {avg_time:.2f}ms, 最大: {max_time:.2f}ms, 最小: {min_time:.2f}ms")
test_results.append({
'endpoint': '/api/dashboard/overview',
'method': 'GET',
'status_code': 200 if valid_times else 0,
'elapsed_time_ms': f"{avg_time:.2f}",
'success': len(valid_times) == test_count,
'error': f'测试{test_count}次, 平均耗时{avg_time:.2f}ms'
})
api_docs.append({
'endpoint': '/api/dashboard/overview',
'method': 'GET',
'description': '获取工作台概览(企业统计数据、本月发布数、增长率)',
'auth': '需要认证',
'params': '',
'response': '返回用户总数、产品总数、文章总数、本月发布数及增长率'
})
def test_get_recent_publishes(token):
"""测试获取最近发布"""
print("\n" + "-"*80)
print("[2/3] 测试获取最近发布...")
url = f"{API_BASE_URL}/api/dashboard/recent-publishes"
headers = {"Authorization": f"Bearer {token}"}
# 测试不同的limit参数
test_cases = [
{"limit": 5, "desc": "默认5条"},
{"limit": 10, "desc": "10条记录"},
{"limit": 20, "desc": "20条记录"},
]
for i, params in enumerate(test_cases, 1):
desc = params.pop('desc')
try:
start_time = time.time()
response = requests.get(url, headers=headers, params=params, timeout=30)
elapsed_time = (time.time() - start_time) * 1000
success = response.status_code == 200
error = '' if success else response.text
print(f" [{i}] {desc}: {response.status_code} - {elapsed_time:.2f}ms")
test_results.append({
'endpoint': f'/api/dashboard/recent-publishes ({desc})',
'method': 'GET',
'status_code': response.status_code,
'elapsed_time_ms': f"{elapsed_time:.2f}",
'success': success,
'error': error
})
time.sleep(0.1)
except Exception as e:
print(f" [{i}] {desc}: 失败 - {str(e)}")
test_results.append({
'endpoint': f'/api/dashboard/recent-publishes ({desc})',
'method': 'GET',
'status_code': 0,
'elapsed_time_ms': '0',
'success': False,
'error': str(e)
})
api_docs.append({
'endpoint': '/api/dashboard/recent-publishes',
'method': 'GET',
'description': '获取最近发布记录(按时间倒序)',
'auth': '需要认证',
'params': 'limit (默认5)',
'response': '返回发布记录列表(含员工名、产品名、状态、相对时间)'
})
def test_get_hot_products(token):
"""测试获取热门产品"""
print("\n" + "-"*80)
print("[3/3] 测试获取热门产品...")
url = f"{API_BASE_URL}/api/dashboard/hot-products"
headers = {"Authorization": f"Bearer {token}"}
# 测试不同的limit参数
test_cases = [
{"limit": 4, "desc": "默认4个"},
{"limit": 8, "desc": "8个产品"},
{"limit": 12, "desc": "12个产品"},
]
for i, params in enumerate(test_cases, 1):
desc = params.pop('desc')
try:
start_time = time.time()
response = requests.get(url, headers=headers, params=params, timeout=30)
elapsed_time = (time.time() - start_time) * 1000
success = response.status_code == 200
error = '' if success else response.text
print(f" [{i}] {desc}: {response.status_code} - {elapsed_time:.2f}ms")
test_results.append({
'endpoint': f'/api/dashboard/hot-products ({desc})',
'method': 'GET',
'status_code': response.status_code,
'elapsed_time_ms': f"{elapsed_time:.2f}",
'success': success,
'error': error
})
time.sleep(0.1)
except Exception as e:
print(f" [{i}] {desc}: 失败 - {str(e)}")
test_results.append({
'endpoint': f'/api/dashboard/hot-products ({desc})',
'method': 'GET',
'status_code': 0,
'elapsed_time_ms': '0',
'success': False,
'error': str(e)
})
api_docs.append({
'endpoint': '/api/dashboard/hot-products',
'method': 'GET',
'description': '获取热门产品(按发布次数排序)',
'auth': '需要认证',
'params': 'limit (默认4)',
'response': '返回产品列表(含产品名、发布次数、占比百分比)'
})
def save_to_csv():
"""保存测试结果到CSV文件"""
timestamp = datetime.now().strftime('%Y%m%d')
filename = f'工作台接口性能测试_{timestamp}.csv'
with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=[
'endpoint', 'method', 'status_code', 'elapsed_time_ms', 'success', 'error'
])
writer.writeheader()
writer.writerows(test_results)
print(f"\n✓ 性能测试结果已保存到: {filename}")
return filename
def save_api_docs():
"""保存接口文档到CSV文件"""
timestamp = datetime.now().strftime('%Y%m%d')
filename = f'工作台接口文档_{timestamp}.csv'
with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=[
'endpoint', 'method', 'description', 'auth', 'params', 'response'
])
writer.writeheader()
writer.writerows(api_docs)
print(f"✓ 接口文档已保存到: {filename}")
return filename
def print_summary():
"""打印测试摘要"""
print("\n" + "="*80)
print("测试摘要")
print("="*80)
total = len(test_results)
success = sum(1 for r in test_results if r['success'])
failed = total - success
times = [float(r['elapsed_time_ms']) for r in test_results if r['elapsed_time_ms'] != '0']
avg_time = sum(times) / len(times) if times else 0
max_time = max(times) if times else 0
min_time = min(times) if times else 0
print(f"总测试数: {total}")
print(f"成功: {success} ({success/total*100:.1f}%)")
print(f"失败: {failed} ({failed/total*100:.1f}%)")
print(f"平均耗时: {avg_time:.2f}ms")
print(f"最大耗时: {max_time:.2f}ms")
print(f"最小耗时: {min_time:.2f}ms")
print("="*80)
def main():
"""主函数"""
print("="*80)
print("工作台接口性能测试工具")
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"API地址: {API_BASE_URL}")
print("="*80)
# 登录
token = login()
if not token:
print("\n✗ 登录失败,无法继续测试")
return
# 执行所有测试
test_get_dashboard_overview(token)
test_get_recent_publishes(token)
test_get_hot_products(token)
# 打印摘要
print_summary()
# 保存结果
csv_file = save_to_csv()
doc_file = save_api_docs()
print(f"\n✓ 测试完成!")
print(f" - 性能数据: {csv_file}")
print(f" - 接口文档: {doc_file}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n✗ 测试被用户中断")
except Exception as e:
print(f"\n✗ 测试异常: {e}")
import traceback
traceback.print_exc()