Files
ai_wht_B/测试员工接口性能.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

372 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试员工接口性能并生成CSV报告和接口文档
"""
import requests
import time
import csv
from datetime import datetime
# API configuration: base URL of the service under test and the login
# credentials posted by login().
API_BASE_URL = "http://127.0.0.1:8216"
USERNAME = "13621242430"
PASSWORD = "admin123"
# Run-wide accumulators. The test functions append one dict per measured
# request to test_results and one dict per documented endpoint to api_docs;
# save_to_csv() and save_api_docs() write them out as CSV.
test_results = []
api_docs = []
def login():
    """Log in and return the bearer token, or ``None`` on any failure.

    Posts ``USERNAME``/``PASSWORD`` as JSON to ``/api/auth/login`` and prints
    the outcome together with the elapsed time in milliseconds.

    Returns:
        The value found at ``data.token`` in the JSON response, or ``None``
        when the request itself fails, the status code is not 200, or the
        body does not contain a token.
    """
    print("\n" + "="*80)
    print("正在登录...")
    url = f"{API_BASE_URL}/api/auth/login"
    data = {"username": USERNAME, "password": PASSWORD}
    # perf_counter is monotonic and high-resolution, unlike the wall clock.
    start_time = time.perf_counter()
    try:
        response = requests.post(url, json=data, timeout=30)
    except requests.RequestException as e:
        # Connection refused, timeout, etc.: report it through the normal
        # "login failed" path instead of aborting with a traceback.
        print(f"✗ 登录失败: {e}")
        return None
    elapsed_time = (time.perf_counter() - start_time) * 1000
    if response.status_code != 200:
        print(f"✗ 登录失败: {response.status_code}")
        return None
    try:
        token = response.json()['data']['token']
    except (ValueError, KeyError, TypeError) as e:
        # ValueError: body is not JSON; KeyError/TypeError: JSON does not
        # have the expected {"data": {"token": ...}} shape.
        print(f"✗ 登录失败: 响应中缺少token ({e!r})")
        return None
    print(f"✓ 登录成功,耗时: {elapsed_time:.2f}ms")
    return token
def test_get_employees_list(token):
    """Measure GET /api/employees/list under several query-parameter sets.

    For every test case one row is appended to ``test_results`` (status code
    and elapsed milliseconds on a completed request, status 0 and the
    exception text otherwise). A single documentation row for the endpoint is
    appended to ``api_docs`` at the end.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("\n" + "-"*80)
    print("[1/4] 测试获取员工列表...")
    url = f"{API_BASE_URL}/api/employees/list"
    headers = {"Authorization": f"Bearer {token}"}
    # Query-parameter combinations to exercise; 'desc' is a label only and is
    # not sent to the server.
    test_cases = [
        {"page": 1, "pageSize": 20, "desc": "基本查询"},
        {"page": 1, "pageSize": 10, "desc": "每页10条"},
        {"page": 1, "pageSize": 20, "keyword": "员工", "desc": "关键词搜索"},
        {"page": 1, "pageSize": 20, "status": "active", "desc": "状态筛选"},
        {"page": 2, "pageSize": 20, "desc": "第2页"},
    ]
    for i, case in enumerate(test_cases, 1):
        desc = case['desc']
        # Build the query without mutating the test-case definition.
        params = {key: value for key, value in case.items() if key != 'desc'}
        endpoint = f'/api/employees/list ({desc})'
        try:
            # perf_counter is monotonic and high-resolution, which the wall
            # clock is not guaranteed to be.
            start_time = time.perf_counter()
            response = requests.get(url, headers=headers, params=params, timeout=30)
            elapsed_time = (time.perf_counter() - start_time) * 1000
        except Exception as e:
            # Best effort: record the failure and move on to the next case.
            print(f" [{i}] {desc}: 失败 - {str(e)}")
            test_results.append({
                'endpoint': endpoint,
                'method': 'GET',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(e)
            })
            continue
        success = response.status_code == 200
        error = '' if success else response.text
        print(f" [{i}] {desc}: {response.status_code} - {elapsed_time:.2f}ms")
        test_results.append({
            'endpoint': endpoint,
            'method': 'GET',
            'status_code': response.status_code,
            'elapsed_time_ms': f"{elapsed_time:.2f}",
            'success': success,
            'error': error
        })
        # Short pause between consecutive requests.
        time.sleep(0.1)
    api_docs.append({
        'endpoint': '/api/employees/list',
        'method': 'GET',
        'description': '获取员工列表(分页查询)',
        'auth': '需要认证',
        'params': 'page, pageSize, keyword, status, isBoundXHS',
        'response': '返回员工列表(含姓名、手机、角色、绑定状态等)'
    })
def test_get_employees_stats(token):
    """Measure GET /api/employees/stats over several runs and record the average.

    The endpoint is called ``test_count`` times. One aggregated row is
    appended to ``test_results``: the average elapsed time of the requests
    that completed, and ``success`` only when every run completed with HTTP
    200. A documentation row is appended to ``api_docs``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("\n" + "-"*80)
    print("[2/4] 测试获取员工统计...")
    url = f"{API_BASE_URL}/api/employees/stats"
    headers = {"Authorization": f"Bearer {token}"}
    # Repeat the call to obtain an average.
    test_count = 5
    print(f" 执行{test_count}次测试以获取平均性能...")
    times = []          # elapsed ms of every request that got a response
    status_codes = []   # HTTP status of every request that got a response
    for i in range(test_count):
        try:
            # perf_counter is monotonic and high-resolution.
            start_time = time.perf_counter()
            response = requests.get(url, headers=headers, timeout=30)
            elapsed_time = (time.perf_counter() - start_time) * 1000
        except Exception as e:
            # Best effort: a failed run is reported and excluded from timing.
            print(f" [{i+1}] 失败: {str(e)}")
            continue
        times.append(elapsed_time)
        status_codes.append(response.status_code)
        print(f" [{i+1}] 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
        time.sleep(0.1)
    # Aggregate timing over the runs that produced a response.
    avg_time = sum(times) / len(times) if times else 0
    max_time = max(times) if times else 0
    min_time = min(times) if times else 0
    print(f" 平均耗时: {avg_time:.2f}ms, 最大: {max_time:.2f}ms, 最小: {min_time:.2f}ms")
    # Report the first non-200 status if there was one, 200 when every
    # response was OK, and 0 when no request produced a response at all.
    if status_codes:
        status_code = next((code for code in status_codes if code != 200), 200)
    else:
        status_code = 0
    all_ok = len(status_codes) == test_count and all(code == 200 for code in status_codes)
    test_results.append({
        'endpoint': '/api/employees/stats',
        'method': 'GET',
        'status_code': status_code,
        'elapsed_time_ms': f"{avg_time:.2f}",
        'success': all_ok,
        'error': f'测试{test_count}次, 平均耗时{avg_time:.2f}ms'
    })
    api_docs.append({
        'endpoint': '/api/employees/stats',
        'method': 'GET',
        'description': '获取员工统计(总数、激活数、绑定小红书数)',
        'auth': '需要认证',
        'params': '',
        'response': '返回员工统计数据total_count, active_count, bound_xhs_count'
    })
def test_add_employee(token):
    """Measure POST /api/employees/add and return the created employee's ID.

    A documentation row is appended to ``api_docs`` first, then a uniquely
    named employee is posted. Exactly one row is appended to
    ``test_results`` for the request, whatever the outcome.

    Args:
        token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The value at ``data.id`` in the JSON response when the request
        returned HTTP 200 and the body has that shape; otherwise ``None``.
    """
    print("\n" + "-"*80)
    print("[3/4] 测试添加员工...")
    # Record the endpoint documentation up front so it is written even if
    # the request below fails.
    api_docs.append({
        'endpoint': '/api/employees/add',
        'method': 'POST',
        'description': '添加员工',
        'auth': '需要认证+角色(enterprise)',
        'params': 'name, phone, password, role, department',
        'response': '返回新创建的员工ID'
    })
    url = f"{API_BASE_URL}/api/employees/add"
    headers = {"Authorization": f"Bearer {token}"}
    # Derive the name and phone number from the current Unix time so that
    # repeated runs do not reuse the same values.
    timestamp = int(time.time())
    data = {
        "name": f"测试员工{timestamp}",
        "phone": f"1382{timestamp % 10000000:07d}",
        "password": "test123456",
        "role": "employee",
        "department": "测试部门"
    }
    try:
        # perf_counter is monotonic and high-resolution.
        start_time = time.perf_counter()
        response = requests.post(url, headers=headers, json=data, timeout=30)
        elapsed_time = (time.perf_counter() - start_time) * 1000
    except Exception as e:
        print(f" 失败: {str(e)}")
        test_results.append({
            'endpoint': '/api/employees/add',
            'method': 'POST',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(e)
        })
        return None
    success = response.status_code == 200
    error = '' if success else response.text
    print(f" 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
    test_results.append({
        'endpoint': '/api/employees/add',
        'method': 'POST',
        'status_code': response.status_code,
        'elapsed_time_ms': f"{elapsed_time:.2f}",
        'success': success,
        'error': error
    })
    if not success:
        return None
    # Extract the new ID defensively. This happens outside the request's
    # try block so that an unexpected body (non-JSON, or "data": null)
    # cannot add a second, contradictory failure row for the same request.
    try:
        payload = response.json()
    except ValueError:
        payload = None
    payload_data = payload.get('data') if isinstance(payload, dict) else None
    created_employee_id = payload_data.get('id') if isinstance(payload_data, dict) else None
    print(f" ✓ 创建成功员工ID: {created_employee_id}")
    return created_employee_id
def test_delete_employee(token, employee_id):
"""测试删除员工"""
print("\n" + "-"*80)
print("[4/4] 测试删除员工...")
# 先添加文档
api_docs.append({
'endpoint': '/api/employees/<employee_id>',
'method': 'DELETE',
'description': '删除员工(软删除)',
'auth': '需要认证+角色(enterprise)',
'params': 'employee_id (路径参数)',
'response': '返回删除成功消息'
})
if not employee_id:
print(" 跳过没有可用的员工ID")
test_results.append({
'endpoint': '/api/employees/<id> (DELETE)',
'method': 'DELETE',
'status_code': 0,
'elapsed_time_ms': '0',
'success': False,
'error': '没有可用的员工ID'
})
return
url = f"{API_BASE_URL}/api/employees/{employee_id}"
headers = {"Authorization": f"Bearer {token}"}
try:
start_time = time.time()
response = requests.delete(url, headers=headers, timeout=30)
elapsed_time = (time.time() - start_time) * 1000
success = response.status_code == 200
error = '' if success else response.text
print(f" 员工ID: {employee_id}, 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
test_results.append({
'endpoint': f'/api/employees/{employee_id} (DELETE)',
'method': 'DELETE',
'status_code': response.status_code,
'elapsed_time_ms': f"{elapsed_time:.2f}",
'success': success,
'error': error
})
except Exception as e:
print(f" 失败: {str(e)}")
test_results.append({
'endpoint': f'/api/employees/{employee_id} (DELETE)',
'method': 'DELETE',
'status_code': 0,
'elapsed_time_ms': '0',
'success': False,
'error': str(e)
})
def save_to_csv():
    """Dump the collected performance rows to a date-stamped CSV file.

    Returns:
        The name of the file written in the current working directory.
    """
    columns = ['endpoint', 'method', 'status_code', 'elapsed_time_ms', 'success', 'error']
    date_tag = datetime.now().strftime('%Y%m%d')
    filename = f'员工接口性能测试_{date_tag}.csv'
    # The utf-8-sig codec prepends a UTF-8 BOM to the file.
    with open(filename, 'w', newline='', encoding='utf-8-sig') as out:
        writer = csv.DictWriter(out, fieldnames=columns)
        writer.writeheader()
        for row in test_results:
            writer.writerow(row)
    print(f"\n✓ 性能测试结果已保存到: {filename}")
    return filename
def save_api_docs():
    """Dump the collected endpoint documentation rows to a date-stamped CSV file.

    Returns:
        The name of the file written in the current working directory.
    """
    columns = ['endpoint', 'method', 'description', 'auth', 'params', 'response']
    date_tag = datetime.now().strftime('%Y%m%d')
    filename = f'员工接口文档_{date_tag}.csv'
    # The utf-8-sig codec prepends a UTF-8 BOM to the file.
    with open(filename, 'w', newline='', encoding='utf-8-sig') as out:
        writer = csv.DictWriter(out, fieldnames=columns)
        writer.writeheader()
        for row in api_docs:
            writer.writerow(row)
    print(f"✓ 接口文档已保存到: {filename}")
    return filename
def print_summary():
    """Print totals, success/failure rates and timing statistics.

    Reads the module-level ``test_results``. Rows whose ``elapsed_time_ms``
    is the string ``'0'`` (requests that never completed) are left out of
    the timing statistics. An empty result list prints zeros rather than
    raising ``ZeroDivisionError``.
    """
    print("\n" + "="*80)
    print("测试摘要")
    print("="*80)
    total = len(test_results)
    success = sum(1 for r in test_results if r['success'])
    failed = total - success
    times = [float(r['elapsed_time_ms']) for r in test_results if r['elapsed_time_ms'] != '0']
    avg_time = sum(times) / len(times) if times else 0
    max_time = max(times) if times else 0
    min_time = min(times) if times else 0
    # Guard the percentages: with no recorded tests there is nothing to
    # divide by.
    success_pct = success / total * 100 if total else 0.0
    failed_pct = failed / total * 100 if total else 0.0
    print(f"总测试数: {total}")
    print(f"成功: {success} ({success_pct:.1f}%)")
    print(f"失败: {failed} ({failed_pct:.1f}%)")
    print(f"平均耗时: {avg_time:.2f}ms")
    print(f"最大耗时: {max_time:.2f}ms")
    print(f"最小耗时: {min_time:.2f}ms")
    print("="*80)
def main():
    """Run the whole scenario: log in, exercise each endpoint, report, save.

    Stops right after the login step when no token was obtained.
    """
    separator = "="*80
    started_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(separator)
    print("员工接口性能测试工具")
    print(f"测试时间: {started_at}")
    print(f"API地址: {API_BASE_URL}")
    print(separator)

    # Every later call needs the bearer token, so give up without one.
    token = login()
    if not token:
        print("\n✗ 登录失败,无法继续测试")
        return

    # Run the endpoint tests; the ID created by the add step is handed to
    # the delete step.
    test_get_employees_list(token)
    test_get_employees_stats(token)
    test_delete_employee(token, test_add_employee(token))

    # Console summary first, then the two CSV outputs.
    print_summary()
    csv_file = save_to_csv()
    doc_file = save_api_docs()
    print(f"\n✓ 测试完成!")
    print(f" - 性能数据: {csv_file}")
    print(f" - 接口文档: {doc_file}")
if __name__ == "__main__":
    # Script entry point: run main() and turn Ctrl-C or any unexpected
    # error into a console message instead of an unhandled exception.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n✗ 测试被用户中断")
    except Exception as exc:
        import traceback
        print(f"\n✗ 测试异常: {exc}")
        traceback.print_exc()