Files
ai_wht_B/测试搜索服务接口性能.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

447 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试搜索服务接口性能并生成CSV报告和接口文档
"""
import requests
import time
import csv
from datetime import datetime
# API configuration: base URL of the service under test and the account
# used by login().
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment if this script is shared — confirm with the owner.
API_BASE_URL = "http://127.0.0.1:8321"
USERNAME = "13621242430"
PASSWORD = "admin123"
# Accumulators filled by the test_* functions:
#   test_results - one dict per measured request (written by save_to_csv)
#   api_docs     - one dict per documented endpoint (written by save_api_docs)
test_results = []
api_docs = []
def login():
    """Log in and return the authentication token.

    Posts USERNAME/PASSWORD as JSON to ``{API_BASE_URL}/api/auth/login`` and
    prints the outcome together with the request latency in milliseconds.

    Returns:
        The value found at ``data.token`` in the JSON response body, or
        ``None`` when the request itself fails, the HTTP status is not 200,
        or the body does not contain a token. The caller treats a falsy
        result as "login failed", so every failure mode maps to ``None``
        instead of raising.
    """
    print("\n" + "="*80)
    print("正在登录...")
    url = f"{API_BASE_URL}/api/auth/login"
    data = {"username": USERNAME, "password": PASSWORD}
    start_time = time.time()
    try:
        response = requests.post(url, json=data, timeout=30)
    except requests.RequestException as e:
        # Connection refused, timeout, DNS failure, ...: report and bail out.
        print(f"✗ 登录失败: {e}")
        return None
    elapsed_time = (time.time() - start_time) * 1000
    if response.status_code != 200:
        print(f"✗ 登录失败: {response.status_code}")
        return None
    try:
        token = response.json()['data']['token']
    except (ValueError, KeyError, TypeError) as e:
        # ValueError: body is not JSON; KeyError/TypeError: unexpected shape.
        print(f"✗ 登录失败: 响应格式异常 ({e!r})")
        return None
    print(f"✓ 登录成功,耗时: {elapsed_time:.2f}ms")
    return token
def test_root_endpoint():
    """Measure a GET on the root path and record the outcome.

    The request is sent without following redirects; status 200, 301 and 302
    all count as success. One row is appended to ``test_results`` (a failure
    row with status 0 if the request raises) and one endpoint description is
    appended to ``api_docs``.
    """
    print("\n" + "-"*80)
    print("[1/6] 测试根路径接口...")
    target = f"{API_BASE_URL}/"
    try:
        began = time.time()
        resp = requests.get(target, timeout=30, allow_redirects=False)
        took_ms = (time.time() - began) * 1000
        # A redirect (301/302) is the expected answer, so it counts as success.
        ok = resp.status_code in (200, 301, 302)
        if ok:
            detail = ''
        else:
            detail = resp.text
        print(f" 状态码: {resp.status_code} - 耗时: {took_ms:.2f}ms")
        row = {
            'endpoint': '/',
            'method': 'GET',
            'status_code': resp.status_code,
            'elapsed_time_ms': f"{took_ms:.2f}",
            'success': ok,
            'error': detail,
        }
        test_results.append(row)
    except Exception as exc:
        print(f" 失败: {exc}")
        row = {
            'endpoint': '/',
            'method': 'GET',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(exc),
        }
        test_results.append(row)
    doc_entry = {
        'endpoint': '/',
        'method': 'GET',
        'description': '根路径接口重定向到dashboard.html',
        'auth': '无需认证',
        'params': '',
        'response': '302重定向到/dashboard.html',
    }
    api_docs.append(doc_entry)
def test_health_check():
    """Measure the ``/health`` endpoint over several requests.

    Sends ``test_count`` GET requests, prints each status code and latency,
    then appends one aggregated row to ``test_results`` and one endpoint
    description to ``api_docs``.

    The aggregated row reports:
      * ``elapsed_time_ms`` - mean latency of the requests that got a response
      * ``status_code``     - 0 if no request got a response, otherwise the
                              first non-200 status seen, otherwise 200
      * ``success``         - True only when every request returned HTTP 200
    """
    print("\n" + "-"*80)
    print("[2/6] 测试健康检查接口...")
    url = f"{API_BASE_URL}/health"
    # Repeat the call to obtain an average latency.
    test_count = 5
    print(f" 执行{test_count}次测试以获取平均性能...")
    valid_times = []   # latencies of requests that produced a response
    status_codes = []  # HTTP status of each such response
    for i in range(test_count):
        try:
            start_time = time.time()
            response = requests.get(url, timeout=30)
            elapsed_time = (time.time() - start_time) * 1000
            # Only responses are recorded, so a failed request can never be
            # mistaken for a 0 ms measurement.
            valid_times.append(elapsed_time)
            status_codes.append(response.status_code)
            print(f" [{i+1}] 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
            time.sleep(0.1)
        except Exception as e:
            print(f" [{i+1}] 失败: {str(e)}")
    # Aggregate latency statistics.
    avg_time = sum(valid_times) / len(valid_times) if valid_times else 0
    max_time = max(valid_times) if valid_times else 0
    min_time = min(valid_times) if valid_times else 0
    print(f" 平均耗时: {avg_time:.2f}ms, 最大: {max_time:.2f}ms, 最小: {min_time:.2f}ms")
    # Derive the reported status from what the server actually answered
    # instead of assuming 200 whenever any response arrived.
    bad_codes = [code for code in status_codes if code != 200]
    if bad_codes:
        reported_status = bad_codes[0]
    elif status_codes:
        reported_status = 200
    else:
        reported_status = 0
    all_ok = len(status_codes) == test_count and not bad_codes
    test_results.append({
        'endpoint': '/health',
        'method': 'GET',
        'status_code': reported_status,
        'elapsed_time_ms': f"{avg_time:.2f}",
        'success': all_ok,
        'error': f'测试{test_count}次, 平均耗时{avg_time:.2f}ms'
    })
    api_docs.append({
        'endpoint': '/health',
        'method': 'GET',
        'description': '健康检查接口(包含数据库连接测试)',
        'auth': '无需认证',
        'params': '',
        'response': '返回服务健康状态、数据库连接状态、时间戳和版本号'
    })
def test_get_logs(token):
    """Measure the paginated operation-log endpoint ``/api/logs``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.

    Three page/size combinations are requested in turn. Each produces one row
    in ``test_results`` (HTTP 200 counts as success; an exception yields a
    failure row with status 0). One endpoint description is appended to
    ``api_docs`` afterwards.
    """
    print("\n" + "-"*80)
    print("[3/6] 测试获取操作日志...")
    target = f"{API_BASE_URL}/api/logs"
    auth_headers = {"Authorization": f"Bearer {token}"}
    # (label, query parameters) for each scenario
    scenarios = [
        ("第1页", {"page": 1, "size": 10}),
        ("每页20条", {"page": 1, "size": 20}),
        ("第2页", {"page": 2, "size": 10}),
    ]
    for idx, (label, query) in enumerate(scenarios, 1):
        endpoint_name = f'/api/logs ({label})'
        try:
            began = time.time()
            resp = requests.get(target, headers=auth_headers, params=query, timeout=30)
            took_ms = (time.time() - began) * 1000
            ok = resp.status_code == 200
            detail = resp.text if not ok else ''
            print(f" [{idx}] {label}: {resp.status_code} - {took_ms:.2f}ms")
            test_results.append({
                'endpoint': endpoint_name,
                'method': 'GET',
                'status_code': resp.status_code,
                'elapsed_time_ms': f"{took_ms:.2f}",
                'success': ok,
                'error': detail,
            })
            # Short pause between consecutive requests.
            time.sleep(0.1)
        except Exception as exc:
            print(f" [{idx}] {label}: 失败 - {exc}")
            test_results.append({
                'endpoint': endpoint_name,
                'method': 'GET',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(exc),
            })
    api_docs.append({
        'endpoint': '/api/logs',
        'method': 'GET',
        'description': '获取操作日志(分页查询)',
        'auth': '需要认证',
        'params': 'page, size',
        'response': '返回日志列表含用户信息、操作类型、IP地址等',
    })
def test_get_log_files(token):
    """Measure the log-file listing endpoint ``/api/logs/files``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.

    Appends one row to ``test_results`` (HTTP 200 counts as success; an
    exception yields a failure row with status 0) and one endpoint
    description to ``api_docs``.
    """
    print("\n" + "-"*80)
    print("[4/6] 测试获取日志文件列表...")
    endpoint = '/api/logs/files'
    target = f"{API_BASE_URL}/api/logs/files"
    auth_headers = {"Authorization": f"Bearer {token}"}
    try:
        began = time.time()
        resp = requests.get(target, headers=auth_headers, timeout=30)
        took_ms = (time.time() - began) * 1000
        ok = resp.status_code == 200
        if ok:
            detail = ''
        else:
            detail = resp.text
        print(f" 状态码: {resp.status_code} - 耗时: {took_ms:.2f}ms")
        test_results.append({
            'endpoint': endpoint,
            'method': 'GET',
            'status_code': resp.status_code,
            'elapsed_time_ms': f"{took_ms:.2f}",
            'success': ok,
            'error': detail,
        })
    except Exception as exc:
        print(f" 失败: {exc}")
        test_results.append({
            'endpoint': endpoint,
            'method': 'GET',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(exc),
        })
    api_docs.append({
        'endpoint': endpoint,
        'method': 'GET',
        'description': '获取可用的日志文件列表',
        'auth': '需要认证',
        'params': '',
        'response': '返回日志文件列表(含文件名、大小、修改时间、类型)',
    })
def test_get_file_logs(token):
    """Measure the log-file content endpoint ``/api/logs/file``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.

    Requests the last 50 lines of the ``article``, ``error`` and ``whoosh``
    logs in turn. HTTP 200 and 404 both count as success (404 is treated as
    "file does not exist"). Each request adds one row to ``test_results``;
    one endpoint description is appended to ``api_docs`` afterwards.
    """
    print("\n" + "-"*80)
    print("[5/6] 测试获取文件日志内容...")
    target = f"{API_BASE_URL}/api/logs/file"
    auth_headers = {"Authorization": f"Bearer {token}"}
    # (label, query parameters) for each scenario
    scenarios = [
        ("article日志50行", {"type": "article", "lines": 50}),
        ("error日志50行", {"type": "error", "lines": 50}),
        ("whoosh日志50行", {"type": "whoosh", "lines": 50}),
    ]
    for idx, (label, query) in enumerate(scenarios, 1):
        endpoint_name = f'/api/logs/file ({label})'
        try:
            began = time.time()
            resp = requests.get(target, headers=auth_headers, params=query, timeout=30)
            took_ms = (time.time() - began) * 1000
            # A missing log file (404) is an acceptable answer.
            ok = resp.status_code in (200, 404)
            detail = resp.text if not ok else ''
            print(f" [{idx}] {label}: {resp.status_code} - {took_ms:.2f}ms")
            test_results.append({
                'endpoint': endpoint_name,
                'method': 'GET',
                'status_code': resp.status_code,
                'elapsed_time_ms': f"{took_ms:.2f}",
                'success': ok,
                'error': detail,
            })
            # Short pause between consecutive requests.
            time.sleep(0.1)
        except Exception as exc:
            print(f" [{idx}] {label}: 失败 - {exc}")
            test_results.append({
                'endpoint': endpoint_name,
                'method': 'GET',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(exc),
            })
    api_docs.append({
        'endpoint': '/api/logs/file',
        'method': 'GET',
        'description': '获取文件日志内容最后N行',
        'auth': '需要认证',
        'params': 'type (article/error/whoosh), lines (默认100), date (可选)',
        'response': '返回日志文件内容和文件信息',
    })
def test_generate_article(token):
    """Measure the article-generation endpoint ``/api/generate_article``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.

    Posts one synthetic article whose title embeds the current Unix
    timestamp. HTTP 200 counts as success; on other statuses the first 100
    characters of the response body are stored as the error. Appends one row
    to ``test_results`` and one endpoint description to ``api_docs``.
    """
    print("\n" + "-"*80)
    print("[6/6] 测试生成文章接口...")
    endpoint = '/api/generate_article'
    target = f"{API_BASE_URL}/api/generate_article"
    auth_headers = {"Authorization": f"Bearer {token}"}
    # Synthetic payload; the timestamp makes the title differ between runs.
    stamp = int(time.time())
    payload = {
        "title": f"测试文章{stamp}",
        "content": "这是一篇测试文章内容,用于测试接口性能。" * 10,
        "tags": "测试,性能,接口",
    }
    try:
        began = time.time()
        resp = requests.post(target, headers=auth_headers, json=payload, timeout=30)
        took_ms = (time.time() - began) * 1000
        ok = resp.status_code == 200
        if ok:
            detail = ''
        else:
            # Keep the stored error short.
            detail = resp.text[:100]
        print(f" 状态码: {resp.status_code} - 耗时: {took_ms:.2f}ms")
        test_results.append({
            'endpoint': endpoint,
            'method': 'POST',
            'status_code': resp.status_code,
            'elapsed_time_ms': f"{took_ms:.2f}",
            'success': ok,
            'error': detail,
        })
    except Exception as exc:
        print(f" 失败: {exc}")
        test_results.append({
            'endpoint': endpoint,
            'method': 'POST',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(exc),
        })
    api_docs.append({
        'endpoint': endpoint,
        'method': 'POST',
        'description': '生成/上传文章包含Whoosh标签匹配图片',
        'auth': '需要认证',
        'params': 'title, content, tags, article_id (可选-更新模式), batch_id (可选)',
        'response': '返回文章ID、匹配的图片、部门信息等',
    })
def save_to_csv():
    """Write the collected ``test_results`` rows to a dated CSV file.

    The file name embeds today's date (``YYYYMMDD``) and the file is written
    as UTF-8 with a BOM (``utf-8-sig``).

    Returns:
        The name of the file that was written.
    """
    columns = [
        'endpoint', 'method', 'status_code', 'elapsed_time_ms', 'success', 'error'
    ]
    date_tag = datetime.now().strftime('%Y%m%d')
    filename = f'搜索服务接口性能测试_{date_tag}.csv'
    with open(filename, 'w', newline='', encoding='utf-8-sig') as out:
        sheet = csv.DictWriter(out, fieldnames=columns)
        sheet.writeheader()
        for row in test_results:
            sheet.writerow(row)
    print(f"\n✓ 性能测试结果已保存到: {filename}")
    return filename
def save_api_docs():
    """Write the collected ``api_docs`` entries to a dated CSV file.

    The file name embeds today's date (``YYYYMMDD``) and the file is written
    as UTF-8 with a BOM (``utf-8-sig``).

    Returns:
        The name of the file that was written.
    """
    columns = [
        'endpoint', 'method', 'description', 'auth', 'params', 'response'
    ]
    date_tag = datetime.now().strftime('%Y%m%d')
    filename = f'搜索服务接口文档_{date_tag}.csv'
    with open(filename, 'w', newline='', encoding='utf-8-sig') as out:
        sheet = csv.DictWriter(out, fieldnames=columns)
        sheet.writeheader()
        for entry in api_docs:
            sheet.writerow(entry)
    print(f"✓ 接口文档已保存到: {filename}")
    return filename
def print_summary():
    """Print totals, success/failure rates and latency statistics.

    Reads the rows accumulated in ``test_results``. Latency statistics only
    consider rows whose recorded time is greater than zero, so failure rows
    (recorded as ``'0'`` or ``'0.00'``) do not distort the average or the
    minimum. With no rows at all, the rates are reported as 0.0% instead of
    dividing by zero.
    """
    print("\n" + "="*80)
    print("测试摘要")
    print("="*80)
    total = len(test_results)
    success = sum(1 for r in test_results if r['success'])
    failed = total - success
    # Compare numerically: a string test against '0' would let '0.00' through.
    times = [
        t for t in (float(r['elapsed_time_ms']) for r in test_results) if t > 0
    ]
    avg_time = sum(times) / len(times) if times else 0
    max_time = max(times) if times else 0
    min_time = min(times) if times else 0
    # Guard the percentage computation against an empty result set.
    success_rate = success / total * 100 if total else 0.0
    failed_rate = failed / total * 100 if total else 0.0
    print(f"总测试数: {total}")
    print(f"成功: {success} ({success_rate:.1f}%)")
    print(f"失败: {failed} ({failed_rate:.1f}%)")
    print(f"平均耗时: {avg_time:.2f}ms")
    print(f"最大耗时: {max_time:.2f}ms")
    print(f"最小耗时: {min_time:.2f}ms")
    print("="*80)
def main():
    """Run the whole benchmark: log in, exercise every endpoint, report.

    Prints a banner, obtains a token via ``login()`` (aborting if that
    fails), runs the six endpoint tests in order, prints the summary and
    finally writes the results CSV and the API-documentation CSV.
    """
    rule = "="*80
    started_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(rule)
    print("搜索服务接口性能测试工具")
    print(f"测试时间: {started_at}")
    print(f"API地址: {API_BASE_URL}")
    print(rule)

    # Authenticate first; the remaining tests need the token.
    token = login()
    if not token:
        print("\n✗ 登录失败,无法继续测试")
        return

    # Unauthenticated endpoints, then the ones that require the token.
    test_root_endpoint()
    test_health_check()
    for authed_test in (
        test_get_logs,
        test_get_log_files,
        test_get_file_logs,
        test_generate_article,
    ):
        authed_test(token)

    print_summary()

    # Persist results, then tell the user where they went.
    csv_file = save_to_csv()
    doc_file = save_api_docs()
    print("\n✓ 测试完成!")
    print(f" - 性能数据: {csv_file}")
    print(f" - 接口文档: {doc_file}")
if __name__ == "__main__":
    import traceback

    # Script entry point: run main() and report interruption or unexpected
    # errors (with a traceback) instead of letting them propagate.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n✗ 测试被用户中断")
    except Exception as exc:
        print(f"\n✗ 测试异常: {exc}")
        traceback.print_exc()