Files
ai_wht_B/测试作者接口性能.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

656 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Benchmark the author API endpoints and generate a CSV performance report
plus a CSV API reference.
"""
import requests
import time
import csv
from datetime import datetime
# API configuration
# NOTE(review): the base URL and credentials are hard-coded in source;
# consider loading them from the environment before sharing this script.
API_BASE_URL = "http://127.0.0.1:8216"
USERNAME = "13621242430"
PASSWORD = "admin123"
# Storage for collected data, filled in by the test_* functions:
# test_results holds one dict per timed request (the rows of the performance CSV),
# api_docs holds one dict per documented endpoint (the rows of the API reference CSV).
test_results = []
api_docs = []
def login():
    """Log in with the configured credentials and return the auth token.

    Posts USERNAME/PASSWORD as JSON to ``/api/auth/login`` and prints the
    outcome together with the round-trip time in milliseconds.

    Returns:
        The value found at ``data.token`` in the JSON response when the
        server answers HTTP 200. ``None`` when the server answers with any
        other status, when the request itself fails (connection error,
        timeout, ...), or when the response body is not JSON of the expected
        shape. Callers can therefore treat any falsy return as "login failed".
    """
    print("\n" + "="*80)
    print("正在登录...")
    url = f"{API_BASE_URL}/api/auth/login"
    data = {"username": USERNAME, "password": PASSWORD}
    # perf_counter() is monotonic and high-resolution; wall-clock time.time()
    # can jump when the system clock is adjusted.
    start_time = time.perf_counter()
    try:
        response = requests.post(url, json=data, timeout=30)
    except requests.RequestException as e:
        # Network-level failure: report it like any other failed login.
        print(f"✗ 登录失败: {e}")
        return None
    elapsed_time = (time.perf_counter() - start_time) * 1000
    if response.status_code != 200:
        print(f"✗ 登录失败: {response.status_code}")
        return None
    try:
        token = response.json()['data']['token']
    except (ValueError, KeyError, TypeError) as e:
        # ValueError: body is not JSON; KeyError/TypeError: unexpected shape.
        print(f"✗ 登录失败: {e!r}")
        return None
    print(f"✓ 登录成功,耗时: {elapsed_time:.2f}ms")
    return token
def test_get_authors(token):
    """Benchmark GET /api/authors (paginated list, grouped by department).

    Sends three query variations (default page, smaller page size, keyword
    search), appends one row per variation to the module-level
    ``test_results`` list, and finally records the endpoint in ``api_docs``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("\n" + "-"*80)
    print("[1/9] 测试获取作者列表(分页)...")
    url = f"{API_BASE_URL}/api/authors"
    headers = {"Authorization": f"Bearer {token}"}
    # (label, query parameters) for each request variation. Keeping the label
    # separate avoids mutating the parameter dict before sending it.
    test_cases = [
        ("第1页", {"page": 1, "size": 10}),
        ("每页5条", {"page": 1, "size": 5}),
        ("搜索关键词", {"page": 1, "size": 10, "search": "医生"}),
    ]
    for i, (desc, params) in enumerate(test_cases, 1):
        endpoint = f'/api/authors ({desc})'
        try:
            # perf_counter() is monotonic and high-resolution, which is what
            # a latency measurement needs; time.time() is wall-clock.
            start_time = time.perf_counter()
            response = requests.get(url, headers=headers, params=params, timeout=30)
            elapsed_time = (time.perf_counter() - start_time) * 1000
        except Exception as e:
            # Deliberately broad: one failing request must not abort the run.
            print(f" [{i}] {desc}: 失败 - {str(e)}")
            test_results.append({
                'endpoint': endpoint,
                'method': 'GET',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(e)
            })
            continue
        success = response.status_code == 200
        error = '' if success else response.text
        print(f" [{i}] {desc}: {response.status_code} - {elapsed_time:.2f}ms")
        test_results.append({
            'endpoint': endpoint,
            'method': 'GET',
            'status_code': response.status_code,
            'elapsed_time_ms': f"{elapsed_time:.2f}",
            'success': success,
            'error': error
        })
        # Short pause between consecutive requests.
        time.sleep(0.1)
    api_docs.append({
        'endpoint': '/api/authors',
        'method': 'GET',
        'description': '获取作者列表(分页,按科室分组)',
        'auth': '需要认证',
        'params': 'page, size, search, department, status, channel',
        'response': '返回按科室分组的作者列表'
    })
def test_get_list_authors(token):
    """Benchmark GET /api/authors/list (authors linked to the current user).

    Issues a single request for page 1 with 10 items, appends the outcome to
    the module-level ``test_results`` list, and records the endpoint in
    ``api_docs``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("\n" + "-"*80)
    print("[2/9] 测试获取作者列表(用户关联)...")
    url = f"{API_BASE_URL}/api/authors/list"
    headers = {"Authorization": f"Bearer {token}"}
    try:
        # perf_counter() is monotonic and high-resolution, unlike the
        # wall-clock time.time(), so it is the right clock for latency.
        start_time = time.perf_counter()
        response = requests.get(url, headers=headers, params={"page": 1, "size": 10}, timeout=30)
        elapsed_time = (time.perf_counter() - start_time) * 1000
    except Exception as e:
        # Deliberately broad: a failing request is recorded, not raised.
        print(f" 失败: {str(e)}")
        test_results.append({
            'endpoint': '/api/authors/list',
            'method': 'GET',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(e)
        })
    else:
        success = response.status_code == 200
        error = '' if success else response.text
        print(f" 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
        test_results.append({
            'endpoint': '/api/authors/list',
            'method': 'GET',
            'status_code': response.status_code,
            'elapsed_time_ms': f"{elapsed_time:.2f}",
            'success': success,
            'error': error
        })
    api_docs.append({
        'endpoint': '/api/authors/list',
        'method': 'GET',
        'description': '获取作者列表(基于用户关联关系,按科室分组)',
        'auth': '需要认证',
        'params': 'page, size, search, department, status, channel',
        'response': '返回当前用户关联的作者列表'
    })
def test_get_detail_list_authors(token):
    """Benchmark GET /api/authors/detail_list (detailed author list).

    Sends two query variations (plain query and keyword search), appends one
    row per variation to the module-level ``test_results`` list, and finally
    records the endpoint in ``api_docs``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("\n" + "-"*80)
    print("[3/9] 测试获取作者详细列表...")
    url = f"{API_BASE_URL}/api/authors/detail_list"
    headers = {"Authorization": f"Bearer {token}"}
    # (label, query parameters) for each request variation.
    test_cases = [
        ("基本查询", {"page": 1, "size": 10}),
        ("搜索查询", {"page": 1, "size": 20, "search": "医生"}),
    ]
    for i, (desc, params) in enumerate(test_cases, 1):
        endpoint = f'/api/authors/detail_list ({desc})'
        try:
            # perf_counter() is monotonic and high-resolution; time.time()
            # is wall-clock and may jump with system clock adjustments.
            start_time = time.perf_counter()
            response = requests.get(url, headers=headers, params=params, timeout=30)
            elapsed_time = (time.perf_counter() - start_time) * 1000
        except Exception as e:
            # Deliberately broad: one failing request must not abort the run.
            print(f" [{i}] {desc}: 失败 - {str(e)}")
            test_results.append({
                'endpoint': endpoint,
                'method': 'GET',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(e)
            })
            continue
        success = response.status_code == 200
        error = '' if success else response.text
        print(f" [{i}] {desc}: {response.status_code} - {elapsed_time:.2f}ms")
        test_results.append({
            'endpoint': endpoint,
            'method': 'GET',
            'status_code': response.status_code,
            'elapsed_time_ms': f"{elapsed_time:.2f}",
            'success': success,
            'error': error
        })
        # Short pause between consecutive requests.
        time.sleep(0.1)
    api_docs.append({
        'endpoint': '/api/authors/detail_list',
        'method': 'GET',
        'description': '获取作者详细列表直接查询不返回app_token',
        'auth': '需要认证',
        'params': 'page, size, search, department, status, channel',
        'response': '返回作者详细信息列表'
    })
def test_get_author_detail(token):
    """Benchmark GET /api/authors/<author_id> (single author detail).

    First asks ``/api/authors/detail_list`` for one author to obtain an ID,
    then times the detail request for that ID. When no ID can be obtained
    (empty list, non-200 answer, request failure or unexpected payload), the
    test is recorded as a failed row instead of raising. The outcome goes to
    the module-level ``test_results`` list and the endpoint to ``api_docs``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("\n" + "-"*80)
    print("[4/9] 测试获取作者详情...")
    headers = {"Authorization": f"Bearer {token}"}
    # Look up one existing author ID to query.
    list_url = f"{API_BASE_URL}/api/authors/detail_list"
    author_id = None
    lookup_error = '没有找到作者'
    try:
        list_response = requests.get(list_url, headers=headers, params={"page": 1, "size": 1}, timeout=30)
        if list_response.status_code == 200:
            # The `or` fallbacks tolerate explicit nulls in the payload,
            # which dict.get(key, default) would pass through as None.
            payload = list_response.json().get('data') or {}
            authors = payload.get('list') or []
            if authors:
                author_id = authors[0]['author_id']
    except Exception as e:
        # The lookup is only a prerequisite; record why it failed and fall
        # through to the "skipped" row rather than aborting the whole run.
        lookup_error = str(e)
    if author_id:
        url = f"{API_BASE_URL}/api/authors/{author_id}"
        try:
            # perf_counter() is the monotonic, high-resolution clock suited
            # to measuring latency; time.time() is wall-clock.
            start_time = time.perf_counter()
            response = requests.get(url, headers=headers, timeout=30)
            elapsed_time = (time.perf_counter() - start_time) * 1000
        except Exception as e:
            print(f" 失败: {str(e)}")
            test_results.append({
                'endpoint': f'/api/authors/{author_id}',
                'method': 'GET',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(e)
            })
        else:
            success = response.status_code == 200
            error = '' if success else response.text
            print(f" 作者ID: {author_id}, 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
            test_results.append({
                'endpoint': f'/api/authors/{author_id}',
                'method': 'GET',
                'status_code': response.status_code,
                'elapsed_time_ms': f"{elapsed_time:.2f}",
                'success': success,
                'error': error
            })
    else:
        print(" 跳过:没有找到作者")
        test_results.append({
            'endpoint': '/api/authors/<id>',
            'method': 'GET',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': lookup_error
        })
    api_docs.append({
        'endpoint': '/api/authors/<author_id>',
        'method': 'GET',
        'description': '获取单个作者详情',
        'auth': '需要认证',
        'params': 'author_id (路径参数)',
        'response': '返回完整的作者信息'
    })
def test_create_author(token):
    """Benchmark POST /api/authors (create an author).

    Creates an author with timestamp-suffixed test data, appends exactly one
    row describing the request to the module-level ``test_results`` list, and
    records the endpoint in ``api_docs``. HTTP 201 counts as success.

    Args:
        token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The ID of the created author (read from ``data.id`` in the response
        body) for use by follow-up tests, or ``None`` when the request
        failed, was rejected, or the body carried no usable ID.
    """
    print("\n" + "-"*80)
    print("[5/9] 测试创建作者...")
    # Document the endpoint first, because the function returns early below.
    api_docs.append({
        'endpoint': '/api/authors',
        'method': 'POST',
        'description': '创建新作者',
        'auth': '需要认证+角色(admin/editor)',
        'params': 'author_name, app_id, app_token, department_name, title, hospital等',
        'response': '返回新创建的作者信息'
    })
    url = f"{API_BASE_URL}/api/authors"
    headers = {"Authorization": f"Bearer {token}"}
    # Wall-clock seconds make the test data unique across runs.
    timestamp = int(time.time())
    data = {
        "author_name": f"测试医生{timestamp}",
        "app_id": f"test_app_{timestamp}",
        "app_token": f"test_token_{timestamp}",
        "department_name": "测试科室",
        "title": "主任医师",
        "hospital": "测试医院",
        "channel": 1
    }
    try:
        # perf_counter() is monotonic and high-resolution, so it is used for
        # the latency measurement instead of wall-clock time.time().
        start_time = time.perf_counter()
        response = requests.post(url, headers=headers, json=data, timeout=30)
        elapsed_time = (time.perf_counter() - start_time) * 1000
    except Exception as e:
        # Deliberately broad: a failing request is recorded, not raised.
        print(f" 失败: {str(e)}")
        test_results.append({
            'endpoint': '/api/authors (创建)',
            'method': 'POST',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(e)
        })
        return None
    success = response.status_code == 201
    error = '' if success else response.text
    print(f" 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
    test_results.append({
        'endpoint': '/api/authors (创建)',
        'method': 'POST',
        'status_code': response.status_code,
        'elapsed_time_ms': f"{elapsed_time:.2f}",
        'success': success,
        'error': error
    })
    # Extract the new author's ID for the follow-up tests. This happens after
    # the result row is stored, so a malformed body cannot add a second,
    # contradictory failure row for the same request.
    created_author_id = None
    if success:
        try:
            # NOTE(review): assumes the create response carries the new ID at
            # data.id - confirm against the API.
            created_author_id = (response.json().get('data') or {}).get('id')
        except (ValueError, AttributeError) as e:
            # ValueError: body is not JSON; AttributeError: unexpected shape.
            print(f" 失败: {str(e)}")
        else:
            print(f" ✓ 创建成功作者ID: {created_author_id}")
    return created_author_id
def test_update_author(token, author_id):
    """Benchmark PUT /api/authors/<author_id> (update an author).

    Records the endpoint in ``api_docs``, then sends an update with fixed
    test values and appends the outcome to the module-level ``test_results``
    list. Without an ``author_id`` no request is sent and a failed row is
    recorded instead.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        author_id: ID of the author to update; falsy values skip the request.
    """
    print("\n" + "-"*80)
    print("[6/9] 测试更新作者...")
    # Document the endpoint first, because the function may return early.
    api_docs.append({
        'endpoint': '/api/authors/<author_id>',
        'method': 'PUT',
        'description': '更新作者信息',
        'auth': '需要认证+角色(admin/editor)',
        'params': 'author_id (路径), author_name, department_name等',
        'response': '返回更新后的作者信息'
    })
    if not author_id:
        print(" 跳过没有可用的作者ID")
        test_results.append({
            'endpoint': '/api/authors/<id> (PUT)',
            'method': 'PUT',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': '没有可用的作者ID'
        })
        return
    url = f"{API_BASE_URL}/api/authors/{author_id}"
    headers = {"Authorization": f"Bearer {token}"}
    data = {
        # Wall-clock seconds keep the new name unique across runs.
        "author_name": f"更新测试医生{int(time.time())}",
        "department_name": "更新测试科室",
        "title": "副主任医师",
        "hospital": "更新测试医院"
    }
    try:
        # perf_counter() is monotonic and high-resolution, so it is used for
        # the latency measurement instead of wall-clock time.time().
        start_time = time.perf_counter()
        response = requests.put(url, headers=headers, json=data, timeout=30)
        elapsed_time = (time.perf_counter() - start_time) * 1000
    except Exception as e:
        # Deliberately broad: a failing request is recorded, not raised.
        print(f" 失败: {str(e)}")
        test_results.append({
            'endpoint': f'/api/authors/{author_id} (PUT)',
            'method': 'PUT',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(e)
        })
        return
    success = response.status_code == 200
    error = '' if success else response.text
    print(f" 作者ID: {author_id}, 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
    test_results.append({
        'endpoint': f'/api/authors/{author_id} (PUT)',
        'method': 'PUT',
        'status_code': response.status_code,
        'elapsed_time_ms': f"{elapsed_time:.2f}",
        'success': success,
        'error': error
    })
def test_get_departments(token):
    """Benchmark GET /api/authors/departments (department list).

    Issues a single request without query parameters, appends the outcome to
    the module-level ``test_results`` list, and records the endpoint in
    ``api_docs``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("\n" + "-"*80)
    print("[7/9] 测试获取科室列表...")
    url = f"{API_BASE_URL}/api/authors/departments"
    headers = {"Authorization": f"Bearer {token}"}
    try:
        # perf_counter() is monotonic and high-resolution, unlike the
        # wall-clock time.time(), so it is the right clock for latency.
        start_time = time.perf_counter()
        response = requests.get(url, headers=headers, timeout=30)
        elapsed_time = (time.perf_counter() - start_time) * 1000
    except Exception as e:
        # Deliberately broad: a failing request is recorded, not raised.
        print(f" 失败: {str(e)}")
        test_results.append({
            'endpoint': '/api/authors/departments',
            'method': 'GET',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(e)
        })
    else:
        success = response.status_code == 200
        error = '' if success else response.text
        print(f" 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
        test_results.append({
            'endpoint': '/api/authors/departments',
            'method': 'GET',
            'status_code': response.status_code,
            'elapsed_time_ms': f"{elapsed_time:.2f}",
            'success': success,
            'error': error
        })
    api_docs.append({
        'endpoint': '/api/authors/departments',
        'method': 'GET',
        'description': '获取所有作者科室列表',
        'auth': '需要认证',
        'params': '',
        'response': '返回科室名称数组'
    })
def test_search_authors(token):
    """Benchmark GET /api/authors/search (keyword search).

    Sends two keyword searches with different limits, appends one row per
    search to the module-level ``test_results`` list, and finally records the
    endpoint in ``api_docs``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("\n" + "-"*80)
    print("[8/9] 测试搜索作者...")
    url = f"{API_BASE_URL}/api/authors/search"
    headers = {"Authorization": f"Bearer {token}"}
    # (label, query parameters) for each request variation.
    test_cases = [
        ("搜索医生", {"keyword": "医生", "limit": 10}),
        ("搜索科室", {"keyword": "科室", "limit": 5}),
    ]
    for i, (desc, params) in enumerate(test_cases, 1):
        endpoint = f'/api/authors/search ({desc})'
        try:
            # perf_counter() is monotonic and high-resolution; time.time()
            # is wall-clock and may jump with system clock adjustments.
            start_time = time.perf_counter()
            response = requests.get(url, headers=headers, params=params, timeout=30)
            elapsed_time = (time.perf_counter() - start_time) * 1000
        except Exception as e:
            # Deliberately broad: one failing request must not abort the run.
            print(f" [{i}] {desc}: 失败 - {str(e)}")
            test_results.append({
                'endpoint': endpoint,
                'method': 'GET',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(e)
            })
            continue
        success = response.status_code == 200
        error = '' if success else response.text
        print(f" [{i}] {desc}: {response.status_code} - {elapsed_time:.2f}ms")
        test_results.append({
            'endpoint': endpoint,
            'method': 'GET',
            'status_code': response.status_code,
            'elapsed_time_ms': f"{elapsed_time:.2f}",
            'success': success,
            'error': error
        })
        # Short pause between consecutive requests.
        time.sleep(0.1)
    api_docs.append({
        'endpoint': '/api/authors/search',
        'method': 'GET',
        'description': '搜索作者(用于下拉选择等)',
        'auth': '需要认证',
        'params': 'keyword, limit',
        'response': '返回匹配的作者列表'
    })
def test_delete_author(token, author_id):
    """Benchmark DELETE /api/authors/<author_id> (delete an author).

    Records the endpoint in ``api_docs``, then sends the delete request and
    appends the outcome to the module-level ``test_results`` list. Without an
    ``author_id`` no request is sent and a failed row is recorded instead.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        author_id: ID of the author to delete; falsy values skip the request.
    """
    print("\n" + "-"*80)
    print("[9/9] 测试删除作者...")
    # Document the endpoint first, because the function may return early.
    api_docs.append({
        'endpoint': '/api/authors/<author_id>',
        'method': 'DELETE',
        'description': '删除作者',
        'auth': '需要认证+角色(admin)',
        'params': 'author_id (路径参数)',
        'response': '返回删除成功消息'
    })
    if not author_id:
        print(" 跳过没有可用的作者ID")
        test_results.append({
            'endpoint': '/api/authors/<id> (DELETE)',
            'method': 'DELETE',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': '没有可用的作者ID'
        })
        return
    url = f"{API_BASE_URL}/api/authors/{author_id}"
    headers = {"Authorization": f"Bearer {token}"}
    try:
        # perf_counter() is monotonic and high-resolution, so it is used for
        # the latency measurement instead of wall-clock time.time().
        start_time = time.perf_counter()
        response = requests.delete(url, headers=headers, timeout=30)
        elapsed_time = (time.perf_counter() - start_time) * 1000
    except Exception as e:
        # Deliberately broad: a failing request is recorded, not raised.
        print(f" 失败: {str(e)}")
        test_results.append({
            'endpoint': f'/api/authors/{author_id} (DELETE)',
            'method': 'DELETE',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(e)
        })
        return
    success = response.status_code == 200
    error = '' if success else response.text
    print(f" 作者ID: {author_id}, 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
    test_results.append({
        'endpoint': f'/api/authors/{author_id} (DELETE)',
        'method': 'DELETE',
        'status_code': response.status_code,
        'elapsed_time_ms': f"{elapsed_time:.2f}",
        'success': success,
        'error': error
    })
def save_to_csv():
    """Write the collected performance results to a date-stamped CSV file.

    The file is created in the current working directory and contains one
    row per entry of the module-level ``test_results`` list.

    Returns:
        The name of the file that was written.
    """
    columns = ['endpoint', 'method', 'status_code', 'elapsed_time_ms', 'success', 'error']
    date_tag = datetime.now().strftime('%Y%m%d')
    filename = f'作者接口性能测试_{date_tag}.csv'
    # utf-8-sig prefixes the file with a BOM - presumably so spreadsheet
    # tools detect the encoding of the non-ASCII content.
    with open(filename, 'w', newline='', encoding='utf-8-sig') as out:
        writer = csv.DictWriter(out, fieldnames=columns)
        writer.writeheader()
        for row in test_results:
            writer.writerow(row)
    print(f"\n✓ 性能测试结果已保存到: {filename}")
    return filename
def save_api_docs():
    """Write the collected endpoint descriptions to a date-stamped CSV file.

    The file is created in the current working directory and contains one
    row per entry of the module-level ``api_docs`` list.

    Returns:
        The name of the file that was written.
    """
    columns = ['endpoint', 'method', 'description', 'auth', 'params', 'response']
    date_tag = datetime.now().strftime('%Y%m%d')
    filename = f'作者接口文档_{date_tag}.csv'
    # utf-8-sig prefixes the file with a BOM - presumably so spreadsheet
    # tools detect the encoding of the non-ASCII content.
    with open(filename, 'w', newline='', encoding='utf-8-sig') as out:
        writer = csv.DictWriter(out, fieldnames=columns)
        writer.writeheader()
        for row in api_docs:
            writer.writerow(row)
    print(f"✓ 接口文档已保存到: {filename}")
    return filename
def print_summary(results=None):
    """Print aggregate statistics for the recorded test results.

    Reports the total, successful and failed counts with percentages, plus
    the average, maximum and minimum latency. Rows whose ``elapsed_time_ms``
    is the string ``'0'`` (requests that were skipped or failed before a
    response arrived) are left out of the latency statistics.

    Args:
        results: Optional list of result rows, each a dict with at least the
            ``success`` and ``elapsed_time_ms`` keys. Defaults to the
            module-level ``test_results`` list.
    """
    if results is None:
        results = test_results
    print("\n" + "="*80)
    print("测试摘要")
    print("="*80)
    total = len(results)
    success = sum(1 for r in results if r['success'])
    failed = total - success
    times = [float(r['elapsed_time_ms']) for r in results if r['elapsed_time_ms'] != '0']
    avg_time = sum(times) / len(times) if times else 0
    max_time = max(times) if times else 0
    min_time = min(times) if times else 0
    # With no results at all, report 0.0% instead of dividing by zero.
    success_pct = success / total * 100 if total else 0.0
    failed_pct = failed / total * 100 if total else 0.0
    print(f"总测试数: {total}")
    print(f"成功: {success} ({success_pct:.1f}%)")
    print(f"失败: {failed} ({failed_pct:.1f}%)")
    print(f"平均耗时: {avg_time:.2f}ms")
    print(f"最大耗时: {max_time:.2f}ms")
    print(f"最小耗时: {min_time:.2f}ms")
    print("="*80)
def main():
    """Run the whole benchmark: log in, exercise each endpoint, report, save.

    Stops right after the login step when no token is obtained. Otherwise the
    endpoint tests run in a fixed order (the author created by the create
    test is updated and finally deleted), followed by the printed summary and
    the two CSV exports.
    """
    banner = "="*80
    print(banner)
    print("作者接口性能测试工具")
    print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"API地址: {API_BASE_URL}")
    print(banner)

    # Log in; every test needs the bearer token.
    token = login()
    if not token:
        print("\n✗ 登录失败,无法继续测试")
        return

    # Read-only endpoints first.
    for read_test in (
        test_get_authors,
        test_get_list_authors,
        test_get_detail_list_authors,
        test_get_author_detail,
    ):
        read_test(token)

    # Create an author, then reuse its ID for the update and delete tests.
    created_author_id = test_create_author(token)
    test_update_author(token, created_author_id)
    for read_test in (test_get_departments, test_search_authors):
        read_test(token)
    test_delete_author(token, created_author_id)

    # Print the summary.
    print_summary()

    # Save the results.
    csv_file = save_to_csv()
    doc_file = save_api_docs()
    print("\n✓ 测试完成!")
    print(f" - 性能数据: {csv_file}")
    print(f" - 接口文档: {doc_file}")
if __name__ == "__main__":
    # Script entry point: run the benchmark and report, rather than
    # propagate, anything that interrupts it.
    import traceback

    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C: exit quietly with a short notice.
        print("\n\n✗ 测试被用户中断")
    except Exception as exc:
        # Top-level boundary: show the message and the full traceback.
        print(f"\n✗ 测试异常: {exc}")
        traceback.print_exc()