This commit is contained in:
2026-02-04 14:38:52 +08:00
commit a5147b1429
29 changed files with 4489 additions and 0 deletions

3
modules/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
# @line_count 1
"""测试专家系统模块包"""

369
modules/api_client.py Normal file
View File

@@ -0,0 +1,369 @@
# @line_count 200
"""大模型API客户端模块"""
import os
import yaml
import json
import time
from pathlib import Path
from typing import Dict, Optional, List, Any
import httpx
class APIClient:
"""大模型API客户端支持多个提供商"""
def __init__(self, config_path: Optional[str] = None):
"""
初始化API客户端
Args:
config_path: API配置文件路径默认为config/api_config.yaml
"""
if config_path is None:
current_dir = Path(__file__).parent.parent
config_path = current_dir / "config" / "api_config.yaml"
self.config_path = Path(config_path)
self.config: Dict[str, Any] = {}
self.current_provider: str = ""
self.load_config()
def load_config(self):
"""加载API配置"""
if not self.config_path.exists():
raise FileNotFoundError(f"API配置文件不存在: {self.config_path}")
with open(self.config_path, 'r', encoding='utf-8') as f:
self.config = yaml.safe_load(f)
self.current_provider = self.config.get('default_provider', 'deepseek')
def set_provider(self, provider: str):
"""
设置当前使用的API提供商
Args:
provider: 提供商名称 (deepseek, qianwen, openai, openrouter)
"""
if provider not in self.config.get('providers', {}):
raise ValueError(f"不支持的API提供商: {provider}")
self.current_provider = provider
def get_provider_config(self) -> Dict[str, Any]:
"""获取当前提供商的配置"""
providers = self.config.get('providers', {})
if self.current_provider not in providers:
raise ValueError(f"提供商配置不存在: {self.current_provider}")
return providers[self.current_provider]
def _call_deepseek_api(self, prompt: str, **kwargs) -> str:
"""调用DeepSeek API"""
config = self.get_provider_config()
api_key = config.get('api_key') or os.getenv('DEEPSEEK_API_KEY', '')
if not api_key:
raise ValueError("DeepSeek API密钥未配置")
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}'
}
data = {
'model': config.get('model', 'deepseek-chat'),
'messages': [
{'role': 'user', 'content': prompt}
],
'temperature': config.get('temperature', 0.7),
'max_tokens': config.get('max_tokens', 4000)
}
# 使用精细化超时配置连接10秒读取180秒
timeout_config = httpx.Timeout(connect=10.0, read=180.0, write=10.0, pool=10.0)
with httpx.Client(timeout=timeout_config) as client:
response = client.post(
config.get('base_url'),
headers=headers,
json=data
)
response.raise_for_status()
result = response.json()
return result['choices'][0]['message']['content']
# def _call_qianwen_api(self, prompt: str, **kwargs) -> str:
# """调用通义千问API"""
# config = self.get_provider_config()
# api_key = config.get('api_key') or os.getenv('QIANWEN_API_KEY', '')
# if not api_key:
# raise ValueError("通义千问API密钥未配置")
# headers = {
# 'Content-Type': 'application/json',
# 'Authorization': f'Bearer {api_key}'
# }
# # 通义千问API格式DashScope
# data = {
# 'model': config.get('model', 'qwen-turbo'),
# 'input': {
# 'messages': [
# {'role': 'user', 'content': prompt}
# ]
# },
# 'parameters': {
# 'temperature': config.get('temperature', 0.7),
# 'max_tokens': config.get('max_tokens', 4000)
# }
# }
# with httpx.Client(timeout=60.0) as client:
# response = client.post(
# config.get('base_url'),
# headers=headers,
# json=data
# )
# response.raise_for_status()
# result = response.json()
# # 通义千问的响应格式适配
# if 'output' in result:
# output = result['output']
# if 'choices' in output and len(output['choices']) > 0:
# return output['choices'][0]['message']['content']
# elif 'text' in output:
# return output['text']
# elif 'choices' in result and len(result['choices']) > 0:
# return result['choices'][0]['message']['content']
# # 如果都不匹配,返回整个结果(用于调试)
# raise ValueError(f"无法解析通义千问API响应: {result}")
def _call_qianwen_api(self, prompt: str, **kwargs) -> str:
"""调用通义千问API兼容模式"""
config = self.get_provider_config()
api_key = config.get('api_key') or os.getenv('QIANWEN_API_KEY', '')
if not api_key:
raise ValueError("通义千问API密钥未配置")
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}'
}
# 系统提示提高生成质量模拟Web端行为
# 针对航空航天软件测试的专业化system message
system_message = """你是一位拥有20年经验的航空航天软件测试专家。你的任务是生成高质量、可执行的测试用例。
【强制要求 - 必须遵守】
你必须为以下8种黑盒测试方法各生成至少1个测试用例
1. 等价类划分 - 有效/无效输入类测试
2. 边界值分析 - min-1, min, min+1, max-1, max, max+1
3. 错误推测法 - 字节序错误、超长帧、非法字符
4. 因果图法 - 多条件联动触发测试
5. 决策表测试 - 条件组合规则覆盖
6. 状态转换法 - 状态机路径覆盖
7. 场景法 - 端到端业务流程测试
8. 随机测试 - 随机合法输入测试
【输出要求】
- 每个test_case的name字段必须用方括号标注测试方法正常指令解析-[等价类划分]
- 必须生成至少8个测试用例确保8种方法全覆盖
- 测试步骤要具体包含CAN帧ID、指令编码、响应时间等
- 只输出JSON不要任何解释性文字
- 严格遵循用户指定的JSON格式"""
# 使用OpenAI兼容格式包含system message
data = {
'model': config.get('model', 'qwen-max'),
'messages': [
{'role': 'system', 'content': system_message},
{'role': 'user', 'content': prompt}
],
'temperature': config.get('temperature', 0.3),
'max_tokens': config.get('max_tokens', 8192)
}
print("\n发送Prompt: " + prompt)
# 使用精细化超时配置连接10秒读取180秒3分钟
# 测试用例生成需要较长时间,特别是复杂的结构化输出
timeout_config = httpx.Timeout(
connect=10.0, # 连接超时10秒
read=180.0, # 读取超时180秒适合复杂生成任务
write=10.0, # 写入超时10秒
pool=10.0 # 连接池超时10秒
)
with httpx.Client(timeout=timeout_config) as client:
response = client.post(
config.get('base_url'),
headers=headers,
json=data
)
response.raise_for_status()
result = response.json()
# OpenAI兼容格式的响应
if 'choices' in result and len(result['choices']) > 0:
content = result['choices'][0]['message']['content']
# 调试输出显示API返回的完整内容
print("\n" + "="*60)
print("[API响应] 完整返回内容:")
print("="*60)
print(f"响应长度: {len(content)} 字符")
print("-"*60)
# 显示前2000字符
if len(content) > 2000:
print(content[:2000])
print(f"\n... [截断,还有 {len(content)-2000} 字符]")
else:
print(content)
print("="*60 + "\n")
return content
raise ValueError(f"无法解析通义千问API响应: {result}")
def _call_openai_api(self, prompt: str, **kwargs) -> str:
"""调用OpenAI API"""
config = self.get_provider_config()
api_key = config.get('api_key') or os.getenv('OPENAI_API_KEY', '')
if not api_key:
raise ValueError("OpenAI API密钥未配置")
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}'
}
data = {
'model': config.get('model', 'gpt-3.5-turbo'),
'messages': [
{'role': 'user', 'content': prompt}
],
'temperature': config.get('temperature', 0.7),
'max_tokens': config.get('max_tokens', 4000)
}
# 使用精细化超时配置连接10秒读取180秒
timeout_config = httpx.Timeout(connect=10.0, read=180.0, write=10.0, pool=10.0)
with httpx.Client(timeout=timeout_config) as client:
response = client.post(
config.get('base_url'),
headers=headers,
json=data
)
response.raise_for_status()
result = response.json()
return result['choices'][0]['message']['content']
def _call_openrouter_api(self, prompt: str, **kwargs) -> str:
"""调用OpenRouter API"""
config = self.get_provider_config()
api_key = config.get('api_key') or os.getenv('OPENROUTER_API_KEY', '')
if not api_key:
raise ValueError("OpenRouter API密钥未配置")
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}'
}
# 添加可选的 HTTP-Referer 和 X-Title headers
http_referer = config.get('http_referer', '')
if http_referer:
headers['HTTP-Referer'] = http_referer
x_title = config.get('x_title', '')
if x_title:
headers['X-Title'] = x_title
data = {
'model': config.get('model', 'allenai/molmo-2-8b:free'),
'messages': [
{'role': 'user', 'content': prompt}
],
'temperature': config.get('temperature', 0.7),
'max_tokens': config.get('max_tokens', 4000)
}
# 使用精细化超时配置连接10秒读取180秒
timeout_config = httpx.Timeout(connect=10.0, read=180.0, write=10.0, pool=10.0)
with httpx.Client(timeout=timeout_config) as client:
response = client.post(
config.get('base_url'),
headers=headers,
json=data
)
response.raise_for_status()
result = response.json()
# OpenRouter 使用 OpenAI 兼容格式
if 'choices' in result and len(result['choices']) > 0:
return result['choices'][0]['message']['content']
raise ValueError(f"无法解析OpenRouter API响应: {result}")
def call_api(self, prompt: str, max_retries: int = 3, retry_delay: int = 2) -> str:
"""
调用大模型API
Args:
prompt: 提示词
max_retries: 最大重试次数
retry_delay: 重试延迟(秒)
Returns:
API返回的文本内容
"""
provider_methods = {
'deepseek': self._call_deepseek_api,
'qianwen': self._call_qianwen_api,
'openai': self._call_openai_api,
'openrouter': self._call_openrouter_api
}
method = provider_methods.get(self.current_provider)
if not method:
raise ValueError(f"不支持的API提供商: {self.current_provider}")
last_error = None
for attempt in range(max_retries):
try:
return method(prompt)
except Exception as e:
last_error = e
if attempt < max_retries - 1:
time.sleep(retry_delay * (attempt + 1))
else:
raise Exception(f"API调用失败重试{max_retries}次): {str(e)}") from last_error
raise Exception(f"API调用失败: {str(last_error)}")
def update_api_key(self, provider: str, api_key: str):
"""
更新API密钥
Args:
provider: 提供商名称
api_key: API密钥
"""
if provider not in self.config.get('providers', {}):
raise ValueError(f"不支持的API提供商: {provider}")
self.config['providers'][provider]['api_key'] = api_key
# 保存到配置文件
with open(self.config_path, 'w', encoding='utf-8') as f:
yaml.dump(self.config, f, allow_unicode=True, default_flow_style=False)

76
modules/json_parser.py Normal file
View File

@@ -0,0 +1,76 @@
# @line_count 100
"""JSON解析模块提取功能点和章节信息"""
import json
from pathlib import Path
from typing import List, Dict, Any, Optional
from .parser_adapters.parser_factory import ParserFactory
class JSONParser:
"""JSON文档解析器门面模式"""
def __init__(self, json_path: str):
"""
初始化JSON解析器
Args:
json_path: JSON文件路径
"""
self.json_path = Path(json_path)
self.data: Dict[str, Any] = {}
self.adapter = None
self.load_json()
def load_json(self):
"""加载JSON文件并创建适配器"""
if not self.json_path.exists():
raise FileNotFoundError(f"JSON文件不存在: {self.json_path}")
with open(self.json_path, 'r', encoding='utf-8') as f:
self.data = json.load(f)
# 自动检测格式并创建适配器
self.adapter = ParserFactory.create_adapter(self.data)
def extract_function_points(self) -> List[Dict[str, Any]]:
"""
提取功能点列表
Returns:
功能点列表,每个功能点包含:
- module_name: 所属模块
- function_name: 功能名称
- description: 功能描述
- operation_steps: 操作步骤(可选)
- requirement_id: 需求编号(可选)
- requirement_type: 需求类型(可选)
"""
return self.adapter.extract_function_points()
def get_document_info(self) -> Dict[str, Any]:
"""
获取文档基本信息
Returns:
文档信息字典
"""
return self.adapter.get_document_info()
def get_sections(self) -> List[Dict[str, Any]]:
"""
获取所有章节信息
Returns:
章节列表
"""
return self.adapter.get_sections()
def get_module_summary(self) -> List[Dict[str, Any]]:
"""
获取模块摘要
Returns:
模块摘要列表
"""
return self.adapter.get_module_summary()

332
modules/output_formatter.py Normal file
View File

@@ -0,0 +1,332 @@
# @line_count 300
"""多格式输出模块"""
import json
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime
try:
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
OPENPYXL_AVAILABLE = True
except ImportError:
OPENPYXL_AVAILABLE = False
class OutputFormatter:
"""多格式输出格式化器"""
def __init__(self, test_items: List[Dict[str, Any]], test_cases: List[Dict[str, Any]],
document_info: Optional[Dict[str, Any]] = None):
"""
初始化输出格式化器
Args:
test_items: 测试项列表
test_cases: 测试用例列表
document_info: 文档信息
"""
self.test_items = test_items
self.test_cases = test_cases
self.document_info = document_info or {}
def to_json(self, output_path: Optional[str] = None) -> str:
"""
输出为JSON格式
Args:
output_path: 输出文件路径如果为None则返回JSON字符串
Returns:
JSON字符串或文件路径
"""
data = {
'document_info': self.document_info,
'generation_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'summary': {
'test_item_count': len(self.test_items),
'test_case_count': len(self.test_cases)
},
'test_items': self.test_items,
'test_cases': self.test_cases
}
json_str = json.dumps(data, ensure_ascii=False, indent=2)
if output_path:
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(json_str)
return str(output_path)
return json_str
def to_markdown(self, output_path: Optional[str] = None) -> str:
"""
输出为Markdown格式
Args:
output_path: 输出文件路径如果为None则返回Markdown字符串
Returns:
Markdown字符串或文件路径
"""
lines = []
# 标题和文档信息
lines.append("# 测试项和测试用例文档\n")
if self.document_info.get('title'):
lines.append(f"**文档标题**: {self.document_info['title']}\n")
if self.document_info.get('version'):
lines.append(f"**版本**: {self.document_info['version']}\n")
lines.append(f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
lines.append(f"**测试项数量**: {len(self.test_items)}\n")
lines.append(f"**测试用例数量**: {len(self.test_cases)}\n")
lines.append("\n---\n")
# 按模块分组
modules = {}
for item in self.test_items:
module_name = item.get('module_name', '未分类')
if module_name not in modules:
modules[module_name] = []
modules[module_name].append(item)
# 输出每个模块的测试项和测试用例
for module_name, items in modules.items():
lines.append(f"\n## {module_name}\n")
for item in items:
# 测试项
lines.append(f"\n### {item.get('id', '')} {item.get('name', '')}\n")
lines.append(f"- **测试类型**: {item.get('test_type', 'N/A')}")
lines.append(f"- **优先级**: {item.get('priority', 'N/A')}")
lines.append(f"- **测试目标**: {item.get('test_objective', 'N/A')}\n")
# 该测试项下的测试用例
item_cases = [case for case in self.test_cases
if case.get('test_item_id') == item.get('id')]
if item_cases:
lines.append("#### 测试用例\n")
for case in item_cases:
lines.append(f"\n**{case.get('id', '')} {case.get('name', '')}**\n")
lines.append(f"- **前置条件**: {case.get('preconditions', 'N/A')}")
lines.append(f"- **优先级**: {case.get('priority', 'N/A')}")
lines.append(f"- **测试类型**: {case.get('test_type', 'N/A')}\n")
lines.append("**测试步骤**:\n")
for idx, step in enumerate(case.get('test_steps', []), 1):
lines.append(f"{idx}. {step}\n")
lines.append(f"\n**预期结果**: {case.get('expected_result', 'N/A')}\n")
lines.append("---\n")
markdown_str = '\n'.join(lines)
if output_path:
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(markdown_str)
return str(output_path)
return markdown_str
def to_excel(self, output_path: str) -> str:
"""
输出为Excel格式
Args:
output_path: 输出文件路径
Returns:
文件路径
"""
if not OPENPYXL_AVAILABLE:
raise ImportError("openpyxl未安装无法生成Excel文件。请运行: pip install openpyxl")
wb = Workbook()
# 删除默认工作表
if 'Sheet' in wb.sheetnames:
wb.remove(wb['Sheet'])
# 创建测试项工作表
self._create_test_items_sheet(wb)
# 创建测试用例工作表
self._create_test_cases_sheet(wb)
# 创建摘要工作表
self._create_summary_sheet(wb)
# 保存文件
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
wb.save(output_path)
return str(output_path)
def _create_test_items_sheet(self, wb: Workbook):
"""创建测试项工作表"""
ws = wb.create_sheet("测试项", 0)
# 表头
headers = ['测试项ID', '测试项名称', '所属模块', '功能名称', '测试类型', '优先级', '测试目标']
ws.append(headers)
# 设置表头样式
header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")
header_font = Font(bold=True, color="FFFFFF")
border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
for col in range(1, len(headers) + 1):
cell = ws.cell(1, col)
cell.fill = header_fill
cell.font = header_font
cell.border = border
cell.alignment = Alignment(horizontal='center', vertical='center')
# 数据行
for item in self.test_items:
row = [
item.get('id', ''),
item.get('name', ''),
item.get('module_name', ''),
item.get('function_name', ''),
item.get('test_type', ''),
item.get('priority', ''),
item.get('test_objective', '')
]
ws.append(row)
# 设置边框
for col in range(1, len(headers) + 1):
ws.cell(ws.max_row, col).border = border
# 调整列宽
column_widths = [12, 30, 15, 15, 12, 8, 40]
for col, width in enumerate(column_widths, 1):
ws.column_dimensions[ws.cell(1, col).column_letter].width = width
# 冻结首行
ws.freeze_panes = 'A2'
def _create_test_cases_sheet(self, wb: Workbook):
"""创建测试用例工作表"""
ws = wb.create_sheet("测试用例", 1)
# 表头
headers = ['测试用例ID', '测试项ID', '测试用例名称', '所属模块', '前置条件',
'测试步骤', '预期结果', '优先级', '测试类型']
ws.append(headers)
# 设置表头样式
header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")
header_font = Font(bold=True, color="FFFFFF")
border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
for col in range(1, len(headers) + 1):
cell = ws.cell(1, col)
cell.fill = header_fill
cell.font = header_font
cell.border = border
cell.alignment = Alignment(horizontal='center', vertical='center')
# 数据行
for case in self.test_cases:
test_steps_str = '\n'.join([f"{idx}. {step}" for idx, step
in enumerate(case.get('test_steps', []), 1)])
row = [
case.get('id', ''),
case.get('test_item_id', ''),
case.get('name', ''),
case.get('module_name', ''),
case.get('preconditions', ''),
test_steps_str,
case.get('expected_result', ''),
case.get('priority', ''),
case.get('test_type', '')
]
ws.append(row)
# 设置边框和换行
for col in range(1, len(headers) + 1):
cell = ws.cell(ws.max_row, col)
cell.border = border
if col == 6: # 测试步骤列
cell.alignment = Alignment(wrap_text=True, vertical='top')
else:
cell.alignment = Alignment(vertical='top')
# 调整列宽
column_widths = [12, 12, 30, 15, 25, 40, 30, 8, 12]
for col, width in enumerate(column_widths, 1):
ws.column_dimensions[ws.cell(1, col).column_letter].width = width
# 设置行高(为测试步骤列预留空间)
for row in range(2, ws.max_row + 1):
ws.row_dimensions[row].height = 60
# 冻结首行
ws.freeze_panes = 'A2'
def _create_summary_sheet(self, wb: Workbook):
"""创建摘要工作表"""
ws = wb.create_sheet("摘要", 2)
# 文档信息
if self.document_info.get('title'):
ws.append(['文档标题', self.document_info['title']])
if self.document_info.get('version'):
ws.append(['版本', self.document_info['version']])
ws.append(['生成时间', datetime.now().strftime('%Y-%m-%d %H:%M:%S')])
ws.append([])
# 统计信息
ws.append(['统计项', '数量'])
ws.append(['测试项总数', len(self.test_items)])
ws.append(['测试用例总数', len(self.test_cases)])
ws.append([])
# 按模块统计
ws.append(['模块', '测试项数量', '测试用例数量'])
modules = {}
for item in self.test_items:
module_name = item.get('module_name', '未分类')
if module_name not in modules:
modules[module_name] = {'items': 0, 'cases': 0}
modules[module_name]['items'] += 1
for case in self.test_cases:
module_name = case.get('module_name', '未分类')
if module_name not in modules:
modules[module_name] = {'items': 0, 'cases': 0}
modules[module_name]['cases'] += 1
for module_name, stats in modules.items():
ws.append([module_name, stats['items'], stats['cases']])
# 设置样式
header_font = Font(bold=True)
for row in ws.iter_rows(min_row=1, max_row=1):
for cell in row:
cell.font = header_font
# 调整列宽
ws.column_dimensions['A'].width = 20
ws.column_dimensions['B'].width = 15
ws.column_dimensions['C'].width = 15

View File

@@ -0,0 +1,13 @@
# @line_count 50
"""JSON解析器适配器包"""
from .base_adapter import BaseParserAdapter
from .parser_factory import ParserFactory
from .section_array_adapter import SectionArrayAdapter
from .requirement_tree_adapter import RequirementTreeAdapter
__all__ = [
'BaseParserAdapter',
'ParserFactory',
'SectionArrayAdapter',
'RequirementTreeAdapter',
]

View File

@@ -0,0 +1,81 @@
# @line_count 100
"""JSON解析器适配器基类"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any
class BaseParserAdapter(ABC):
"""解析器适配器抽象基类"""
def __init__(self, data: Dict[str, Any]):
"""
初始化适配器
Args:
data: 解析后的JSON数据
"""
self.data = data
@abstractmethod
def extract_function_points(self) -> List[Dict[str, Any]]:
"""
提取功能点列表
Returns:
功能点列表,统一格式:
- module_name: 所属模块
- function_name: 功能名称
- description: 功能描述
- operation_steps: 操作步骤(可选)
- requirement_id: 需求编号(可选)
- requirement_type: 需求类型(可选)
"""
pass
@abstractmethod
def get_document_info(self) -> Dict[str, Any]:
"""
获取文档基本信息
Returns:
文档信息字典,统一格式:
- title: 文档标题
- version: 版本(可选)
- date: 日期(可选)
- section_count: 章节数量
"""
pass
@abstractmethod
def get_sections(self) -> List[Dict[str, Any]]:
"""
获取所有章节信息
Returns:
章节列表
"""
pass
@abstractmethod
def get_module_summary(self) -> List[Dict[str, Any]]:
"""
获取模块摘要
Returns:
模块摘要列表
"""
pass
@staticmethod
@abstractmethod
def can_parse(data: Dict[str, Any]) -> bool:
"""
检测是否能解析该格式
Args:
data: JSON数据
Returns:
是否能解析
"""
pass

View File

@@ -0,0 +1,53 @@
# @line_count 100
"""解析器工厂"""
from typing import Dict, Any, Type, List
from .base_adapter import BaseParserAdapter
from .section_array_adapter import SectionArrayAdapter
from .requirement_tree_adapter import RequirementTreeAdapter
class ParserFactory:
"""解析器工厂,自动检测格式并创建合适的适配器"""
# 注册所有适配器(按优先级排序)
_adapters: List[Type[BaseParserAdapter]] = [
RequirementTreeAdapter, # 新格式优先
SectionArrayAdapter, # 旧格式
# 未来可以在这里添加更多适配器
]
@classmethod
def create_adapter(cls, data: Dict[str, Any]) -> BaseParserAdapter:
"""
创建合适的适配器
Args:
data: JSON数据
Returns:
适配器实例
Raises:
ValueError: 如果无法识别格式
"""
for adapter_class in cls._adapters:
if adapter_class.can_parse(data):
return adapter_class(data)
raise ValueError(
"无法识别JSON格式。支持的格式\n"
"- 需求树格式(包含'需求内容''文档元数据'\n"
"- 章节数组格式(包含'sections'数组)"
)
@classmethod
def register_adapter(cls, adapter_class: Type[BaseParserAdapter],
priority: int = 0):
"""
注册新的适配器
Args:
adapter_class: 适配器类
priority: 优先级越小越优先0为最高优先级
"""
cls._adapters.insert(priority, adapter_class)

View File

@@ -0,0 +1,139 @@
# @line_count 200
"""新格式适配器(需求树格式)"""
from typing import List, Dict, Any
from .base_adapter import BaseParserAdapter
class RequirementTreeAdapter(BaseParserAdapter):
"""处理新格式:需求树结构"""
def extract_function_points(self) -> List[Dict[str, Any]]:
"""从需求树中提取功能点"""
function_points = []
requirement_content = self.data.get('需求内容', {})
# 递归遍历章节
self._traverse_requirements(requirement_content, [], function_points)
return function_points
def _traverse_requirements(self, sections: Dict, path: List[str],
function_points: List[Dict]):
"""递归遍历章节,提取需求"""
for section_key, section_data in sections.items():
section_info = section_data.get('章节信息', {})
section_title = section_info.get('章节标题', '')
current_path = path + [section_title]
# 如果有需求列表,提取需求
if '需求列表' in section_data:
requirements = section_data['需求列表']
for req in requirements:
function_points.append({
'module_name': ' > '.join(current_path[:-1]) if len(current_path) > 1 else section_title,
'function_name': req.get('需求编号', ''),
'description': req.get('需求描述', ''),
'requirement_id': req.get('需求编号', ''),
'requirement_type': self._parse_requirement_type(
req.get('需求编号', '')
),
'interface_info': {
k: v for k, v in req.items()
if k in ['接口名称', '接口类型', '来源', '目的地']
} if any(k in req for k in ['接口名称', '接口类型']) else None
})
# 递归处理子章节
if '子章节' in section_data:
self._traverse_requirements(
section_data['子章节'],
current_path,
function_points
)
def _parse_requirement_type(self, req_id: str) -> str:
"""从需求编号解析需求类型"""
if req_id.startswith('FR-'):
return '功能需求'
elif req_id.startswith('IR-'):
return '接口需求'
elif req_id.startswith('OR-'):
return '其他需求'
return '未知'
def get_document_info(self) -> Dict[str, Any]:
"""获取文档信息"""
metadata = self.data.get('文档元数据', {})
return {
'title': metadata.get('标题', ''),
'version': '', # 新格式可能没有版本
'date': metadata.get('生成时间', ''),
'section_count': self._count_sections(self.data.get('需求内容', {}))
}
def _count_sections(self, sections: Dict) -> int:
"""递归统计章节数量"""
count = 0
for section_data in sections.values():
count += 1
if '子章节' in section_data:
count += self._count_sections(section_data['子章节'])
return count
def get_sections(self) -> List[Dict[str, Any]]:
"""获取章节列表(转换为扁平结构)"""
sections = []
self._flatten_sections(self.data.get('需求内容', {}), sections)
return sections
def _flatten_sections(self, sections: Dict, result: List[Dict]):
"""递归扁平化章节结构"""
for section_data in sections.values():
section_info = section_data.get('章节信息', {})
result.append({
'title': section_info.get('章节标题', ''),
'number': section_info.get('章节编号', ''),
'level': section_info.get('章节级别', 0),
'requirement_count': len(section_data.get('需求列表', []))
})
if '子章节' in section_data:
self._flatten_sections(section_data['子章节'], result)
def get_module_summary(self) -> List[Dict[str, Any]]:
"""获取模块摘要"""
modules = []
sections_dict = {}
# 遍历需求内容,构建模块统计
self._build_module_summary(self.data.get('需求内容', {}), sections_dict)
for module_name, info in sections_dict.items():
modules.append({
'name': module_name,
'function_count': info['requirement_count'],
'description': info.get('description', '')
})
return modules
def _build_module_summary(self, sections: Dict, result: Dict):
"""递归构建模块摘要"""
for section_data in sections.values():
section_info = section_data.get('章节信息', {})
section_title = section_info.get('章节标题', '')
if '需求列表' in section_data:
if section_title not in result:
result[section_title] = {
'requirement_count': 0,
'description': ''
}
result[section_title]['requirement_count'] += len(section_data['需求列表'])
if '子章节' in section_data:
self._build_module_summary(section_data['子章节'], result)
@staticmethod
def can_parse(data: Dict[str, Any]) -> bool:
"""检测是否为新格式"""
return '需求内容' in data and '文档元数据' in data

View File

@@ -0,0 +1,164 @@
# @line_count 150
"""旧格式适配器sections数组格式"""
from typing import List, Dict, Any
from .base_adapter import BaseParserAdapter
class SectionArrayAdapter(BaseParserAdapter):
"""处理旧格式sections数组"""
def extract_function_points(self) -> List[Dict[str, Any]]:
"""从sections数组中提取功能点"""
function_points = []
sections = self.data.get('sections', [])
for section in sections:
module_name = section.get('title', '')
content = section.get('content', [])
# 提取模块总体描述(第一个较长的文本内容)
module_description = ""
for item in content:
if item.get('type') == 'text':
text = item.get('content', '').strip()
if len(text) > 50: # 较长的文本通常是模块描述
module_description = text
break
# 识别功能点
# 功能点通常是较短的文本(标题),后面跟着描述
current_function = None
function_description_parts = []
for i, item in enumerate(content):
if item.get('type') != 'text':
continue
text = item.get('content', '').strip()
if not text:
continue
# 判断是否是功能点标题
# 规则:短文本(通常<20字符且不是描述性文本
is_function_title = (
len(text) < 20 and
not text.endswith('') and
not text.endswith('') and
not ('如下' in text or '所示' in text)
)
if is_function_title:
# 保存之前的功能点
if current_function:
function_points.append({
'module_name': module_name,
'module_description': module_description,
'function_name': current_function,
'description': ' '.join(function_description_parts),
'operation_steps': self._extract_steps(function_description_parts)
})
# 开始新功能点
current_function = text
function_description_parts = []
else:
# 添加到当前功能点的描述
if current_function:
function_description_parts.append(text)
elif not module_description:
# 如果还没有模块描述,这可能是模块描述的一部分
pass
# 保存最后一个功能点
if current_function:
function_points.append({
'module_name': module_name,
'module_description': module_description,
'function_name': current_function,
'description': ' '.join(function_description_parts),
'operation_steps': self._extract_steps(function_description_parts)
})
# 如果没有识别到功能点,将整个模块作为一个功能点
if not current_function and module_description:
function_points.append({
'module_name': module_name,
'module_description': module_description,
'function_name': module_name,
'description': module_description,
'operation_steps': []
})
return function_points
def _extract_steps(self, description_parts: List[str]) -> List[str]:
"""
从描述中提取操作步骤
Args:
description_parts: 描述文本列表
Returns:
操作步骤列表
"""
steps = []
for part in description_parts:
# 查找包含操作动词的句子
if any(keyword in part for keyword in ['点击', '选择', '输入', '打开', '关闭', '设置', '查看']):
# 移除"如下图所示"等描述性文字
cleaned = part.replace('如下图所示', '').replace('如下图所示:', '').strip()
if cleaned:
steps.append(cleaned)
return steps
def get_document_info(self) -> Dict[str, Any]:
"""获取文档信息"""
return {
'title': self.data.get('document_title', ''),
'version': self.data.get('version', ''),
'date': self.data.get('date', ''),
'section_count': len(self.data.get('sections', []))
}
def get_sections(self) -> List[Dict[str, Any]]:
"""获取章节列表"""
return self.data.get('sections', [])
def get_module_summary(self) -> List[Dict[str, Any]]:
"""获取模块摘要"""
modules = []
sections = self.data.get('sections', [])
for section in sections:
module_info = {
'name': section.get('title', ''),
'function_count': 0,
'description': ''
}
# 查找模块描述
content = section.get('content', [])
for item in content:
if item.get('type') == 'text':
text = item.get('content', '').strip()
if len(text) > 50:
module_info['description'] = text
break
# 统计功能点数量(简单统计)
function_names = []
for item in content:
if item.get('type') == 'text':
text = item.get('content', '').strip()
if len(text) < 20 and text and not text.endswith(''):
function_names.append(text)
module_info['function_count'] = len(set(function_names))
modules.append(module_info)
return modules
@staticmethod
def can_parse(data: Dict[str, Any]) -> bool:
"""检测是否为旧格式"""
return 'sections' in data and isinstance(data['sections'], list)

178
modules/prompt_manager.py Normal file
View File

@@ -0,0 +1,178 @@
# @line_count 150
"""Prompt模板管理模块"""
import os
import yaml
from pathlib import Path
from typing import Dict, Optional
from .test_standard_manager import TestStandardManager
class PromptManager:
"""Prompt模板管理器"""
def __init__(self, config_path: Optional[str] = None,
use_standards: bool = True,
standard_manager: Optional[TestStandardManager] = None):
"""
初始化Prompt管理器
Args:
config_path: Prompt配置文件路径默认为config/default_prompt.yaml
use_standards: 是否使用测试规范默认True
standard_manager: 测试规范管理器如果为None且use_standards=True则创建新实例
"""
if config_path is None:
# 获取项目根目录
current_dir = Path(__file__).parent.parent
config_path = current_dir / "config" / "default_prompt.yaml"
self.config_path = Path(config_path)
self.prompts: Dict[str, str] = {}
self.use_standards = use_standards
self.standard_manager = standard_manager or (TestStandardManager() if use_standards else None)
# 用户自定义Prompt缓存 {requirement_id或func_point_id: custom_prompt}
self.custom_prompts_cache: Dict[str, str] = {}
self.load_prompts()
def load_prompts(self):
"""加载Prompt模板"""
if not self.config_path.exists():
raise FileNotFoundError(f"Prompt配置文件不存在: {self.config_path}")
with open(self.config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
self.prompts = {
'test_item': config.get('test_item_prompt', ''),
'test_case': config.get('test_case_prompt', ''),
'batch': config.get('batch_generation_prompt', '')
}
def get_prompt(self, prompt_type: str) -> str:
"""
获取指定类型的Prompt模板
Args:
prompt_type: Prompt类型 ('test_item', 'test_case', 'batch')
Returns:
Prompt模板字符串
"""
if prompt_type not in self.prompts:
raise ValueError(f"未知的Prompt类型: {prompt_type}")
return self.prompts[prompt_type]
def format_prompt(self, prompt_type: str, **kwargs) -> str:
"""
格式化Prompt模板替换占位符
Args:
prompt_type: Prompt类型
**kwargs: 要替换的变量如果包含requirement字典且use_standards=True则使用规范化Prompt
Returns:
格式化后的Prompt字符串
"""
# 检查是否有自定义prompt优先级最高
if 'requirement' in kwargs:
requirement = kwargs['requirement']
req_id = requirement.get('requirement_id') or requirement.get('description', '')[:50]
# 调试输出
print(f"\n[PromptManager] 查找自定义Prompt:")
print(f" - requirement_id: {req_id}")
print(f" - 缓存中的Keys: {list(self.custom_prompts_cache.keys())}")
print(f" - 是否匹配: {req_id in self.custom_prompts_cache}")
# 如果有该功能点的自定义prompt直接返回
if req_id in self.custom_prompts_cache:
custom_prompt = self.custom_prompts_cache[req_id]
print(f" ✅ 使用自定义Prompt (长度: {len(custom_prompt)} 字符)")
return custom_prompt
else:
print(f" ⚠️ 未找到自定义Prompt将使用默认生成")
# 如果使用测试规范且提供了requirement使用规范化Prompt
if self.use_standards and self.standard_manager and 'requirement' in kwargs:
requirement = kwargs['requirement']
standard_ids = kwargs.get('standard_ids') # 可选:指定测试规范
print(f" 📋 使用测试规范生成Prompt")
return self.standard_manager.build_prompt(requirement, standard_ids)
# 否则使用传统方式
prompt = self.get_prompt(prompt_type)
print(f" 📄 使用传统Prompt模板")
# 替换占位符 {variable_name}
try:
return prompt.format(**kwargs)
except KeyError as e:
raise ValueError(f"Prompt模板缺少必需的变量: {e}")
def load_custom_prompt(self, file_path: str, prompt_type: str):
"""
从文件加载自定义Prompt模板
Args:
file_path: 自定义Prompt文件路径
prompt_type: Prompt类型
"""
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"自定义Prompt文件不存在: {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
custom_prompt = f.read()
self.prompts[prompt_type] = custom_prompt
def set_custom_prompt(self, func_point_id: str, custom_prompt: str):
"""
设置功能点的自定义Prompt
Args:
func_point_id: 功能点ID或需求ID
custom_prompt: 自定义Prompt内容
"""
self.custom_prompts_cache[func_point_id] = custom_prompt
def clear_custom_prompts(self):
"""清空所有自定义Prompt缓存"""
self.custom_prompts_cache.clear()
def get_custom_prompt(self, func_point_id: str) -> Optional[str]:
"""
获取功能点的自定义Prompt
Args:
func_point_id: 功能点ID或需求ID
Returns:
自定义Prompt内容如果没有则返回None
"""
return self.custom_prompts_cache.get(func_point_id)
def save_custom_prompt(self, prompt_type: str, prompt_content: str, file_path: Optional[str] = None):
"""
保存自定义Prompt到文件
Args:
prompt_type: Prompt类型
prompt_content: Prompt内容
file_path: 保存路径默认为templates目录
"""
if file_path is None:
current_dir = Path(__file__).parent.parent
templates_dir = current_dir / "templates"
templates_dir.mkdir(exist_ok=True)
file_path = templates_dir / f"{prompt_type}_prompt.txt"
file_path = Path(file_path)
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(prompt_content)
return str(file_path)

View File

@@ -0,0 +1,167 @@
# @line_count 200
"""测试规范选择器,实现规则+AI混合选择策略"""
from typing import List, Dict, Any, Optional, TYPE_CHECKING
from .test_standard_loader import TestStandardLoader
import json
import re
if TYPE_CHECKING:
from .api_client import APIClient
class StandardSelector:
"""测试规范选择器,实现规则+AI混合选择策略"""
def __init__(self, loader: TestStandardLoader, api_client: Optional['APIClient'] = None):
"""
初始化规范选择器
Args:
loader: 测试规范加载器
api_client: API客户端用于AI辅助选择
"""
self.loader = loader
self.api_client = api_client
self.mapping_rules = loader.get_requirement_mapping()
self.keyword_mapping = loader.get_keyword_mapping()
def select_standards(self, requirement: Dict[str, Any], use_ai: bool = True) -> List[str]:
"""
为需求选择适用的测试规范
Args:
requirement: 需求字典包含requirement_type、description等
use_ai: 是否使用AI辅助选择当规则匹配不足时
Returns:
测试规范ID列表
"""
# 第一步:规则匹配
rule_based = self._rule_based_selection(requirement)
# 第二步如果规则匹配不足使用AI补充
if use_ai and len(rule_based) < 2 and self.api_client:
ai_based = self._ai_based_selection(requirement)
# 合并去重
all_standards = list(set(rule_based + ai_based))
return all_standards
return rule_based
def _rule_based_selection(self, requirement: Dict[str, Any]) -> List[str]:
"""
基于规则的规范选择
Args:
requirement: 需求字典
Returns:
测试规范ID列表
"""
standards = []
req_type = requirement.get('requirement_type', '')
req_desc = requirement.get('description', '')
# 1. 需求类型映射
if req_type in self.mapping_rules:
mapping = self.mapping_rules[req_type]
# 主要测试类型
standards.extend(mapping.get('primary', []))
# 次要测试类型
standards.extend(mapping.get('secondary', []))
# 条件性测试类型
conditionals = mapping.get('conditional', {})
if '有界面' in conditionals and ('界面' in req_desc or '显示' in req_desc):
standards.extend(conditionals['有界面'])
if '有性能要求' in conditionals and ('性能' in req_desc or '速度' in req_desc or '时间' in req_desc):
standards.extend(conditionals['有性能要求'])
if '安全关键' in conditionals and ('安全' in req_desc or '危险' in req_desc or '报警' in req_desc):
standards.extend(conditionals['安全关键'])
if '多软件交互' in conditionals and ('交互' in req_desc or '协同' in req_desc):
standards.extend(conditionals['多软件交互'])
if '性能相关' in conditionals and ('性能' in req_desc):
standards.extend(conditionals['性能相关'])
if '安全相关' in conditionals and ('安全' in req_desc):
standards.extend(conditionals['安全相关'])
# 2. 关键词匹配
for keyword, test_types in self.keyword_mapping.items():
if keyword in req_desc:
standards.extend(test_types)
# 3. 接口信息检测
if requirement.get('interface_info'):
standards.append('外部接口测试')
# 去重并返回
return list(set(standards))
def _ai_based_selection(self, requirement: Dict[str, Any]) -> List[str]:
"""
基于AI的规范选择当规则匹配不足时使用
Args:
requirement: 需求字典
Returns:
测试规范ID列表
"""
if not self.api_client:
return []
# 获取所有测试规范名称
all_standards = self.loader.get_all_standards()
standard_names = [f"{i+1}. {std.get('name', '')}" for i, std in enumerate(all_standards)]
prompt = f"""根据以下需求从14个测试类型中选择最适用的3-5个
{chr(10).join(standard_names)}
需求类型:{requirement.get('requirement_type', '未知')}
需求描述:{requirement.get('description', '')}
请只返回测试类型的名称列表格式为JSON数组例如["功能测试", "性能测试", "边界测试"]
不要包含其他说明文字。"""
try:
response = self.api_client.call_api(prompt)
# 尝试解析JSON
test_types = self._parse_ai_response(response)
return test_types
except Exception as e:
# 如果AI选择失败返回空列表
print(f"AI选择失败: {str(e)}")
return []
def _parse_ai_response(self, response: str) -> List[str]:
"""
解析AI响应提取测试类型列表
Args:
response: AI响应文本
Returns:
测试类型名称列表
"""
# 尝试直接解析JSON
try:
# 尝试提取JSON数组
json_match = re.search(r'\[.*?\]', response, re.DOTALL)
if json_match:
json_str = json_match.group(0)
test_types = json.loads(json_str)
if isinstance(test_types, list):
return test_types
except:
pass
# 如果JSON解析失败尝试文本匹配
all_standards = self.loader.get_all_standards()
standard_names = [std.get('name', '') for std in all_standards]
found_types = []
for name in standard_names:
if name in response:
found_types.append(name)
return found_types

459
modules/test_generator.py Normal file
View File

@@ -0,0 +1,459 @@
# @line_count 250
"""测试生成引擎模块"""
import json
import re
from typing import List, Dict, Any, Optional, Callable
from .json_parser import JSONParser
from .api_client import APIClient
from .prompt_manager import PromptManager
class TestGenerator:
"""测试项和测试用例生成器"""
def __init__(self, json_path: str, api_client: Optional[APIClient] = None,
prompt_manager: Optional[PromptManager] = None):
"""
初始化测试生成器
Args:
json_path: JSON文档路径
api_client: API客户端实例
prompt_manager: Prompt管理器实例
"""
self.parser = JSONParser(json_path)
self.api_client = api_client or APIClient()
self.prompt_manager = prompt_manager or PromptManager()
self.test_items: List[Dict[str, Any]] = []
self.test_cases: List[Dict[str, Any]] = []
self.test_case_counter = 0
def generate_test_items(self, function_points: Optional[List[Dict[str, Any]]] = None,
progress_callback: Optional[Callable] = None) -> List[Dict[str, Any]]:
"""
生成测试项
Args:
function_points: 功能点列表如果为None则自动提取
progress_callback: 进度回调函数 (current, total, message)
Returns:
测试项列表
"""
if function_points is None:
function_points = self.parser.extract_function_points()
test_items = []
total = len(function_points)
for idx, func_point in enumerate(function_points):
if progress_callback:
progress_callback(idx + 1, total, f"正在生成测试项: {func_point['function_name']}")
try:
# 构建功能描述
function_description = self._build_function_description(func_point)
# 格式化prompt
prompt = self.prompt_manager.format_prompt(
'test_item',
function_description=function_description,
module_name=func_point['module_name']
)
# 调用API
response = self.api_client.call_api(prompt)
# 解析响应
test_items_for_func = self._parse_test_items_response(response, func_point)
test_items.extend(test_items_for_func)
except Exception as e:
# 如果生成失败,创建一个默认测试项
test_items.append({
'id': f"TI-{len(test_items) + 1:03d}",
'name': f"{func_point['function_name']}功能测试",
'module_name': func_point['module_name'],
'function_name': func_point['function_name'],
'test_type': '功能测试',
'priority': '',
'test_objective': f"测试{func_point['function_name']}功能是否正常工作",
'error': str(e)
})
self.test_items = test_items
return test_items
def generate_test_cases(self, test_items: Optional[List[Dict[str, Any]]] = None,
progress_callback: Optional[Callable] = None) -> List[Dict[str, Any]]:
"""
为测试项生成测试用例
Args:
test_items: 测试项列表如果为None则使用已生成的测试项
progress_callback: 进度回调函数
Returns:
测试用例列表
"""
if test_items is None:
test_items = self.test_items
if not test_items:
raise ValueError("没有可用的测试项,请先生成测试项")
test_cases = []
total = len(test_items)
for idx, test_item in enumerate(test_items):
if progress_callback:
progress_callback(idx + 1, total, f"正在生成测试用例: {test_item['name']}")
try:
# 获取功能点信息
function_points = self.parser.extract_function_points()
func_point = next(
(fp for fp in function_points if fp['function_name'] == test_item.get('function_name', '')),
None
)
if not func_point:
continue
function_description = self._build_function_description(func_point)
# 格式化prompt
prompt = self.prompt_manager.format_prompt(
'test_case',
test_item_name=test_item['name'],
test_type=test_item.get('test_type', '功能测试'),
module_name=test_item['module_name'],
function_description=function_description
)
# 调用API
response = self.api_client.call_api(prompt)
# 解析响应
cases_for_item = self._parse_test_cases_response(response, test_item)
test_cases.extend(cases_for_item)
except Exception as e:
# 如果生成失败,创建一个默认测试用例
self.test_case_counter += 1
test_cases.append({
'id': f"TC-{self.test_case_counter:03d}",
'test_item_id': test_item.get('id', ''),
'name': f"{test_item['name']} - 基础功能验证",
'module_name': test_item['module_name'],
'preconditions': '系统正常运行',
'test_steps': ['步骤1打开功能模块', '步骤2执行操作', '步骤3验证结果'],
'expected_result': '功能正常工作',
'priority': test_item.get('priority', ''),
'test_type': test_item.get('test_type', '功能测试'),
'error': str(e)
})
self.test_cases = test_cases
return test_cases
def generate_batch(self, function_points: Optional[List[Dict[str, Any]]] = None,
progress_callback: Optional[Callable] = None) -> Dict[str, List[Dict[str, Any]]]:
"""
批量生成测试项和测试用例(一次性生成)
Args:
function_points: 功能点列表
progress_callback: 进度回调函数
Returns:
包含test_items和test_cases的字典
"""
if function_points is None:
function_points = self.parser.extract_function_points()
all_test_items = []
all_test_cases = []
total = len(function_points)
self.test_case_counter = 0
for idx, func_point in enumerate(function_points):
if progress_callback:
progress_callback(idx + 1, total, f"正在生成: {func_point['function_name']}")
try:
# 转换为requirement格式如果使用规范化Prompt
requirement = self._convert_to_requirement(func_point)
# 尝试使用规范化Prompt如果支持
if self.prompt_manager.use_standards:
prompt = self.prompt_manager.format_prompt(
'batch',
requirement=requirement
)
else:
# 使用传统方式
function_description = self._build_function_description(func_point)
prompt = self.prompt_manager.format_prompt(
'batch',
function_description=function_description,
module_name=func_point['module_name']
)
# 调用API
response = self.api_client.call_api(prompt)
# 解析批量响应
result = self._parse_batch_response(response, func_point)
all_test_items.extend(result['test_items'])
all_test_cases.extend(result['test_cases'])
except Exception as e:
# 创建默认测试项和测试用例
item_id = f"TI-{len(all_test_items) + 1:03d}"
all_test_items.append({
'id': item_id,
'name': f"{func_point['function_name']}功能测试",
'module_name': func_point['module_name'],
'function_name': func_point['function_name'],
'test_type': '功能测试',
'priority': '',
'test_objective': f"测试{func_point['function_name']}功能",
'error': str(e)
})
self.test_case_counter += 1
all_test_cases.append({
'id': f"TC-{self.test_case_counter:03d}",
'test_item_id': item_id,
'name': f"{func_point['function_name']} - 基础验证",
'module_name': func_point['module_name'],
'preconditions': '系统正常运行',
'test_steps': ['执行功能操作', '验证结果'],
'expected_result': '功能正常',
'priority': '',
'test_type': '功能测试',
'error': str(e)
})
self.test_items = all_test_items
self.test_cases = all_test_cases
return {
'test_items': all_test_items,
'test_cases': all_test_cases
}
def _build_function_description(self, func_point: Dict[str, Any]) -> str:
"""构建功能描述文本"""
parts = []
if func_point.get('module_description'):
parts.append(f"模块描述:{func_point['module_description']}")
if func_point.get('description'):
parts.append(f"功能描述:{func_point['description']}")
if func_point.get('operation_steps'):
parts.append(f"操作步骤:{''.join(func_point['operation_steps'])}")
return '\n'.join(parts) if parts else func_point.get('function_name', '')
def _parse_test_items_response(self, response: str, func_point: Dict[str, Any]) -> List[Dict[str, Any]]:
"""解析测试项API响应"""
try:
# 尝试提取JSON
json_str = self._extract_json_from_response(response)
data = json.loads(json_str)
test_items = []
for idx, item in enumerate(data.get('test_items', [])):
test_items.append({
'id': f"TI-{len(self.test_items) + len(test_items) + 1:03d}",
'name': item.get('name', ''),
'module_name': func_point['module_name'],
'function_name': func_point['function_name'],
'test_type': item.get('test_type', '功能测试'),
'priority': item.get('priority', ''),
'test_objective': item.get('test_objective', '')
})
return test_items if test_items else [self._create_default_test_item(func_point)]
except Exception:
return [self._create_default_test_item(func_point)]
def _parse_test_cases_response(self, response: str, test_item: Dict[str, Any]) -> List[Dict[str, Any]]:
"""解析测试用例API响应"""
try:
json_str = self._extract_json_from_response(response)
data = json.loads(json_str)
test_cases = []
for case in data.get('test_cases', []):
self.test_case_counter += 1
test_cases.append({
'id': f"TC-{self.test_case_counter:03d}",
'test_item_id': test_item.get('id', ''),
'name': case.get('name', ''),
'module_name': test_item['module_name'],
'preconditions': case.get('preconditions', ''),
'test_steps': case.get('test_steps', []),
'expected_result': case.get('expected_result', ''),
'priority': case.get('priority', test_item.get('priority', '')),
'test_type': case.get('test_type', test_item.get('test_type', '功能测试'))
})
return test_cases if test_cases else [self._create_default_test_case(test_item)]
except Exception:
return [self._create_default_test_case(test_item)]
def _parse_batch_response(self, response: str, func_point: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
"""解析批量生成API响应"""
try:
print("\n" + "-"*60)
print("[解析响应] 开始解析API响应...")
print(f"原始响应长度: {len(response)} 字符")
json_str = self._extract_json_from_response(response)
print(f"提取的JSON长度: {len(json_str)} 字符")
data = json.loads(json_str)
print(f"JSON解析成功")
print(f" - test_items数量: {len(data.get('test_items', []))}")
test_items = []
test_cases = []
for item_data in data.get('test_items', []):
item_id = f"TI-{len(self.test_items) + len(test_items) + 1:03d}"
test_item = {
'id': item_id,
'name': item_data.get('name', ''),
'module_name': func_point['module_name'],
'function_name': func_point['function_name'],
'test_type': item_data.get('test_type', '功能测试'),
'priority': item_data.get('priority', ''),
'test_objective': item_data.get('test_objective', '')
}
test_items.append(test_item)
print(f" ✓ 测试项: {test_item['name']}")
# 处理该测试项下的测试用例
cases_in_item = item_data.get('test_cases', [])
print(f" - 测试用例数量: {len(cases_in_item)}")
for case_data in cases_in_item:
self.test_case_counter += 1
test_cases.append({
'id': f"TC-{self.test_case_counter:03d}",
'test_item_id': item_id,
'name': case_data.get('name', ''),
'module_name': func_point['module_name'],
'preconditions': case_data.get('preconditions', ''),
'test_steps': case_data.get('test_steps', []),
'expected_result': case_data.get('expected_result', ''),
'priority': case_data.get('priority', test_item['priority']),
'test_type': case_data.get('test_type', test_item['test_type'])
})
if not test_items:
print(" ⚠️ 没有解析到test_items使用默认值")
test_item = self._create_default_test_item(func_point)
test_items.append(test_item)
test_cases.append(self._create_default_test_case(test_item))
print(f"[解析响应] 完成!共 {len(test_items)} 个测试项,{len(test_cases)} 个测试用例")
print("-"*60 + "\n")
return {
'test_items': test_items,
'test_cases': test_cases
}
except Exception as e:
print("\n" + "-"*60)
print(f"[解析响应] ❌ 解析失败!错误: {str(e)}")
print(f"原始响应前500字符: {response[:500]}")
print("-"*60 + "\n")
test_item = self._create_default_test_item(func_point)
return {
'test_items': [test_item],
'test_cases': [self._create_default_test_case(test_item)]
}
def _extract_json_from_response(self, response: str) -> str:
"""从API响应中提取JSON字符串"""
# 尝试直接解析
try:
json.loads(response)
return response
except:
pass
# 尝试提取代码块中的JSON
json_pattern = r'```(?:json)?\s*(\{.*?\})\s*```'
match = re.search(json_pattern, response, re.DOTALL)
if match:
return match.group(1)
# 尝试提取大括号中的内容
brace_match = re.search(r'\{.*\}', response, re.DOTALL)
if brace_match:
return brace_match.group(0)
raise ValueError("无法从响应中提取JSON")
def _create_default_test_item(self, func_point: Dict[str, Any]) -> Dict[str, Any]:
"""创建默认测试项"""
return {
'id': f"TI-{len(self.test_items) + 1:03d}",
'name': f"{func_point['function_name']}功能测试",
'module_name': func_point['module_name'],
'function_name': func_point['function_name'],
'test_type': '功能测试',
'priority': '',
'test_objective': f"测试{func_point['function_name']}功能是否正常工作"
}
def _create_default_test_case(self, test_item: Dict[str, Any]) -> Dict[str, Any]:
"""创建默认测试用例"""
self.test_case_counter += 1
return {
'id': f"TC-{self.test_case_counter:03d}",
'test_item_id': test_item.get('id', ''),
'name': f"{test_item['name']} - 基础功能验证",
'module_name': test_item['module_name'],
'preconditions': '系统正常运行',
'test_steps': ['步骤1打开功能模块', '步骤2执行操作', '步骤3验证结果'],
'expected_result': '功能正常工作',
'priority': test_item.get('priority', ''),
'test_type': test_item.get('test_type', '功能测试')
}
def _convert_to_requirement(self, func_point: Dict[str, Any]) -> Dict[str, Any]:
"""
将功能点转换为需求格式用于规范化Prompt
Args:
func_point: 功能点字典
Returns:
需求字典
"""
requirement = {
'requirement_id': func_point.get('requirement_id', func_point.get('function_name', '')),
'requirement_type': func_point.get('requirement_type', '功能需求'),
'description': func_point.get('description', func_point.get('function_name', '')),
'module_name': func_point.get('module_name', '')
}
# 如果有接口信息,添加进去
if func_point.get('interface_info'):
requirement['interface_info'] = func_point['interface_info']
return requirement

View File

@@ -0,0 +1,161 @@
# @line_count 150
"""测试规范加载器"""
import yaml
from pathlib import Path
from typing import List, Dict, Any, Optional
class TestStandardLoader:
"""测试规范加载器,负责从配置文件加载测试规范"""
def __init__(self, config_path: Optional[str] = None):
"""
初始化测试规范加载器
Args:
config_path: 配置文件路径默认为config/test_standards.yaml
"""
if config_path is None:
current_dir = Path(__file__).parent.parent
config_path = current_dir / "config" / "test_standards.yaml"
self.config_path = Path(config_path)
self.standards: List[Dict[str, Any]] = []
self.mapping_rules: Dict[str, Any] = {}
self.keyword_mapping: Dict[str, List[str]] = {}
self.load_config()
def load_config(self):
"""加载配置文件"""
if not self.config_path.exists():
raise FileNotFoundError(f"测试规范配置文件不存在: {self.config_path}")
with open(self.config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
self.standards = config.get('test_standards', [])
self.mapping_rules = config.get('requirement_mapping', {})
self.keyword_mapping = config.get('keyword_mapping', {})
def get_all_standards(self) -> List[Dict[str, Any]]:
"""
获取所有测试规范
Returns:
测试规范列表
"""
return self.standards
def get_standard_by_id(self, standard_id: str) -> Optional[Dict[str, Any]]:
"""
根据ID获取测试规范
Args:
standard_id: 测试规范ID
Returns:
测试规范字典如果不存在返回None
"""
for standard in self.standards:
if standard.get('id') == standard_id:
return standard
return None
def get_standards_by_ids(self, standard_ids: List[str]) -> List[Dict[str, Any]]:
"""
根据ID列表获取多个测试规范
Args:
standard_ids: 测试规范ID列表
Returns:
测试规范列表
"""
result = []
for standard_id in standard_ids:
standard = self.get_standard_by_id(standard_id)
if standard:
result.append(standard)
return result
def get_standard_summary(self, standard_id: str) -> str:
"""
获取测试规范的摘要
Args:
standard_id: 测试规范ID
Returns:
规范摘要文本
"""
standard = self.get_standard_by_id(standard_id)
if standard:
return standard.get('summary', '')
return ''
def get_standard_key_points(self, standard_id: str) -> List[str]:
"""
获取测试规范的关键测试点
Args:
standard_id: 测试规范ID
Returns:
关键测试点列表
"""
standard = self.get_standard_by_id(standard_id)
if standard:
return standard.get('key_points', [])
return []
def get_requirement_mapping(self) -> Dict[str, Any]:
"""
获取需求类型到测试规范的映射规则
Returns:
映射规则字典
"""
return self.mapping_rules
def get_keyword_mapping(self) -> Dict[str, List[str]]:
"""
获取关键词到测试规范的映射规则
Returns:
关键词映射字典
"""
return self.keyword_mapping
def format_standards_summary(self, standard_ids: List[str]) -> str:
"""
格式化多个测试规范的摘要用于Prompt
Args:
standard_ids: 测试规范ID列表
Returns:
格式化后的摘要文本
"""
if not standard_ids:
return ""
lines = []
for standard_id in standard_ids:
standard = self.get_standard_by_id(standard_id)
if not standard:
continue
name = standard.get('name', standard_id)
summary = standard.get('summary', '').strip()
key_points = standard.get('key_points', [])
lines.append(f"{name}")
if summary:
lines.append(summary)
if key_points:
lines.append("关键测试点:")
for i, point in enumerate(key_points, 1):
lines.append(f" {i}. {point}")
lines.append("") # 空行分隔
return "\n".join(lines)

View File

@@ -0,0 +1,196 @@
# @line_count 250
"""测试规范管理器整合规范加载、选择和Prompt构建"""
from typing import List, Dict, Any, Optional, TYPE_CHECKING
from .test_standard_loader import TestStandardLoader
from .standard_selector import StandardSelector
if TYPE_CHECKING:
from .api_client import APIClient
class TestStandardManager:
"""测试规范管理器负责管理测试规范、选择适用规范、构建Prompt"""
def __init__(self, loader: Optional[TestStandardLoader] = None,
selector: Optional[StandardSelector] = None,
api_client: Optional['APIClient'] = None):
"""
初始化测试规范管理器
Args:
loader: 测试规范加载器如果为None则创建新实例
selector: 规范选择器如果为None则创建新实例
api_client: API客户端用于AI辅助选择
"""
self.loader = loader or TestStandardLoader()
self.api_client = api_client
self.selector = selector or StandardSelector(self.loader, api_client)
def get_applicable_standards(self, requirement: Dict[str, Any],
use_ai: bool = True) -> List[str]:
"""
为需求获取适用的测试规范
Args:
requirement: 需求字典
use_ai: 是否使用AI辅助选择
Returns:
测试规范ID列表
"""
return self.selector.select_standards(requirement, use_ai=use_ai)
def build_prompt(self, requirement: Dict[str, Any],
standard_ids: Optional[List[str]] = None,
include_full_content: bool = False) -> str:
"""
构建包含测试规范的分层Prompt
Args:
requirement: 需求字典
standard_ids: 测试规范ID列表如果为None则自动选择
include_full_content: 是否包含完整规范内容(默认只包含摘要)
Returns:
格式化后的Prompt文本
"""
# 如果没有指定规范,自动选择
if standard_ids is None:
standard_ids = self.get_applicable_standards(requirement)
# Level 1: 测试规范摘要
standards_summary = self._build_standards_summary(standard_ids, include_full_content)
# Level 2: 需求信息
requirement_info = self._format_requirement(requirement)
# 构建完整Prompt
prompt = f"""你是一位资深的软件测试专家,请严格按照以下测试规范生成测试用例。
【适用的测试规范】
根据需求类型"{requirement.get('requirement_type', '未知')}",以下测试规范适用:
{standards_summary}
【需求信息】
{requirement_info}
【生成要求】
1. 必须覆盖上述测试规范中的所有关键测试点
2. 每个适用的测试规范至少生成1个测试项
3. 测试用例要详细、可执行,包含:
- 前置条件
- 详细测试步骤(编号清晰)
- 预期结果
- 优先级(高/中/低)
4. 确保覆盖正常功能、异常情况、边界条件
5. 每个测试项应标注引用的测试规范standard_reference字段
【输出格式】
以JSON格式输出格式如下
{{
"test_items": [
{{
"name": "测试项名称",
"test_type": "功能测试",
"priority": "",
"test_objective": "测试目标",
"standard_reference": "功能测试",
"test_cases": [
{{
"name": "测试用例名称",
"preconditions": "前置条件",
"test_steps": ["步骤1", "步骤2", "步骤3"],
"expected_result": "预期结果",
"priority": "",
"test_type": "功能测试"
}}
]
}}
]
}}"""
return prompt
def _build_standards_summary(self, standard_ids: List[str],
include_full: bool = False) -> str:
"""
构建测试规范摘要
Args:
standard_ids: 测试规范ID列表
include_full: 是否包含完整内容
Returns:
格式化后的规范摘要
"""
if not standard_ids:
return "无适用的测试规范"
return self.loader.format_standards_summary(standard_ids)
def _format_requirement(self, requirement: Dict[str, Any]) -> str:
"""
格式化需求信息
Args:
requirement: 需求字典
Returns:
格式化后的需求信息文本
"""
lines = []
lines.append(f"需求编号:{requirement.get('requirement_id', 'N/A')}")
lines.append(f"需求类型:{requirement.get('requirement_type', 'N/A')}")
lines.append(f"需求描述:{requirement.get('description', 'N/A')}")
module_name = requirement.get('module_name', '')
if module_name:
lines.append(f"所属模块:{module_name}")
# 接口信息(如果有)
interface_info = requirement.get('interface_info')
if interface_info:
lines.append("\n【接口信息】")
if interface_info.get('接口名称'):
lines.append(f"接口名称:{interface_info['接口名称']}")
if interface_info.get('接口类型'):
lines.append(f"接口类型:{interface_info['接口类型']}")
if interface_info.get('来源'):
lines.append(f"来源:{interface_info['来源']}")
if interface_info.get('目的地'):
lines.append(f"目的地:{interface_info['目的地']}")
return "\n".join(lines)
def get_standards_summary(self, standard_ids: List[str]) -> str:
"""
获取测试规范摘要(便捷方法)
Args:
standard_ids: 测试规范ID列表
Returns:
规范摘要文本
"""
return self.loader.format_standards_summary(standard_ids)
def confirm_test_types(self, requirement: Dict[str, Any],
candidate_standards: List[str]) -> List[str]:
"""
确认最终使用的测试类型两阶段生成策略的阶段1
Args:
requirement: 需求字典
candidate_standards: 候选测试规范ID列表
Returns:
确认后的测试规范ID列表
"""
# 如果候选类型不多(<=3直接使用
if len(candidate_standards) <= 3:
return candidate_standards
# 如果候选类型较多,可以进一步筛选
# 这里简化处理直接返回前5个
return candidate_standards[:5]