Files
ai_wht_B/测试日志接口性能.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

298 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试日志管理接口性能并生成CSV报告和接口文档
"""
import requests
import time
import csv
from datetime import datetime
# API configuration: base URL of the service under test and the login
# credentials sent by login().
# NOTE(review): the credentials are hard-coded in source — presumably a local
# test account; confirm before sharing or committing this script elsewhere.
API_BASE_URL = "http://127.0.0.1:8216"
USERNAME = "13621242430"
PASSWORD = "admin123"
# Result accumulators shared by the functions below:
#   test_results - one dict per timed request, written out by save_to_csv()
#   api_docs     - one dict per exercised endpoint, written out by save_api_docs()
test_results = []
api_docs = []
def login():
    """Log in with the configured credentials and return the auth token.

    Appends the login endpoint description to ``api_docs``, POSTs
    ``USERNAME``/``PASSWORD`` as JSON to ``/api/auth/login`` and prints the
    outcome together with the elapsed time in milliseconds.

    Returns:
        The value of ``data.token`` from the JSON response when the server
        answers with HTTP 200; ``None`` when the request fails, the status
        is not 200, or the response body does not contain a token.
    """
    api_docs.append({
        'endpoint': '/api/auth/login',
        'method': 'POST',
        'description': '用户登录',
        'auth': '无需认证',
        'params': 'username, password',
        'response': '返回token和用户信息'
    })
    print("\n" + "="*80)
    print("正在登录...")
    url = f"{API_BASE_URL}/api/auth/login"
    data = {"username": USERNAME, "password": PASSWORD}

    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    try:
        response = requests.post(url, json=data, timeout=30)
    except requests.RequestException as exc:
        # Connection refused, timeout, etc.: report it and let the caller
        # take its normal "login failed" path instead of crashing.
        print(f"✗ 登录失败: {exc}")
        return None
    elapsed_time = (time.perf_counter() - start_time) * 1000

    if response.status_code != 200:
        print(f"✗ 登录失败: {response.status_code}")
        return None

    try:
        token = response.json()['data']['token']
    except (ValueError, KeyError, TypeError) as exc:
        # ValueError: body is not valid JSON; KeyError/TypeError: the JSON
        # does not have the expected {"data": {"token": ...}} layout.
        print(f"✗ 登录失败: {exc!r}")
        return None

    print(f"✓ 登录成功,耗时: {elapsed_time:.2f}ms")
    return token
def test_get_logs(token):
    """Time paginated GET requests against the operation-log endpoint.

    Appends the endpoint description to ``api_docs``, then issues one request
    per page/size combination and appends one result dict per request to
    ``test_results``. A request counts as successful only on HTTP 200.
    Any exception raised while performing a request is recorded as a failed
    result (status code 0, elapsed time ``'0'``) rather than propagated.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("\n" + "-"*80)
    print("[1/3] 测试获取操作日志...")
    api_docs.append({
        'endpoint': '/api/log/logs',
        'method': 'GET',
        'description': '获取操作日志(分页查询,含用户信息)',
        'auth': '需要认证',
        'params': 'page, size',
        'response': '返回日志列表含用户名、操作类型、目标类型、IP地址、状态'
    })
    url = f"{API_BASE_URL}/api/log/logs"
    headers = {"Authorization": f"Bearer {token}"}
    # (label, query parameters) pairs; keeping the label separate avoids
    # having to strip it out of the dict before sending the request.
    test_cases = [
        ("第1页10条", {"page": 1, "size": 10}),
        ("第1页20条", {"page": 1, "size": 20}),
        ("第2页10条", {"page": 2, "size": 10}),
    ]
    for i, (desc, params) in enumerate(test_cases, 1):
        endpoint_label = f'/api/log/logs ({desc})'
        try:
            # perf_counter() is monotonic and high-resolution, which makes it
            # the appropriate clock for measuring short request latencies.
            start_time = time.perf_counter()
            response = requests.get(url, headers=headers, params=params, timeout=30)
            elapsed_time = (time.perf_counter() - start_time) * 1000
            success = response.status_code == 200
            # Keep only the first 100 characters of an error body for the report.
            error = '' if success else response.text[:100]
            print(f" [{i}] {desc}: {response.status_code} - {elapsed_time:.2f}ms")
            test_results.append({
                'endpoint': endpoint_label,
                'method': 'GET',
                'status_code': response.status_code,
                'elapsed_time_ms': f"{elapsed_time:.2f}",
                'success': success,
                'error': error
            })
            # Short pause between consecutive requests.
            time.sleep(0.1)
        except Exception as e:
            # Best-effort: record the failure and continue with the next case.
            print(f" [{i}] {desc}: 失败 - {str(e)}")
            test_results.append({
                'endpoint': endpoint_label,
                'method': 'GET',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(e)
            })
def test_get_log_files(token):
    """Time a single GET request against the log-file listing endpoint.

    Appends the endpoint description to ``api_docs``, performs one request to
    ``/api/log/logs/files`` and appends one result dict to ``test_results``.
    The request counts as successful only on HTTP 200. Any exception raised
    while performing the request is recorded as a failed result (status code
    0, elapsed time ``'0'``) rather than propagated.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("\n" + "-"*80)
    print("[2/3] 测试获取日志文件列表...")
    api_docs.append({
        'endpoint': '/api/log/logs/files',
        'method': 'GET',
        'description': '获取可用的日志文件列表',
        'auth': '需要认证',
        'params': '',
        'response': '返回日志文件列表(含文件名、大小、修改时间、类型)'
    })
    url = f"{API_BASE_URL}/api/log/logs/files"
    headers = {"Authorization": f"Bearer {token}"}
    try:
        # perf_counter() is monotonic and high-resolution, which makes it the
        # appropriate clock for measuring a short request latency.
        start_time = time.perf_counter()
        response = requests.get(url, headers=headers, timeout=30)
        elapsed_time = (time.perf_counter() - start_time) * 1000
        success = response.status_code == 200
        # Keep only the first 100 characters of an error body for the report.
        error = '' if success else response.text[:100]
        print(f" 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
        test_results.append({
            'endpoint': '/api/log/logs/files',
            'method': 'GET',
            'status_code': response.status_code,
            'elapsed_time_ms': f"{elapsed_time:.2f}",
            'success': success,
            'error': error
        })
    except Exception as e:
        # Best-effort: record the failure instead of aborting the whole run.
        print(f" 失败: {str(e)}")
        test_results.append({
            'endpoint': '/api/log/logs/files',
            'method': 'GET',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(e)
        })
def test_get_file_logs(token):
    """Time GET requests that read the tail of individual log files.

    Appends the endpoint description to ``api_docs``, then issues one request
    per type/lines combination to ``/api/log/logs/file`` and appends one
    result dict per request to ``test_results``. HTTP 200 and HTTP 404 both
    count as success (404 is treated as "the log file does not exist").
    Any exception raised while performing a request is recorded as a failed
    result (status code 0, elapsed time ``'0'``) rather than propagated.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("\n" + "-"*80)
    print("[3/3] 测试获取文件日志内容...")
    api_docs.append({
        'endpoint': '/api/log/logs/file',
        'method': 'GET',
        'description': '获取文件日志内容最后N行',
        'auth': '需要认证',
        'params': 'type (article/error/whoosh), lines (默认100), date (可选)',
        'response': '返回日志文件内容、文件信息(大小、修改时间、总行数)'
    })
    url = f"{API_BASE_URL}/api/log/logs/file"
    headers = {"Authorization": f"Bearer {token}"}
    # (label, query parameters) pairs; keeping the label separate avoids
    # having to strip it out of the dict before sending the request.
    test_cases = [
        ("article日志50行", {"type": "article", "lines": 50}),
        ("error日志50行", {"type": "error", "lines": 50}),
        ("article日志100行", {"type": "article", "lines": 100}),
    ]
    for i, (desc, params) in enumerate(test_cases, 1):
        endpoint_label = f'/api/log/logs/file ({desc})'
        try:
            # perf_counter() is monotonic and high-resolution, which makes it
            # the appropriate clock for measuring short request latencies.
            start_time = time.perf_counter()
            response = requests.get(url, headers=headers, params=params, timeout=30)
            elapsed_time = (time.perf_counter() - start_time) * 1000
            # A missing log file (404) is an expected outcome, not a failure.
            success = response.status_code in (200, 404)
            # Keep only the first 100 characters of an error body for the report.
            error = '' if success else response.text[:100]
            print(f" [{i}] {desc}: {response.status_code} - {elapsed_time:.2f}ms")
            test_results.append({
                'endpoint': endpoint_label,
                'method': 'GET',
                'status_code': response.status_code,
                'elapsed_time_ms': f"{elapsed_time:.2f}",
                'success': success,
                'error': error
            })
            # Short pause between consecutive requests.
            time.sleep(0.1)
        except Exception as e:
            # Best-effort: record the failure and continue with the next case.
            print(f" [{i}] {desc}: 失败 - {str(e)}")
            test_results.append({
                'endpoint': endpoint_label,
                'method': 'GET',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(e)
            })
def save_to_csv():
    """Dump the collected timing results into a date-stamped CSV file.

    The file is created in the current working directory, encoded as
    UTF-8 with a BOM, and contains one row per entry in ``test_results``.

    Returns:
        The name of the CSV file that was written.
    """
    columns = [
        'endpoint', 'method', 'status_code', 'elapsed_time_ms', 'success', 'error'
    ]
    date_tag = datetime.now().strftime('%Y%m%d')
    filename = f'日志接口性能测试_{date_tag}.csv'
    with open(filename, 'w', newline='', encoding='utf-8-sig') as out:
        report = csv.DictWriter(out, fieldnames=columns)
        report.writeheader()
        for row in test_results:
            report.writerow(row)
    print(f"\n✓ 性能测试结果已保存到: {filename}")
    return filename
def save_api_docs():
    """Dump the collected endpoint descriptions into a date-stamped CSV file.

    The file is created in the current working directory, encoded as
    UTF-8 with a BOM, and contains one row per entry in ``api_docs``.

    Returns:
        The name of the CSV file that was written.
    """
    columns = [
        'endpoint', 'method', 'description', 'auth', 'params', 'response'
    ]
    date_tag = datetime.now().strftime('%Y%m%d')
    filename = f'日志接口文档_{date_tag}.csv'
    with open(filename, 'w', newline='', encoding='utf-8-sig') as out:
        doc_writer = csv.DictWriter(out, fieldnames=columns)
        doc_writer.writeheader()
        for entry in api_docs:
            doc_writer.writerow(entry)
    print(f"✓ 接口文档已保存到: {filename}")
    return filename
def print_summary():
    """Print totals, success/failure rates and latency statistics.

    Reads ``test_results``. Entries whose ``elapsed_time_ms`` is the string
    ``'0'`` (the marker the test functions store when a request raised) are
    left out of the average/max/min latency figures. When no results have
    been collected, all counts, percentages and timings are reported as zero
    instead of raising ``ZeroDivisionError``.
    """
    print("\n" + "="*80)
    print("测试摘要")
    print("="*80)

    total = len(test_results)
    success = sum(1 for r in test_results if r['success'])
    failed = total - success

    def percent(count):
        # Guard against an empty result list: 0 of 0 is reported as 0%.
        return count / total * 100 if total else 0.0

    times = [
        float(r['elapsed_time_ms'])
        for r in test_results
        if r['elapsed_time_ms'] != '0'
    ]
    avg_time = sum(times) / len(times) if times else 0
    max_time = max(times) if times else 0
    min_time = min(times) if times else 0

    print(f"总测试数: {total}")
    print(f"成功: {success} ({percent(success):.1f}%)")
    print(f"失败: {failed} ({percent(failed):.1f}%)")
    print(f"平均耗时: {avg_time:.2f}ms")
    print(f"最大耗时: {max_time:.2f}ms")
    print(f"最小耗时: {min_time:.2f}ms")
    print("="*80)
def main():
    """Run the whole benchmark: log in, exercise each endpoint, then report.

    Prints a banner, obtains a token via ``login()`` and stops early when no
    token is returned. Otherwise runs the three endpoint tests in order,
    prints the summary, and writes the results and the endpoint
    documentation to CSV files.
    """
    rule = "="*80
    started_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(rule)
    print("日志管理接口性能测试工具")
    print(f"测试时间: {started_at}")
    print(f"API地址: {API_BASE_URL}")
    print(rule)

    # Authenticate first; nothing else can run without a token.
    token = login()
    if not token:
        print("\n✗ 登录失败,无法继续测试")
        return

    # Run every endpoint test in its fixed order.
    for run_test in (test_get_logs, test_get_log_files, test_get_file_logs):
        run_test(token)

    # Summary goes to the console before the files are written.
    print_summary()

    results_path = save_to_csv()
    docs_path = save_api_docs()
    print("\n✓ 测试完成!")
    print(f" - 性能数据: {results_path}")
    print(f" - 接口文档: {docs_path}")
if __name__ == "__main__":
    # Script entry point: run the benchmark, reporting a Ctrl+C or any other
    # error on the console instead of letting it escape unformatted.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n✗ 测试被用户中断")
    except Exception as exc:
        # Imported lazily: only needed when something actually went wrong.
        import traceback
        print(f"\n✗ 测试异常: {exc}")
        traceback.print_exc()