Files
ai_wht_B/测试文章接口性能.py

556 lines
18 KiB
Python
Raw Permalink Normal View History

2026-01-06 14:18:39 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试文章接口性能并生成CSV报告和接口文档
"""
import requests
import time
import csv
from datetime import datetime
import json
# API configuration: base URL of the service under test and the login
# credentials that login() submits.
# NOTE(review): the credentials are hard-coded in source — consider reading
# them from the environment instead; confirm with the owner.
API_BASE_URL = "http://127.0.0.1:8216"
USERNAME = "13621242430"
PASSWORD = "admin123"
# Accumulated test records (one dict per test record); written out by
# save_to_csv() and aggregated by print_summary().
test_results = []
# Accumulated API documentation rows; written out by save_api_docs().
api_docs = []
def login():
    """Log in with the configured credentials and return the auth token.

    POSTs ``USERNAME``/``PASSWORD`` as JSON to ``/api/auth/login``, times the
    call, and appends exactly one record for it to ``test_results``.

    Returns:
        The value found at ``data.token`` in the JSON response body, or
        ``None`` when the request raises, the status code is not 200, or the
        body does not contain a usable token.
    """
    endpoint = '/api/auth/login'

    def record(status_code, elapsed_ms, success, error):
        # Single place that builds the result row, so the success and
        # failure paths cannot drift apart.
        test_results.append({
            'endpoint': endpoint,
            'method': 'POST',
            'status_code': status_code,
            'elapsed_time_ms': elapsed_ms,
            'success': success,
            'error': error
        })

    print("\n" + "="*80)
    print("正在登录...")
    url = f"{API_BASE_URL}{endpoint}"
    data = {"username": USERNAME, "password": PASSWORD}
    start_time = time.time()
    try:
        response = requests.post(url, json=data, timeout=30)
    except requests.RequestException as e:
        # Connection errors and timeouts are recorded the same way the other
        # test functions record them (status 0, elapsed '0') instead of
        # aborting the script with a traceback and no record.
        print(f"✗ 登录失败: {e}")
        record(0, '0', False, str(e))
        return None
    elapsed_time = (time.time() - start_time) * 1000

    token = None
    if response.status_code == 200:
        try:
            token = response.json()['data']['token']
        except (ValueError, KeyError, TypeError):
            # Non-JSON body or unexpected payload shape: treat it as a
            # failed login rather than crashing.
            token = None

    if token:
        print(f"✓ 登录成功,耗时: {elapsed_time:.2f}ms")
        # Record the login call.
        record(response.status_code, f"{elapsed_time:.2f}", True, '')
        return token

    print(f"✗ 登录失败: {response.status_code}")
    record(response.status_code, f"{elapsed_time:.2f}", False, response.text)
    return None
def test_get_articles_list(token):
    """Time GET /api/articles/list under several query-parameter combinations.

    Each scenario appends one record to ``test_results``; a request that
    raises is recorded with status 0 and elapsed '0'. One descriptive row is
    appended to ``api_docs`` at the end.

    Args:
        token: Bearer token sent in the Authorization header.
    """
    print("\n" + "-"*80)
    print("[1/7] 测试获取文章列表...")
    list_url = f"{API_BASE_URL}/api/articles/list"
    auth_headers = {"Authorization": f"Bearer {token}"}
    # (label, query parameters) pairs exercised against the list endpoint.
    scenarios = [
        ("基本查询", {"page": 1, "pageSize": 20}),
        ("关键词搜索", {"page": 1, "pageSize": 10, "keyword": "文章"}),
        ("第二页", {"page": 2, "pageSize": 20}),
    ]
    for index, (label, query) in enumerate(scenarios, 1):
        row_endpoint = f'/api/articles/list ({label})'
        try:
            began = time.time()
            resp = requests.get(list_url, headers=auth_headers, params=query, timeout=30)
            took_ms = (time.time() - began) * 1000
            ok = resp.status_code == 200
            failure_text = resp.text if not ok else ''
            print(f" [{index}] {label}: {resp.status_code} - {took_ms:.2f}ms")
            test_results.append({
                'endpoint': row_endpoint,
                'method': 'GET',
                'status_code': resp.status_code,
                'elapsed_time_ms': f"{took_ms:.2f}",
                'success': ok,
                'error': failure_text
            })
            # Short pause between scenarios.
            time.sleep(0.1)
        except Exception as exc:
            print(f" [{index}] {label}: 失败 - {str(exc)}")
            test_results.append({
                'endpoint': row_endpoint,
                'method': 'GET',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(exc)
            })
    # Add the API documentation row.
    api_docs.append({
        'endpoint': '/api/articles/list',
        'method': 'GET',
        'description': '获取文章列表(聚合图片和标签)',
        'auth': '需要认证',
        'params': 'page, pageSize, keyword, product_id, status',
        'response': '返回文章列表,包含图片和标签信息'
    })
def test_get_articles_dashboard(token):
    """Time GET /api/articles/list_dashboard and record the outcome.

    Appends one record to ``test_results`` (status 0 and elapsed '0' when the
    request raises) and one descriptive row to ``api_docs``.

    Args:
        token: Bearer token sent in the Authorization header.
    """
    endpoint = '/api/articles/list_dashboard'

    def record(status_code, elapsed_ms, ok, error):
        # Build and store one result row for this endpoint.
        test_results.append({
            'endpoint': endpoint,
            'method': 'GET',
            'status_code': status_code,
            'elapsed_time_ms': elapsed_ms,
            'success': ok,
            'error': error
        })

    print("\n" + "-"*80)
    print("[2/7] 测试获取文章仪表盘统计...")
    target = f"{API_BASE_URL}{endpoint}"
    auth_headers = {"Authorization": f"Bearer {token}"}
    try:
        began = time.time()
        resp = requests.get(target, headers=auth_headers, timeout=30)
        took_ms = (time.time() - began) * 1000
        ok = resp.status_code == 200
        failure_text = resp.text if not ok else ''
        print(f" 状态码: {resp.status_code} - 耗时: {took_ms:.2f}ms")
        record(resp.status_code, f"{took_ms:.2f}", ok, failure_text)
    except Exception as exc:
        print(f" 失败: {str(exc)}")
        record(0, '0', False, str(exc))
    api_docs.append({
        'endpoint': endpoint,
        'method': 'GET',
        'description': '获取文章仪表盘统计(总数、可发、已发)',
        'auth': '需要认证',
        'params': '',
        'response': 'articles_total, articles_available, articles_published'
    })
def test_generate_article(token):
    """Time POST /api/articles/generate with a fixed sample payload.

    Appends one record to ``test_results`` (status 0 and elapsed '0' when the
    request raises) and one descriptive row to ``api_docs``.

    Args:
        token: Bearer token sent in the Authorization header.
    """
    print("\n" + "-"*80)
    print("[3/7] 测试生成文案...")
    # Fields shared by the success and failure rows.
    row_base = {'endpoint': '/api/articles/generate', 'method': 'POST'}
    target = f"{API_BASE_URL}/api/articles/generate"
    auth_headers = {"Authorization": f"Bearer {token}"}
    # Sample data (adjust to match the actual database).
    payload = {
        "product_id": 1,
        "prompt_workflow_id": 1,
        "topics": ["测试主题1", "测试主题2"],
        "count": 2
    }
    try:
        began = time.time()
        resp = requests.post(target, headers=auth_headers, json=payload, timeout=30)
        took_ms = (time.time() - began) * 1000
        ok = resp.status_code == 200
        failure_text = resp.text if not ok else ''
        print(f" 状态码: {resp.status_code} - 耗时: {took_ms:.2f}ms")
        test_results.append({
            **row_base,
            'status_code': resp.status_code,
            'elapsed_time_ms': f"{took_ms:.2f}",
            'success': ok,
            'error': failure_text
        })
    except Exception as exc:
        print(f" 失败: {str(exc)}")
        test_results.append({
            **row_base,
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(exc)
        })
    api_docs.append({
        'endpoint': '/api/articles/generate',
        'method': 'POST',
        'description': '生成文案(批量生成文章)',
        'auth': '需要认证',
        'params': 'product_id, prompt_workflow_id, topics, count',
        'response': '返回生成的文章ID和标题列表'
    })
def test_get_article_detail(token):
    """Time GET /api/articles/<article_id> for the first article in the list.

    An article id is first looked up through the list endpoint. If no id can
    be obtained (empty list, non-200 status, network error, or unexpected
    payload) the test is skipped and a failed record carrying the reason is
    appended to ``test_results``. Otherwise the detail request is timed and
    recorded. One descriptive row is always appended to ``api_docs``.

    Args:
        token: Bearer token sent in the Authorization header.
    """
    print("\n" + "-"*80)
    print("[4/7] 测试获取文章详情...")
    # Fetch one article id first.
    list_url = f"{API_BASE_URL}/api/articles/list"
    headers = {"Authorization": f"Bearer {token}"}
    article_id = None
    skip_reason = '没有找到文章'
    try:
        list_response = requests.get(list_url, headers=headers, params={"page": 1, "pageSize": 1}, timeout=30)
        if list_response.status_code == 200:
            # The `or` fallbacks also cover explicit nulls in the payload,
            # which a plain .get(key, default) would let through.
            articles = (list_response.json().get('data') or {}).get('list') or []
            if articles:
                article_id = articles[0]['id']
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError) as e:
        # A failed lookup must not abort the whole run; record it as a
        # skipped test with the real cause instead.
        skip_reason = f"{type(e).__name__}: {e}"
    if article_id:
        url = f"{API_BASE_URL}/api/articles/{article_id}"
        try:
            start_time = time.time()
            response = requests.get(url, headers=headers, timeout=30)
            elapsed_time = (time.time() - start_time) * 1000
            success = response.status_code == 200
            error = '' if success else response.text
            print(f" 文章ID: {article_id}, 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
            test_results.append({
                'endpoint': f'/api/articles/{article_id}',
                'method': 'GET',
                'status_code': response.status_code,
                'elapsed_time_ms': f"{elapsed_time:.2f}",
                'success': success,
                'error': error
            })
        except Exception as e:
            print(f" 失败: {str(e)}")
            test_results.append({
                'endpoint': f'/api/articles/{article_id}',
                'method': 'GET',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(e)
            })
    else:
        print(f" 跳过:{skip_reason}")
        test_results.append({
            'endpoint': '/api/articles/<id>',
            'method': 'GET',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': skip_reason
        })
    api_docs.append({
        'endpoint': '/api/articles/<article_id>',
        'method': 'GET',
        'description': '获取文章详情(包含图片、标签、发布记录)',
        'auth': '需要认证',
        'params': 'article_id (路径参数)',
        'response': '完整的文章信息包括images、tags、publish_records'
    })
def test_update_article(token):
    """Time PUT /api/articles/<article_id> against the first article in the list.

    An article id is first looked up through the list endpoint. If no id can
    be obtained (empty list, non-200 status, network error, or unexpected
    payload) the test is skipped and a failed record carrying the reason is
    appended to ``test_results``. Otherwise the article is updated with a
    timestamped title and status "draft", and the call is timed and recorded.
    One descriptive row is always appended to ``api_docs``.

    Args:
        token: Bearer token sent in the Authorization header.
    """
    print("\n" + "-"*80)
    print("[5/7] 测试更新文章...")
    # Fetch one article id first.
    list_url = f"{API_BASE_URL}/api/articles/list"
    headers = {"Authorization": f"Bearer {token}"}
    article_id = None
    skip_reason = '没有找到文章'
    try:
        list_response = requests.get(list_url, headers=headers, params={"page": 1, "pageSize": 1}, timeout=30)
        if list_response.status_code == 200:
            # The `or` fallbacks also cover explicit nulls in the payload,
            # which a plain .get(key, default) would let through.
            articles = (list_response.json().get('data') or {}).get('list') or []
            if articles:
                article_id = articles[0]['id']
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError) as e:
        # A failed lookup must not abort the whole run; record it as a
        # skipped test with the real cause instead.
        skip_reason = f"{type(e).__name__}: {e}"
    if article_id:
        url = f"{API_BASE_URL}/api/articles/{article_id}"
        data = {
            "title": f"测试更新标题_{int(time.time())}",
            "status": "draft"
        }
        try:
            start_time = time.time()
            response = requests.put(url, headers=headers, json=data, timeout=30)
            elapsed_time = (time.time() - start_time) * 1000
            success = response.status_code == 200
            error = '' if success else response.text
            print(f" 文章ID: {article_id}, 状态码: {response.status_code} - 耗时: {elapsed_time:.2f}ms")
            test_results.append({
                'endpoint': f'/api/articles/{article_id} (PUT)',
                'method': 'PUT',
                'status_code': response.status_code,
                'elapsed_time_ms': f"{elapsed_time:.2f}",
                'success': success,
                'error': error
            })
        except Exception as e:
            print(f" 失败: {str(e)}")
            test_results.append({
                'endpoint': f'/api/articles/{article_id} (PUT)',
                'method': 'PUT',
                'status_code': 0,
                'elapsed_time_ms': '0',
                'success': False,
                'error': str(e)
            })
    else:
        print(f" 跳过:{skip_reason}")
        test_results.append({
            'endpoint': '/api/articles/<id> (PUT)',
            'method': 'PUT',
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': skip_reason
        })
    api_docs.append({
        'endpoint': '/api/articles/<article_id>',
        'method': 'PUT',
        'description': '更新文章信息',
        'auth': '需要认证',
        'params': 'article_id (路径), title, content, status等字段',
        'response': '更新成功消息'
    })
def test_batch_published_account_cycle(token):
    """Time POST /api/articles/batch-published-account-cycle with fixed ids.

    Sends article ids [1, 2] and author ids [1, 2]. Appends one record to
    ``test_results`` (status 0 and elapsed '0' when the request raises) and
    one descriptive row to ``api_docs``.

    Args:
        token: Bearer token sent in the Authorization header.
    """
    endpoint = '/api/articles/batch-published-account-cycle'

    def record(status_code, elapsed_ms, ok, error):
        # Build and store one result row for this endpoint.
        test_results.append({
            'endpoint': endpoint,
            'method': 'POST',
            'status_code': status_code,
            'elapsed_time_ms': elapsed_ms,
            'success': ok,
            'error': error
        })

    print("\n" + "-"*80)
    print("[6/7] 测试批量发布文章(账号循环)...")
    target = f"{API_BASE_URL}{endpoint}"
    auth_headers = {"Authorization": f"Bearer {token}"}
    payload = {
        "article_ids": [1, 2],
        "author_ids": [1, 2]
    }
    try:
        began = time.time()
        resp = requests.post(target, headers=auth_headers, json=payload, timeout=30)
        took_ms = (time.time() - began) * 1000
        ok = resp.status_code == 200
        failure_text = resp.text if not ok else ''
        print(f" 状态码: {resp.status_code} - 耗时: {took_ms:.2f}ms")
        record(resp.status_code, f"{took_ms:.2f}", ok, failure_text)
    except Exception as exc:
        print(f" 失败: {str(exc)}")
        record(0, '0', False, str(exc))
    api_docs.append({
        'endpoint': endpoint,
        'method': 'POST',
        'description': '批量发布文章 - 账号循环分配',
        'auth': '需要认证+角色(admin/editor/reviewer/publisher/enterprise)',
        'params': 'article_ids (数组), author_ids (数组)',
        'response': '发布结果统计和详细列表'
    })
def test_batch_published_review(token):
    """Time POST /api/articles/batch-published-review with fixed ids.

    Sends article ids [1, 2] and author id 1. Appends one record to
    ``test_results`` (status 0 and elapsed '0' when the request raises) and
    one descriptive row to ``api_docs``.

    Args:
        token: Bearer token sent in the Authorization header.
    """
    print("\n" + "-"*80)
    print("[7/7] 测试批量发布文章...")
    # Fields shared by the success and failure rows.
    row_base = {'endpoint': '/api/articles/batch-published-review', 'method': 'POST'}
    target = f"{API_BASE_URL}/api/articles/batch-published-review"
    auth_headers = {"Authorization": f"Bearer {token}"}
    payload = {
        "article_ids": [1, 2],
        "author_id": 1
    }
    try:
        began = time.time()
        resp = requests.post(target, headers=auth_headers, json=payload, timeout=30)
        took_ms = (time.time() - began) * 1000
        ok = resp.status_code == 200
        failure_text = resp.text if not ok else ''
        print(f" 状态码: {resp.status_code} - 耗时: {took_ms:.2f}ms")
        test_results.append({
            **row_base,
            'status_code': resp.status_code,
            'elapsed_time_ms': f"{took_ms:.2f}",
            'success': ok,
            'error': failure_text
        })
    except Exception as exc:
        print(f" 失败: {str(exc)}")
        test_results.append({
            **row_base,
            'status_code': 0,
            'elapsed_time_ms': '0',
            'success': False,
            'error': str(exc)
        })
    api_docs.append({
        'endpoint': '/api/articles/batch-published-review',
        'method': 'POST',
        'description': '批量发布文章到指定作者',
        'auth': '需要认证+角色(admin/editor/reviewer/publisher/enterprise)',
        'params': 'article_ids (数组), author_id',
        'response': '发布结果统计和详细列表'
    })
def save_to_csv():
    """Write every record in ``test_results`` to a dated CSV file.

    The file is created in the current working directory, named with
    today's date (YYYYMMDD), and encoded as UTF-8 with a BOM.

    Returns:
        The name of the file that was written.
    """
    columns = ['endpoint', 'method', 'status_code', 'elapsed_time_ms', 'success', 'error']
    day_stamp = datetime.now().strftime('%Y%m%d')
    filename = f'文章接口性能测试_{day_stamp}.csv'
    with open(filename, 'w', newline='', encoding='utf-8-sig') as out:
        sheet = csv.DictWriter(out, fieldnames=columns)
        sheet.writeheader()
        for row in test_results:
            sheet.writerow(row)
    print(f"\n✓ 性能测试结果已保存到: {filename}")
    return filename
def save_api_docs():
    """Write every row in ``api_docs`` to a dated CSV file.

    The file is created in the current working directory, named with
    today's date (YYYYMMDD), and encoded as UTF-8 with a BOM.

    Returns:
        The name of the file that was written.
    """
    columns = ['endpoint', 'method', 'description', 'auth', 'params', 'response']
    day_stamp = datetime.now().strftime('%Y%m%d')
    filename = f'文章接口文档_{day_stamp}.csv'
    with open(filename, 'w', newline='', encoding='utf-8-sig') as out:
        sheet = csv.DictWriter(out, fieldnames=columns)
        sheet.writeheader()
        for row in api_docs:
            sheet.writerow(row)
    print(f"✓ 接口文档已保存到: {filename}")
    return filename
def print_summary():
    """Print aggregate statistics for everything recorded in ``test_results``.

    Reports the total, success and failure counts with percentages, and the
    average, maximum and minimum of ``elapsed_time_ms``. With no recorded
    results every figure is printed as zero instead of raising
    ZeroDivisionError.
    """
    print("\n" + "="*80)
    print("测试摘要")
    print("="*80)
    total = len(test_results)
    success = sum(1 for r in test_results if r['success'])
    failed = total - success
    # Parse the formatted timings once and reuse them for all three stats.
    durations = [float(r['elapsed_time_ms']) for r in test_results]
    avg_time = sum(durations) / total if total > 0 else 0
    max_time = max(durations, default=0)
    min_time = min(durations, default=0)
    # Guard the percentages the same way as the timing stats: an empty run
    # must not divide by zero.
    success_pct = success / total * 100 if total > 0 else 0.0
    failed_pct = failed / total * 100 if total > 0 else 0.0
    print(f"总测试数: {total}")
    print(f"成功: {success} ({success_pct:.1f}%)")
    print(f"失败: {failed} ({failed_pct:.1f}%)")
    print(f"平均耗时: {avg_time:.2f}ms")
    print(f"最大耗时: {max_time:.2f}ms")
    print(f"最小耗时: {min_time:.2f}ms")
    print("="*80)
def main():
    """Run the whole suite: log in, run each test, summarize, and save.

    Prints a banner, obtains a token through login(), and stops early when
    no token is returned. Otherwise it runs the seven endpoint tests in
    order, prints the summary, and writes both CSV files.
    """
    banner = "="*80
    print(banner)
    print("文章接口性能测试工具")
    print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"API地址: {API_BASE_URL}")
    print(banner)
    # Log in; nothing else can run without a token.
    token = login()
    if not token:
        print("\n✗ 登录失败,无法继续测试")
        return
    # Run every test in its numbered order.
    suite = (
        test_get_articles_list,
        test_get_articles_dashboard,
        test_generate_article,
        test_get_article_detail,
        test_update_article,
        test_batch_published_account_cycle,
        test_batch_published_review,
    )
    for run_test in suite:
        run_test(token)
    # Print the summary.
    print_summary()
    # Save the results.
    csv_file = save_to_csv()
    doc_file = save_api_docs()
    print(f"\n✓ 测试完成!")
    print(f" - 性能数据: {csv_file}")
    print(f" - 接口文档: {doc_file}")
if __name__ == "__main__":
    # Script entry point: Ctrl-C gets a short notice, and any other failure
    # gets a message followed by the full traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n✗ 测试被用户中断")
    except Exception as exc:
        print(f"\n✗ 测试异常: {exc}")
        import traceback
        traceback.print_exc()