diff --git a/.github/skills/METHOD_ID_REGISTRY.md b/.github/skills/METHOD_ID_REGISTRY.md new file mode 100644 index 0000000..3d8fe1e --- /dev/null +++ b/.github/skills/METHOD_ID_REGISTRY.md @@ -0,0 +1,45 @@ +# METHOD_ID_REGISTRY + +## Purpose +Provide a single source of truth for test method identifiers used across skills. + +## Format +- Recommended output format: `Mxx|方法名` +- Routing key: `Mxx` +- Display name: 方法名 + +## B.1 Black-box Methods (M01-M12) +| Method ID | 方法名 | 推荐占位符 | Route Skill | +| --- | --- | --- | --- | +| M01 | 功能分解 | {{return_value}}, {{state_change}} | generate-test-cases-blackbox | +| M02 | 等价类划分 | {{return_value}}, {{error_message}} | generate-test-cases-blackbox | +| M03 | 边界值分析 | {{return_value}}, {{precision_tolerance}} | generate-test-cases-blackbox | +| M04 | 判定表 | {{state_change}}, {{sequence_event}} | generate-test-cases-blackbox | +| M05 | 因果图 | {{error_message}}, {{error_handling}} | generate-test-cases-blackbox | +| M06 | 场景法 | {{sequence_event}}, {{state_change}} | generate-test-cases-blackbox | +| M07 | 功能图法 | {{state_change}}, {{sequence_event}} | generate-test-cases-blackbox | +| M08 | 随机测试 | {{resource_usage}}, {{time_constraint}} | generate-test-cases-blackbox | +| M09 | 猜错法 | {{error_message}}, {{error_handling}} | generate-test-cases-blackbox | +| M10 | 正交试验法 | {{return_value}}, {{data_persistence}} | generate-test-cases-blackbox | +| M11 | 组合测试法 | {{return_value}}, {{sequence_event}} | generate-test-cases-blackbox | +| M12 | 蜕变测试法 | {{pass_criteria}}, {{precision_tolerance}} | generate-test-cases-blackbox | + +## B.2 White-box Methods (M13-M18) +| Method ID | 方法名 | 证据锚点 | Route Skill | +| --- | --- | --- | --- | +| M13 | 控制流测试 | 路径编号 | generate-test-cases-whitebox | +| M14 | 数据流测试 | DU对编号 | generate-test-cases-whitebox | +| M15 | 程序变异 | 变异体编号 | generate-test-cases-whitebox | +| M16 | 程序插桩 | 插桩点编号 | generate-test-cases-whitebox | +| M17 | 域测试 | 域编号 | generate-test-cases-whitebox | +| M18 | 符号求值 | 约束表达式编号 | generate-test-cases-whitebox | + +## Routing Rules +1. Use Method ID (`Mxx`) as the only routing key. +2. `M01-M12` route to `generate-test-cases-blackbox`. +3. `M13-M18` route to `generate-test-cases-whitebox`. +4. Unknown IDs should be recorded in `method_alignment_report.gaps`. + +## Backward Compatibility +- If input methods are provided as plain method names, map them to Method IDs before routing. +- Keep Chinese method names for readability in `case_summary`. diff --git a/.github/skills/build-expected-results/SKILL.md b/.github/skills/build-expected-results/SKILL.md index 0a21c33..5f53749 100644 --- a/.github/skills/build-expected-results/SKILL.md +++ b/.github/skills/build-expected-results/SKILL.md @@ -1,6 +1,6 @@ --- name: build-expected-results -description: "当需要将测试用例中的 expected_result_placeholder 展开为可度量预期成果时使用。" +description: "当测试用例包含 expected_result_placeholder 时使用,将其展开为可验证、可量化、可判定通过/失败的预期成果集合。" --- # build-expected-results diff --git a/.github/skills/decompose-test-items/SKILL.md b/.github/skills/decompose-test-items/SKILL.md index d122dbd..d23118f 100644 --- a/.github/skills/decompose-test-items/SKILL.md +++ b/.github/skills/decompose-test-items/SKILL.md @@ -1,6 +1,6 @@ --- name: decompose-test-items -description: "当需要基于需求类型把需求文本分解为可执行的正常/异常测试项时使用。" +description: "当已获得 requirement_type 后,需将需求文本拆解为可执行的正常/异常测试项并输出 coverage_analysis(覆盖率、缺口、补充建议)时使用。" --- # decompose-test-items @@ -11,8 +11,7 @@ description: "当需要基于需求类型把需求文本分解为可执行的正 ## 输入 - user_requirement_text - requirement_type -- recommended_test_methods(可选) -- suggested_decompose_template(可选) +- recommended_test_methods(可选,格式建议 `Mxx|方法名`) ## 输出 - normal_test_items:完整、可执行的正常测试项列表。 diff --git a/.github/skills/format-output/SKILL.md b/.github/skills/format-output/SKILL.md index 307176e..76e1ec6 100644 --- a/.github/skills/format-output/SKILL.md +++ b/.github/skills/format-output/SKILL.md @@ -1,6 +1,6 @@ --- name: format-output -description: "当需要将测试项、测试用例、预期成果按统一格式输出时使用。" +description: "当需将测试项、测试用例、预期成果统一整理为三段式 Markdown 输出时使用,要求正常/异常分组且编号追踪一致。" --- # format-output diff --git a/.github/skills/generate-test-cases-blackbox/SKILL.md b/.github/skills/generate-test-cases-blackbox/SKILL.md new file mode 100644 index 0000000..78d2513 --- /dev/null +++ b/.github/skills/generate-test-cases-blackbox/SKILL.md @@ -0,0 +1,59 @@ +--- +name: generate-test-cases-blackbox +description: "当 recommended_test_methods 命中 M01-M12 时使用,按黑盒方法特征生成正常/异常测试用例并输出 method_alignment_report。" +--- + +# generate-test-cases-blackbox + +## 目标 +对输入测试项应用黑盒方法(M01-M12),生成可重复执行、可验证的测试用例。 + +## 输入 +- normal_test_items +- abnormal_test_items +- requirement_type(可选) +- recommended_test_methods(可选,格式建议 `Mxx|方法名`) + +## 输出 +- normal_test_cases +- abnormal_test_cases +- method_alignment_report + +## 黑盒方法范围 +- M01 功能分解 +- M02 等价类划分 +- M03 边界值分析 +- M04 判定表 +- M05 因果图 +- M06 场景法 +- M07 功能图法 +- M08 随机测试 +- M09 猜错法 +- M10 正交试验法 +- M11 组合测试法 +- M12 蜕变测试法 + +## 通用规则 +1. 每个测试项至少绑定一个黑盒方法;高风险测试项建议绑定两种方法。 +2. `test_inputs` 必须反映方法特征(类编号、边界点、规则号、场景路径、组合强度、随机参数等)。 +3. `operation_steps` 必须体现方法步骤链,禁止空泛表述。 +4. `case_summary` 必须标注 `Mxx|方法名`。 +5. 每条用例应优先使用黑盒占位符:`{{return_value}}`、`{{state_change}}`、`{{error_message}}`。 + +## 方法详细说明(M01-M12) +- M01 功能分解:适用于规格说明可分层拆解的场景。先按功能抽象把程序分解为功能层次和最低层子功能,再按数据抽象为每个子功能的输入/输出取值集合设计数据。`test_inputs` 需显式包含子功能标识、前置依赖和数据抽象来源,`operation_steps` 按“分层覆盖 -> 子功能执行 -> I/O 校验”编写。 +- M02 等价类划分:在规格约束基础上划分有效等价类与无效等价类,并为每个等价类分配唯一编号。每条用例至少映射一个目标类,建议用“类编号+代表值+预期策略”组织 `test_inputs`。执行时应同时验证有效类的主功能正确性和无效类的错误处理一致性。 +- M03 边界值分析:先识别全部边界条件,再为每个边界给出满足边界和不满足边界的数据。建议采用 `L-1/L/L+1` 与 `U-1/U/U+1`,并对多维边界补充组合边界样本。预期结果需区分“计算错误”(边界内)和“域错误”(边界外),作为 `expected_results` 的判定要点。 +- M04 判定表:以“条件桩、条件条目、动作桩、动作条目”构建规则表,并将每一列规则转化为测试用例。适用前提包括:条件/规则顺序不影响操作、规则命中后无需再检验其他规则、多动作执行顺序无关。`operation_steps` 需记录规则命中过程,`expected_results` 需核验动作集合完整且无多执行。 +- M05 因果图:先识别原因(输入)与结果(输出)并编号,再构建因果关系和约束,随后转换为判定表并逐列生成用例。适用于输入组合关系复杂的需求;若因果图过大导致用例爆炸,应在 `gaps` 中说明并采用降级策略(如关键链路抽样)。用例中应保留原因/结果编号,保证可追踪。 +- M06 场景法:以用况路径为单位覆盖基本流与备选流,形成从开始到结束的可执行场景。步骤应包括:确定流、组合场景、设计用例、审验并去冗余、为用例补充测试数据。`operation_steps` 要体现场景触发条件和路径切换点,避免只写主流程。 +- M07 功能图法:使用“状态转移图 + 逻辑功能模型”联合建模。先在每个状态内用因果图/判定表生成局部用例,再由状态转移图导出路径用例,最后合成实用测试用例。`test_inputs` 需同时体现状态路径和状态内输入条件,`expected_results` 要同时验证迁移结果与局部逻辑输出。 +- M08 随机测试:在输入取值区间上随机取样,并在必要时约束随机分布与种子以保证可复现。该方法在预期输出难以精确构造时更常用于可靠性测试和强度测试。`method_alignment_report` 需包含采样范围、分布、样本量和失败样本聚类结论。 +- M09 猜错法:由有经验测试人员基于历史缺陷和易错情况表设计用例。适合补齐规格难覆盖的风险点(如空值、边界格式、异常时序、重复操作)。用例应明确“猜错依据”,并在结果中验证防护是否生效。 +- M10 正交试验法:把影响功能实现的操作对象和外部因素作为因子,把因子取值作为水平,构造因素分析表并选用正交表进行代表性组合。目标是在减少用例规模的同时保持较高覆盖效率。建议在 `test_inputs` 中保留因子-水平映射,便于回归和复现。 +- M11 组合测试法:面向多参数输入生成组合用例集,并按组合强度评估覆盖充分性。常见强度包括单一选择、基本选择、成对组合、全组合和 K 强度组合。黑盒场景中组合参数通常是外部输入,需在用例中标注强度等级和约束条件。 +- M12 蜕变测试法:适用于单个用例预期结果难判定的场景。通过构造蜕变关系验证“源用例与跟随用例”的结果是否满足关系;不满足即判定失败。该方法可与其他方法联合:既可用于结果验证,也可生成补充用例。 + +## 对齐输出要求 +- 在 `method_alignment_report` 中记录:`case_id`、`selected_methods`、`alignment_score`、`gaps`、`fix_suggestions`。 +- 无法落地的方法必须进入 `gaps` 并提供可执行修复建议。 diff --git a/.github/skills/generate-test-cases-whitebox/SKILL.md b/.github/skills/generate-test-cases-whitebox/SKILL.md new file mode 100644 index 0000000..4cac0a4 --- /dev/null +++ b/.github/skills/generate-test-cases-whitebox/SKILL.md @@ -0,0 +1,47 @@ +--- +name: generate-test-cases-whitebox +description: "当 recommended_test_methods 命中 M13-M18 时使用,按路径/数据/变异/插桩证据链生成可追踪测试用例并输出 method_alignment_report。" +--- + +# generate-test-cases-whitebox + +## 目标 +对输入测试项应用白盒方法(M13-M18),生成带可追踪执行证据的测试用例。 + +## 输入 +- normal_test_items +- abnormal_test_items +- requirement_type(可选) +- recommended_test_methods(可选,格式建议 `Mxx|方法名`) + +## 输出 +- normal_test_cases +- abnormal_test_cases +- method_alignment_report + +## 白盒方法范围 +- M13 控制流测试 +- M14 数据流测试 +- M15 程序变异 +- M16 程序插桩 +- M17 域测试 +- M18 符号求值 + +## 通用规则 +1. 白盒用例必须包含证据锚点(路径编号、DU对、变异体、插桩点、域编号、约束编号)。 +2. `test_inputs` 应能触发目标证据链,禁止只描述功能输入。 +3. `operation_steps` 必须包含证据采集或比对动作。 +4. `case_summary` 必须标注 `Mxx|方法名`。 +5. 白盒用例可结合 `{{pass_criteria}}`、`{{resource_usage}}`、`{{state_change}}` 进行结果绑定。 + +## 方法详细说明(M13-M18) +- M13 控制流测试:依据控制流图生成用例,目标是覆盖不同控制结构。常用覆盖包括语句覆盖、分支覆盖、条件覆盖、条件组合覆盖、MC/DC 和路径覆盖。标准步骤为:流程图转控制流图、路径表达式求解、路径树生成、路径编码与译码、按目标路径枚举测试用例。`method_alignment_report` 应记录覆盖准则与未覆盖路径原因。 +- M14 数据流测试:在控制流图上分析变量定义、使用和消除,重点发现“未定义就使用”与“定义后未使用”等数据流异常。建议步骤为:构建控制流图、标注链路数据操作符、选择数据流覆盖策略、导出测试路径、再生成输入与用例。除静态分析外,可结合动态数据流检查提高真实性,但要说明路径覆盖局限。 +- M15 程序变异:以错误驱动方式定义变异算子并生成语法合法的变异体,通过测试数据逐个执行变异体判断检测能力。可区分强变异与弱变异,并在报告中统计“杀死率、存活率、等价变异占比”。该方法既用于找缺陷,也用于评价现有测试用例集的有效性。 +- M16 程序插桩:通过在程序中插入观测操作实现证据采集,但不得改变被测程序原有功能与运行语义。建议步骤为:定位插桩点、插入采样逻辑、执行测试、回收与分析数据、移除或关闭插桩。由于数据记录量通常较大,宜借助工具完成并在报告中注明插桩开销与可信度。 +- M17 域测试:目标是验证程序对输入空间划分是否正确,重点检查域内、边界和跨域样本的判定一致性。该方法约束较多、使用门槛较高,通常用于有特殊质量要求的场景。用例中应保留域划分依据与判定规则,避免仅给样本值不说明域归属。 +- M18 符号求值:允许变量取符号值,通过符号执行检查公式与分支约束是否满足预期,并可导出程序路径用于生成测试数据。适用于公式密集或约束明确的逻辑。复杂分支推荐工具化处理,分支较少时可人工推导并在用例中记录约束来源与推导过程。 + +## 对齐输出要求 +- 在 `method_alignment_report` 中记录:`case_id`、`selected_methods`、`alignment_score`、`gaps`、`fix_suggestions`。 +- 对无法构造证据链的方法,必须输出 `gaps` 与降级策略说明。 diff --git a/.github/skills/generate-test-cases/SKILL.md b/.github/skills/generate-test-cases/SKILL.md index 51c2b81..a9c2e41 100644 --- a/.github/skills/generate-test-cases/SKILL.md +++ b/.github/skills/generate-test-cases/SKILL.md @@ -1,6 +1,6 @@ --- name: generate-test-cases -description: "当需要根据已分解测试项生成包含操作步骤与测试内容的具体测试用例时使用。" +description: "当已有 normal/abnormal 测试项且需按统一规范生成可执行测试用例时使用;不负责测试方法选择,输出测试用例与合规校验报告。" --- # generate-test-cases @@ -12,12 +12,11 @@ description: "当需要根据已分解测试项生成包含操作步骤与测试 - normal_test_items - abnormal_test_items - requirement_type(可选) -- recommended_test_methods(可选) ## 输出 - normal_test_cases - abnormal_test_cases -- method_alignment_report +- case_compliance_report 每条测试用例必须包含: - case_id @@ -39,12 +38,24 @@ description: "当需要根据已分解测试项生成包含操作步骤与测试 3. 必须区分正常测试用例与异常测试用例。 4. 操作步骤应可顺序执行,避免歧义。 5. 操作步骤必须包含明确动作、对象和输入条件,禁止笼统动作词。 -6. test_content 必须包含可验证条件,便于后续生成可度量预期成果。 +6. `case_summary` 与 `evaluation_criteria` 必须包含可验证条件,便于后续生成可度量预期成果。 7. 测试用例应符合可重复执行原则,初始条件和参数必须可复现。 8. 测试输入必须标识有效值/无效值/边界值性质、输入来源、真实或模拟属性及事件顺序。 9. 每个操作步骤应包含测试输入、动作、中间期望、评估准则与异常终止信号。 10. pass_criteria 必须明确是否通过的判定条件,禁止模糊描述。 +## 统一生成规范(与测试方法无关) +1. 字段完整性:每条用例必须包含本 Skill 规定的全部字段,缺失字段不得省略,应给出“待补充”占位。 +2. 可执行性:`operation_steps` 必须按可顺序执行的步骤组织,单步只表达一个核心动作,避免多动作混写。 +3. 可复现性:`initialization_requirements`、`test_inputs`、`preconditions_constraints` 必须足以让第三方复现实验。 +4. 可观测性:每条步骤必须关联可观测结果或中间检查点,避免只描述操作不描述观测。 +5. 可判定性:`evaluation_criteria` 与 `pass_criteria` 必须给出明确判定阈值或布尔条件,禁止模糊词。 +6. 异常完备性:异常用例必须包含异常触发条件、预期保护动作、终止条件与恢复后状态要求。 +7. 去歧义:字段中禁止“适当”“正常”“合理”等无量纲表述,需替换为可验证条件。 +8. 去重一致性:相同测试项下语义重复的用例应合并,保留最完整版本并保证编号稳定。 +9. 追踪关系:`test_traceability` 必须指向来源测试项,保证“测试项 -> 测试用例”可追踪。 +10. 结果衔接:`expected_result_placeholder` 必须可直接用于下一步预期成果展开,不得出现不可映射占位符。 + ## expected_result_placeholder 映射(用于 Step 4 展开) - {{return_value}}:接口或函数返回值验证。 - {{state_change}}:系统状态变化验证。 @@ -59,30 +70,6 @@ description: "当需要根据已分解测试项生成包含操作步骤与测试 - {{resource_usage}}:资源占用与空间约束验证。 - {{pass_criteria}}:用例通过准则验证。 -## 常用测试方法应用清单(B.1.1-B.2.6) -每个用例必须在 summary 中标注所用方法,并在 steps 中体现方法特征。 - -| 方法 | 适用场景 | 输入构造规则 | 步骤设计特征 | -| --- | --- | --- | --- | -| 功能分解 | 功能/接口主流程 | 按子功能拆输入集 | 用例按子功能逐级覆盖 | -| 等价类划分 | 功能/接口/边界 | 有效类与无效类分别取代表值 | 每类至少一条步骤 | -| 边界值分析 | 输入输出边界 | 取边界、邻近、越界值 | 连续执行边界点序列 | -| 判定表 | 多条件组合逻辑 | 依据条件桩生成规则列 | 一列规则对应一条步骤流 | -| 因果图 | 条件与结果耦合 | 构建原因-结果及约束 | 覆盖关键因果链路 | -| 场景法 | 事件驱动流程 | 基本流与备选流输入 | 按场景路径组织步骤 | -| 功能图法 | 状态与逻辑联合 | 状态转移输入序列 | 状态路径+局部逻辑组合 | -| 随机测试 | 可靠性/强度 | 输入区间+概率分布 | 指定随机规则和样本量 | -| 猜错法 | 高风险经验缺陷 | 构造易错输入集合 | 直接验证高风险点 | -| 正交试验法 | 多因子组合优化 | 因子-水平表+正交表 | 覆盖代表组合点 | -| 组合测试法 | 参数组合覆盖 | 选定组合强度(pairwise/K) | 按组合强度生成步骤 | -| 蜕变测试法 | 预期结果难判定 | 构造蜕变关系输入组 | 校验用例间关系一致性 | -| 控制流测试 | 白盒流程覆盖 | 按路径目标构造输入 | 步骤对应语句/分支路径 | -| 数据流测试 | 白盒变量使用 | 定义-使用链路输入 | 覆盖关键定义引用对 | -| 程序变异 | 错误驱动验证 | 针对变异体生成输入 | 验证是否杀死变异体 | -| 程序插桩 | 运行行为观测 | 插桩点对应输入 | 步骤包含采样与比对 | -| 域测试 | 输入空间划分检验 | 构造域边界/域内样本 | 验证域划分正确性 | -| 符号求值 | 公式与路径推导 | 依据符号约束反推输入 | 校验符号关系与结果 | - ## 禁止模糊描述 - 错误示例:"检查功能正常";正确示例:"验证返回状态码为200且响应体包含status=success"。 - 错误示例:"输入合法数据";正确示例:"在用户名输入框输入长度为8的字母数字字符串并提交"。 @@ -92,5 +79,5 @@ description: "当需要根据已分解测试项生成包含操作步骤与测试 - 每条用例必须可在下一步绑定一条明确、可验证的预期成果。 - 每条 expected_result_placeholder 必须可映射到定量或可观察的检查项。 -## 对齐校验 -- 输出 method_alignment_report,至少包含:case_id、selected_methods、alignment_score、gaps、fix_suggestions。 +## 合规校验输出 +- 输出 case_compliance_report,至少包含:case_id、completeness_score、executability_score、ambiguity_flags、gaps、fix_suggestions。 diff --git a/.github/skills/identify-requirement-type/SKILL.md b/.github/skills/identify-requirement-type/SKILL.md index c24ee31..8fd78ab 100644 --- a/.github/skills/identify-requirement-type/SKILL.md +++ b/.github/skills/identify-requirement-type/SKILL.md @@ -1,12 +1,12 @@ --- name: identify-requirement-type -description: "当需要在测试项分解与测试用例生成之前识别需求类型时使用。" +description: "当输入为原始需求文本且需判定测试需求主类型(含未知回退)、候选类型与推荐方法ID(Mxx|方法名)时使用。" --- # identify-requirement-type ## 目标 -将用户需求文本识别为明确的测试需求类型,为后续测试项分解与测试用例生成提供分类依据、推荐测试方法与分解模板。 +将用户需求文本识别为明确的测试需求类型,为后续测试项分解与测试用例生成提供分类依据与推荐测试方法。 ## 输入 - user_requirement_text:用户原始需求文本。 @@ -31,8 +31,7 @@ description: "当需要在测试项分解与测试用例生成之前识别需求 - 未知类型 - reason:简要判断依据。 - candidates:当 requirement_type 为未知类型时,给出 1-3 个最接近候选类型。 -- recommended_test_methods:基于类型推荐的测试方法列表(按优先级排序)。 -- suggested_decompose_template:推荐的测试项分解模板名称。 +- recommended_test_methods:基于类型推荐的测试方法列表(按优先级排序),格式为 `Mxx|方法名`。 - type_signals:触发该类型判断的关键语义信号。 ## 类型识别信号 @@ -51,24 +50,24 @@ description: "当需要在测试项分解与测试用例生成之前识别需求 - 敏感性测试:关注有效输入类中可能引发不稳定或不正常处理的数据组合。 - 测试充分性要求:关注需求覆盖率、配置项覆盖、语句覆盖、分支覆盖及未覆盖分析确认。 -## 附录 A:类型到方法与模板映射 -| 需求类型 | 推荐测试方法(优先顺序) | 推荐分解模板 | -| --- | --- | --- | -| 功能测试 | 功能分解, 等价类划分, 边界值分析, 场景法 | functional-standard-template | -| 性能测试 | 边界值分析, 组合测试法, 随机测试, 正交试验法 | performance-metric-template | -| 外部接口测试 | 等价类划分, 边界值分析, 判定表, 因果图 | external-interface-template | -| 人机交互界面测试 | 场景法, 猜错法, 等价类划分, 边界值分析 | hmi-flow-template | -| 强度测试 | 随机测试, 组合测试法, 边界值分析 | stress-limit-template | -| 余量测试 | 边界值分析, 组合测试法 | margin-capacity-template | -| 可靠性测试 | 随机测试, 场景法, 组合测试法, 蜕变测试法 | reliability-profile-template | -| 安全性测试 | 边界值分析, 场景法, 猜错法, 因果图 | safety-hazard-template | -| 恢复性测试 | 场景法, 功能图法, 猜错法 | recovery-fault-template | -| 边界测试 | 边界值分析, 等价类划分, 组合测试法 | boundary-focused-template | -| 安装性测试 | 场景法, 猜错法 | install-config-template | -| 互操作性测试 | 场景法, 组合测试法, 因果图 | interoperability-template | -| 敏感性测试 | 组合测试法, 正交试验法, 随机测试, 猜错法 | sensitivity-combination-template | -| 测试充分性要求 | 控制流测试, 数据流测试, 程序变异, 程序插桩 | adequacy-coverage-template | -| 未知类型 | 功能分解, 等价类划分, 边界值分析 | generic-fallback-template | +## 附录 A:类型到方法映射 +| 需求类型 | 推荐测试方法(优先顺序,方法ID+中文名) | +| --- | --- | +| 功能测试 | M01|功能分解, M02|等价类划分, M03|边界值分析, M06|场景法, M04|判定表, M05|因果图, M11|组合测试法, M09|猜错法 | +| 性能测试 | M03|边界值分析, M11|组合测试法, M10|正交试验法, M08|随机测试, M06|场景法, M07|功能图法, M16|程序插桩 | +| 外部接口测试 | M02|等价类划分, M03|边界值分析, M04|判定表, M05|因果图, M11|组合测试法, M06|场景法, M09|猜错法 | +| 人机交互界面测试 | M06|场景法, M09|猜错法, M02|等价类划分, M03|边界值分析, M11|组合测试法, M04|判定表 | +| 强度测试 | M08|随机测试, M11|组合测试法, M03|边界值分析, M10|正交试验法, M06|场景法, M16|程序插桩 | +| 余量测试 | M03|边界值分析, M11|组合测试法, M06|场景法, M10|正交试验法, M08|随机测试 | +| 可靠性测试 | M08|随机测试, M06|场景法, M11|组合测试法, M12|蜕变测试法, M09|猜错法, M16|程序插桩, M07|功能图法 | +| 安全性测试 | M03|边界值分析, M06|场景法, M09|猜错法, M05|因果图, M04|判定表, M11|组合测试法, M07|功能图法 | +| 恢复性测试 | M06|场景法, M07|功能图法, M09|猜错法, M11|组合测试法, M04|判定表, M16|程序插桩 | +| 边界测试 | M03|边界值分析, M02|等价类划分, M17|域测试, M11|组合测试法, M04|判定表, M06|场景法 | +| 安装性测试 | M06|场景法, M09|猜错法, M11|组合测试法, M02|等价类划分, M03|边界值分析 | +| 互操作性测试 | M06|场景法, M11|组合测试法, M05|因果图, M04|判定表, M07|功能图法, M09|猜错法 | +| 敏感性测试 | M11|组合测试法, M10|正交试验法, M08|随机测试, M09|猜错法, M03|边界值分析, M12|蜕变测试法 | +| 测试充分性要求 | M13|控制流测试, M14|数据流测试, M15|程序变异, M16|程序插桩, M17|域测试, M18|符号求值, M11|组合测试法 | +| 未知类型 | M01|功能分解, M02|等价类划分, M03|边界值分析, M06|场景法, M11|组合测试法, M09|猜错法 | ## 规则 1. 优先依据需求文本中的显式表述进行分类。 @@ -77,13 +76,14 @@ description: "当需要在测试项分解与测试用例生成之前识别需求 4. 判断依据需简洁、可追溯到文本证据。 5. 若需求同时覆盖多个类型,输出主类型 requirement_type,并将次类型放入 candidates。 6. recommended_test_methods 至少返回 2 个方法,优先返回文档中可直接执行的方法。 -7. suggested_decompose_template 必须与 requirement_type 一致。 +7. 方法标识以 Method ID(Mxx)为路由主键,中文方法名仅用于可读性展示。 ## 容错 - 当需求描述过于笼统或跨多类型混合时,输出未知类型,并在 candidates 给出最接近类型。 - 当识别不稳定时,优先保守分类,不强行归入单一类型。 - 未知类型不阻断后续流程,应继续执行通用测试项分解。 - 当文本信息不足以支持白盒方法推荐时,保留黑盒优先推荐并标记 reason。 +- 当仅能识别到中文方法名时,先映射为 Method ID 后再输出。 ## 调试 - debug 模式下返回每个类型的分类分数 classification_scores。 diff --git a/.github/skills/testing-orchestrator/SKILL.md b/.github/skills/testing-orchestrator/SKILL.md index a06beab..719fa69 100644 --- a/.github/skills/testing-orchestrator/SKILL.md +++ b/.github/skills/testing-orchestrator/SKILL.md @@ -1,6 +1,6 @@ --- name: testing-orchestrator -description: "当用户要求测试项分解或测试用例生成且需要完整工具调用链时使用。" +description: "当需要从需求文本一键生成测试项、测试用例与预期成果时使用,按 identify→decompose→generate→build→format 全链路编排并透传上下文。" --- # testing-orchestrator @@ -31,15 +31,13 @@ description: "当用户要求测试项分解或测试用例生成且需要完整 - requirement_type - reason - candidates - - recommended_test_methods - - suggested_decompose_template + - recommended_test_methods(格式:`Mxx|方法名`) ### Step 2: decompose-test-items - 输入: - user_requirement_text - requirement_type(来自 Step 1) - recommended_test_methods(来自 Step 1) - - suggested_decompose_template(来自 Step 1) - 输出: - normal_test_items - abnormal_test_items @@ -56,6 +54,13 @@ description: "当用户要求测试项分解或测试用例生成且需要完整 - abnormal_test_cases - method_alignment_report +#### Step 3 路由规则(按需调用) +1. 从 `recommended_test_methods` 提取 Method ID(`Mxx`)。 +2. 若命中 `M01-M12`,调用 `generate-test-cases-blackbox`。 +3. 若命中 `M13-M18`,调用 `generate-test-cases-whitebox`。 +4. 同一测试项命中黑盒+白盒时,并行生成后去重合并。 +5. 合并后统一输出 `case_id` 与 `method_alignment_report`,并记录未落地方法到 `gaps`。 + ### Step 4: build-expected-results - 输入: - normal_test_cases(来自 Step 3) diff --git a/rag-web-ui/backend/alembic/versions/b7217f0c3d92_add_interface_fields_to_srs_requirements.py b/rag-web-ui/backend/alembic/versions/b7217f0c3d92_add_interface_fields_to_srs_requirements.py new file mode 100644 index 0000000..8fd6435 --- /dev/null +++ b/rag-web-ui/backend/alembic/versions/b7217f0c3d92_add_interface_fields_to_srs_requirements.py @@ -0,0 +1,34 @@ +"""add interface fields to srs requirements + +Revision ID: b7217f0c3d92 +Revises: a4f9c89b7d11 +Create Date: 2026-04-18 19:10:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "b7217f0c3d92" +down_revision: Union[str, None] = "a4f9c89b7d11" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column("srs_requirements", sa.Column("section_uid", sa.String(length=64), nullable=True)) + op.add_column("srs_requirements", sa.Column("interface_name", sa.String(length=255), nullable=True)) + op.add_column("srs_requirements", sa.Column("interface_type", sa.String(length=128), nullable=True)) + op.add_column("srs_requirements", sa.Column("data_source", sa.String(length=255), nullable=True)) + op.add_column("srs_requirements", sa.Column("data_destination", sa.String(length=255), nullable=True)) + + +def downgrade() -> None: + op.drop_column("srs_requirements", "data_destination") + op.drop_column("srs_requirements", "data_source") + op.drop_column("srs_requirements", "interface_type") + op.drop_column("srs_requirements", "interface_name") + op.drop_column("srs_requirements", "section_uid") diff --git a/rag-web-ui/backend/alembic/versions/c9f6e7a1bd34_add_testing_generation_history_table.py b/rag-web-ui/backend/alembic/versions/c9f6e7a1bd34_add_testing_generation_history_table.py new file mode 100644 index 0000000..b7a0c9e --- /dev/null +++ b/rag-web-ui/backend/alembic/versions/c9f6e7a1bd34_add_testing_generation_history_table.py @@ -0,0 +1,59 @@ +"""add testing generation history table + +Revision ID: c9f6e7a1bd34 +Revises: b7217f0c3d92 +Create Date: 2026-04-26 20:30:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "c9f6e7a1bd34" +down_revision: Union[str, None] = "b7217f0c3d92" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "testing_generations", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("job_id", sa.Integer(), nullable=False), + sa.Column("source_job_id", sa.Integer(), nullable=True), + sa.Column("source_document_name", sa.String(length=255), nullable=False), + sa.Column("generated_at", sa.DateTime(), nullable=False), + sa.Column("total_requirements", sa.Integer(), nullable=False, server_default="0"), + sa.Column("knowledge_base_id", sa.Integer(), nullable=True), + sa.Column("generated_file", sa.JSON(), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(["job_id"], ["tool_jobs.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["knowledge_base_id"], ["knowledge_bases.id"]), + sa.ForeignKeyConstraint(["source_job_id"], ["tool_jobs.id"]), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("job_id"), + ) + op.create_index(op.f("ix_testing_generations_id"), "testing_generations", ["id"], unique=False) + op.create_index( + op.f("ix_testing_generations_source_job_id"), + "testing_generations", + ["source_job_id"], + unique=False, + ) + op.create_index( + op.f("ix_testing_generations_knowledge_base_id"), + "testing_generations", + ["knowledge_base_id"], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index(op.f("ix_testing_generations_knowledge_base_id"), table_name="testing_generations") + op.drop_index(op.f("ix_testing_generations_source_job_id"), table_name="testing_generations") + op.drop_index(op.f("ix_testing_generations_id"), table_name="testing_generations") + op.drop_table("testing_generations") diff --git a/rag-web-ui/backend/app/api/api_v1/testing.py b/rag-web-ui/backend/app/api/api_v1/testing.py index db5e58d..ddcdf20 100644 --- a/rag-web-ui/backend/app/api/api_v1/testing.py +++ b/rag-web-ui/backend/app/api/api_v1/testing.py @@ -1,6 +1,8 @@ +import logging +import asyncio from typing import Any, Dict, List -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from app.core.config import settings @@ -15,6 +17,8 @@ from app.services.testing_pipeline import run_testing_pipeline from app.services.vector_store import VectorStoreFactory router = APIRouter() +logger = logging.getLogger(__name__) +MODEL_PIPELINE_TIMEOUT_SECONDS = 300 async def _build_kb_vector_stores(db: Session, knowledge_bases: List[KnowledgeBase]) -> List[Dict[str, Any]]: @@ -47,38 +51,75 @@ async def generate_testing_content( knowledge_context = (payload.knowledge_context or "").strip() if payload.knowledge_base_ids: - knowledge_bases = ( - db.query(KnowledgeBase) - .filter( - KnowledgeBase.id.in_(payload.knowledge_base_ids), - KnowledgeBase.user_id == current_user.id, + try: + knowledge_bases = ( + db.query(KnowledgeBase) + .filter( + KnowledgeBase.id.in_(payload.knowledge_base_ids), + KnowledgeBase.user_id == current_user.id, + ) + .all() ) - .all() + + kb_vector_stores = await _build_kb_vector_stores(db, knowledge_bases) + if kb_vector_stores: + retriever = MultiKBRetriever( + reranker_weight=settings.RERANKER_WEIGHT, + ) + retrieval_rows = await retriever.retrieve( + query=payload.requirement_text, + kb_vector_stores=kb_vector_stores, + fetch_k_per_kb=max(12, payload.retrieval_top_k * 2), + top_k=payload.retrieval_top_k, + ) + if retrieval_rows: + knowledge_context = format_retrieval_context(retrieval_rows) + except Exception as exc: + logger.exception( + "Testing generation retrieval fallback triggered for user=%s knowledge_base_ids=%s: %s", + current_user.id, + payload.knowledge_base_ids, + exc, + ) + + pipeline_kwargs = { + "user_requirement_text": payload.requirement_text, + "requirement_type_input": payload.requirement_type, + "debug": payload.debug, + "knowledge_context": knowledge_context, + "use_model_generation": payload.use_model_generation, + "max_items_per_group": payload.max_items_per_group, + "cases_per_item": payload.cases_per_item, + "max_focus_points": payload.max_focus_points, + "max_llm_calls": payload.max_llm_calls, + } + + try: + result = await asyncio.wait_for( + asyncio.to_thread(run_testing_pipeline, **pipeline_kwargs), + timeout=MODEL_PIPELINE_TIMEOUT_SECONDS, ) + except asyncio.TimeoutError as exc: + logger.exception( + "Testing pipeline timed out for user=%s use_model_generation=%s after %s seconds", + current_user.id, + payload.use_model_generation, + MODEL_PIPELINE_TIMEOUT_SECONDS, + ) + raise HTTPException( + status_code=504, + detail=f"LLM generation timed out after {MODEL_PIPELINE_TIMEOUT_SECONDS} seconds", + ) from exc + except Exception as exc: + logger.exception( + "Testing pipeline failed for user=%s use_model_generation=%s: %s", + current_user.id, + payload.use_model_generation, + exc, + ) + raise HTTPException( + status_code=500, + detail=f"LLM generation failed: {exc}", + ) from exc - kb_vector_stores = await _build_kb_vector_stores(db, knowledge_bases) - if kb_vector_stores: - retriever = MultiKBRetriever( - reranker_weight=settings.RERANKER_WEIGHT, - ) - retrieval_rows = await retriever.retrieve( - query=payload.requirement_text, - kb_vector_stores=kb_vector_stores, - fetch_k_per_kb=max(12, payload.retrieval_top_k * 2), - top_k=payload.retrieval_top_k, - ) - if retrieval_rows: - knowledge_context = format_retrieval_context(retrieval_rows) - - result = run_testing_pipeline( - user_requirement_text=payload.requirement_text, - requirement_type_input=payload.requirement_type, - debug=payload.debug, - knowledge_context=knowledge_context, - use_model_generation=payload.use_model_generation, - max_items_per_group=payload.max_items_per_group, - cases_per_item=payload.cases_per_item, - max_focus_points=payload.max_focus_points, - max_llm_calls=payload.max_llm_calls, - ) return result diff --git a/rag-web-ui/backend/app/api/api_v1/tools.py b/rag-web-ui/backend/app/api/api_v1/tools.py index eeb37e6..a5791f7 100644 --- a/rag-web-ui/backend/app/api/api_v1/tools.py +++ b/rag-web-ui/backend/app/api/api_v1/tools.py @@ -1,26 +1,46 @@ from pathlib import Path from typing import Any, List +import shutil + from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile from sqlalchemy.orm import Session from app.core.security import get_current_user from app.db.session import get_db -from app.models.tooling import SRSExtraction, ToolJob +from app.models.knowledge import KnowledgeBase +from app.models.tooling import SRSExtraction, TestingGeneration, ToolJob from app.models.user import User from app.schemas.tooling import ( SRSToolCreateJobResponse, + SRSToolHistoryItem, SRSToolJobStatusResponse, SRSToolRequirementsSaveRequest, SRSToolResultResponse, + TestingGenerationCreateRequest, + TestingGenerationCreateResponse, + TestingGenerationHistoryItem, + TestingGenerationJobStatusResponse, + TestingGenerationResultResponse, + TestingGenerationSaveRequest, ToolDefinitionResponse, ) from app.services.srs_job_service import ( + build_srs_upload_path, build_result_response, + delete_srs_job, ensure_upload_path, + list_srs_history, replace_requirements, run_srs_job, ) +from app.services.testing_generation_service import ( + build_testing_generation_response, + create_testing_generation, + delete_testing_generation, + list_testing_history, +) +from app.services.testing_generation_job_service import run_testing_generation_job from app.tools.registry import ToolRegistry from app.tools.srs_reqs_qwen import get_srs_tool @@ -173,3 +193,223 @@ async def save_srs_requirements( db.refresh(extraction) return build_result_response(job, extraction) + + +@router.get("/srs/history", response_model=List[SRSToolHistoryItem]) +async def get_srs_history( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +) -> Any: + return list_srs_history(db, current_user.id) + + +@router.delete("/srs/jobs/{job_id}") +async def delete_srs_job_api( + job_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +) -> Any: + job = ( + db.query(ToolJob) + .filter(ToolJob.id == job_id, ToolJob.user_id == current_user.id) + .first() + ) + if not job: + raise HTTPException(status_code=404, detail="任务不存在") + + upload_path = build_srs_upload_path(job_id) + delete_srs_job(db, job) + + if upload_path.exists(): + shutil.rmtree(upload_path, ignore_errors=True) + + return {"message": "删除成功"} + + +@router.post("/testing/generations", response_model=TestingGenerationResultResponse) +async def save_testing_generation( + payload: TestingGenerationSaveRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +) -> Any: + if payload.source_job_id is not None: + source_job = ( + db.query(ToolJob) + .filter( + ToolJob.id == payload.source_job_id, + ToolJob.user_id == current_user.id, + ) + .first() + ) + if not source_job: + raise HTTPException(status_code=404, detail="来源文件不存在") + + if payload.knowledge_base_id is not None: + knowledge_base = ( + db.query(KnowledgeBase) + .filter( + KnowledgeBase.id == payload.knowledge_base_id, + KnowledgeBase.user_id == current_user.id, + ) + .first() + ) + if not knowledge_base: + raise HTTPException(status_code=404, detail="知识库不存在") + + return create_testing_generation(db, current_user.id, payload) + + +@router.post("/testing/jobs", response_model=TestingGenerationCreateResponse) +async def create_testing_generation_job( + background_tasks: BackgroundTasks, + payload: TestingGenerationCreateRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +) -> Any: + if payload.source_job_id is not None: + source_job = ( + db.query(ToolJob) + .filter( + ToolJob.id == payload.source_job_id, + ToolJob.user_id == current_user.id, + ) + .first() + ) + if not source_job: + raise HTTPException(status_code=404, detail="来源文件不存在") + + if payload.knowledge_base_id is not None: + knowledge_base = ( + db.query(KnowledgeBase) + .filter( + KnowledgeBase.id == payload.knowledge_base_id, + KnowledgeBase.user_id == current_user.id, + ) + .first() + ) + if not knowledge_base: + raise HTTPException(status_code=404, detail="知识库不存在") + + job = ToolJob( + user_id=current_user.id, + tool_name="testing.case_generator", + status="pending", + input_file_name=payload.source_document_name, + input_file_path="", + output_summary={ + "source_document_name": payload.source_document_name, + "current_step": 0, + "total_steps": len(payload.requirements), + }, + ) + db.add(job) + db.commit() + db.refresh(job) + + background_tasks.add_task( + run_testing_generation_job, + job.id, + payload.dict(), + ) + + return { + "job_id": job.id, + "status": job.status, + } + + +@router.get("/testing/jobs/{job_id}", response_model=TestingGenerationJobStatusResponse) +async def get_testing_generation_job_status( + job_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +) -> Any: + job = ( + db.query(ToolJob) + .filter( + ToolJob.id == job_id, + ToolJob.user_id == current_user.id, + ) + .first() + ) + if not job: + raise HTTPException(status_code=404, detail="任务不存在") + + summary = job.output_summary or {} + return { + "job_id": job.id, + "tool_name": job.tool_name, + "status": job.status, + "error_message": job.error_message, + "started_at": job.started_at, + "completed_at": job.completed_at, + "source_document_name": summary.get("source_document_name"), + "current_step": summary.get("current_step"), + "total_steps": summary.get("total_steps"), + "current_requirement_id": summary.get("current_requirement_id"), + } + + +@router.get("/testing/history", response_model=List[TestingGenerationHistoryItem]) +async def get_testing_history( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +) -> Any: + return list_testing_history(db, current_user.id) + + +@router.get("/testing/jobs/{job_id}/result", response_model=TestingGenerationResultResponse) +async def get_testing_generation_result( + job_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +) -> Any: + job = ( + db.query(ToolJob) + .filter( + ToolJob.id == job_id, + ToolJob.user_id == current_user.id, + ) + .first() + ) + if not job: + raise HTTPException(status_code=404, detail="任务不存在") + + generation = ( + db.query(TestingGeneration) + .filter(TestingGeneration.job_id == job.id) + .first() + ) + if not generation: + raise HTTPException(status_code=404, detail="任务结果不存在") + + return build_testing_generation_response(job, generation) + + +@router.delete("/testing/jobs/{job_id}") +async def delete_testing_generation_api( + job_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +) -> Any: + job = ( + db.query(ToolJob) + .filter( + ToolJob.id == job_id, + ToolJob.user_id == current_user.id, + ) + .first() + ) + if not job: + raise HTTPException(status_code=404, detail="任务不存在") + + generation = ( + db.query(TestingGeneration) + .filter(TestingGeneration.job_id == job.id) + .first() + ) + if not generation: + raise HTTPException(status_code=404, detail="任务结果不存在") + + delete_testing_generation(db, job) + return {"message": "删除成功"} diff --git a/rag-web-ui/backend/app/models/__init__.py b/rag-web-ui/backend/app/models/__init__.py index 0e8caac..3695699 100644 --- a/rag-web-ui/backend/app/models/__init__.py +++ b/rag-web-ui/backend/app/models/__init__.py @@ -2,7 +2,7 @@ from .user import User from .knowledge import KnowledgeBase, Document, DocumentChunk from .chat import Chat, Message from .api_key import APIKey -from .tooling import ToolJob, SRSExtraction, SRSRequirement +from .tooling import ToolJob, SRSExtraction, SRSRequirement, TestingGeneration __all__ = [ "User", @@ -15,4 +15,5 @@ __all__ = [ "ToolJob", "SRSExtraction", "SRSRequirement", + "TestingGeneration", ] diff --git a/rag-web-ui/backend/app/models/tooling.py b/rag-web-ui/backend/app/models/tooling.py index 3b3f28a..cdd1433 100644 --- a/rag-web-ui/backend/app/models/tooling.py +++ b/rag-web-ui/backend/app/models/tooling.py @@ -29,6 +29,13 @@ class ToolJob(Base, TimestampMixin): uselist=False, cascade="all, delete-orphan", ) + testing_generation = relationship( + "TestingGeneration", + back_populates="job", + uselist=False, + cascade="all, delete-orphan", + foreign_keys="TestingGeneration.job_id", + ) class SRSExtraction(Base, TimestampMixin): @@ -63,9 +70,14 @@ class SRSRequirement(Base, TimestampMixin): priority = Column(String(16), nullable=False, default="中") acceptance_criteria = Column(JSON, nullable=False) source_field = Column(String(255), nullable=False) + section_uid = Column(String(64), nullable=True) section_number = Column(String(64), nullable=True) section_title = Column(String(255), nullable=True) requirement_type = Column(String(64), nullable=True) + interface_name = Column(String(255), nullable=True) + interface_type = Column(String(128), nullable=True) + data_source = Column(String(255), nullable=True) + data_destination = Column(String(255), nullable=True) sort_order = Column(Integer, nullable=False, default=0) extraction = relationship("SRSExtraction", back_populates="requirements") @@ -74,3 +86,19 @@ class SRSRequirement(Base, TimestampMixin): sa.UniqueConstraint("extraction_id", "requirement_uid", name="uq_srs_extraction_requirement_uid"), sa.Index("idx_srs_requirements_extraction_sort", "extraction_id", "sort_order"), ) + + +class TestingGeneration(Base, TimestampMixin): + __tablename__ = "testing_generations" + + id = Column(Integer, primary_key=True, index=True) + job_id = Column(Integer, ForeignKey("tool_jobs.id", ondelete="CASCADE"), nullable=False, unique=True) + source_job_id = Column(Integer, ForeignKey("tool_jobs.id"), nullable=True, index=True) + source_document_name = Column(String(255), nullable=False) + generated_at = Column(DateTime, default=datetime.utcnow, nullable=False) + total_requirements = Column(Integer, nullable=False, default=0) + knowledge_base_id = Column(Integer, ForeignKey("knowledge_bases.id"), nullable=True, index=True) + generated_file = Column(JSON, nullable=False) + + job = relationship("ToolJob", back_populates="testing_generation", foreign_keys=[job_id]) + diff --git a/rag-web-ui/backend/app/schemas/tooling.py b/rag-web-ui/backend/app/schemas/tooling.py index a11fea3..a19ae20 100644 --- a/rag-web-ui/backend/app/schemas/tooling.py +++ b/rag-web-ui/backend/app/schemas/tooling.py @@ -29,14 +29,18 @@ class SRSToolJobStatusResponse(BaseModel): class SRSToolRequirementItem(BaseModel): id: str - title: str description: str priority: str acceptanceCriteria: List[str] sourceField: str + sectionUid: Optional[str] = None sectionNumber: Optional[str] = None sectionTitle: Optional[str] = None requirementType: Optional[str] = None + interfaceName: Optional[str] = None + interfaceType: Optional[str] = None + dataSource: Optional[str] = None + dataDestination: Optional[str] = None sortOrder: int @@ -46,7 +50,72 @@ class SRSToolResultResponse(BaseModel): generatedAt: str statistics: Dict[str, Any] requirements: List[SRSToolRequirementItem] + rawOutput: Dict[str, Any] + + +class SRSToolHistoryItem(BaseModel): + jobId: int + documentName: str + generatedAt: str + totalRequirements: int + status: str + createdAt: str + updatedAt: str class SRSToolRequirementsSaveRequest(BaseModel): requirements: List[SRSToolRequirementItem] + + +class TestingGenerationSaveRequest(BaseModel): + source_job_id: Optional[int] = None + source_document_name: str + knowledge_base_id: Optional[int] = None + generated_file: Dict[str, Any] + + +class TestingGenerationCreateRequest(BaseModel): + source_job_id: Optional[int] = None + source_document_name: str + knowledge_base_id: Optional[int] = None + requirements: List[SRSToolRequirementItem] + + +class TestingGenerationCreateResponse(BaseModel): + job_id: int + status: str + + +class TestingGenerationJobStatusResponse(BaseModel): + job_id: int + tool_name: str + status: str + error_message: Optional[str] = None + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + source_document_name: Optional[str] = None + current_step: Optional[int] = None + total_steps: Optional[int] = None + current_requirement_id: Optional[str] = None + + +class TestingGenerationResultResponse(BaseModel): + jobId: int + sourceJobId: Optional[int] = None + sourceDocumentName: str + generatedAt: str + totalRequirements: int + knowledgeBaseId: Optional[int] = None + generatedFile: Dict[str, Any] + + +class TestingGenerationHistoryItem(BaseModel): + jobId: int + sourceJobId: Optional[int] = None + sourceDocumentName: str + generatedAt: str + totalRequirements: int + knowledgeBaseId: Optional[int] = None + status: str + createdAt: str + updatedAt: str diff --git a/rag-web-ui/backend/app/services/srs_job_service.py b/rag-web-ui/backend/app/services/srs_job_service.py index b782940..1087321 100644 --- a/rag-web-ui/backend/app/services/srs_job_service.py +++ b/rag-web-ui/backend/app/services/srs_job_service.py @@ -1,8 +1,9 @@ from __future__ import annotations +from copy import deepcopy from datetime import datetime from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, List, Tuple from sqlalchemy.orm import Session @@ -10,6 +11,45 @@ from app.db.session import SessionLocal from app.models.tooling import SRSExtraction, SRSRequirement, ToolJob from app.tools.srs_reqs_qwen import get_srs_tool +TYPE_TO_CHINESE = { + "functional": "功能需求", + "interface": "接口需求", + "performance": "性能需求", + "security": "安全需求", + "reliability": "可靠性需求", + "other": "其他需求", +} + + +def _build_internal_title(description: Any, fallback: str, index: int = 0) -> str: + text = str(description or "").strip() + if not text: + return fallback or f"需求项 {index + 1}" + + for separator in ("。", ";", "\n", ";", "."): + if separator in text: + text = text.split(separator, 1)[0].strip() + break + + text = text[:20].strip() + return text or fallback or f"需求项 {index + 1}" + + +def _normalize_requirement_type(value: Any) -> str: + text = str(value or "").strip() + if text in {"functional", "interface", "performance", "security", "reliability", "other"}: + return text + + chinese_map = { + "接口需求": "interface", + "性能需求": "performance", + "安全需求": "security", + "可靠性需求": "reliability", + "其他需求": "other", + "功能需求": "functional", + } + return chinese_map.get(text, "functional") + def run_srs_job(job_id: int) -> None: db = SessionLocal() @@ -41,14 +81,19 @@ def run_srs_job(job_id: int) -> None: requirement = SRSRequirement( extraction_id=extraction.id, requirement_uid=item["id"], - title=item.get("title") or item["id"], + title=_build_internal_title(item.get("description"), item["id"]), description=item.get("description") or "", priority=item.get("priority") or "中", acceptance_criteria=item.get("acceptance_criteria") or ["待补充验收标准"], source_field=item.get("source_field") or "文档解析", + section_uid=item.get("section_uid"), section_number=item.get("section_number"), section_title=item.get("section_title"), requirement_type=item.get("requirement_type"), + interface_name=item.get("interface_name"), + interface_type=item.get("interface_type"), + data_source=item.get("data_source"), + data_destination=item.get("data_destination"), sort_order=int(item.get("sort_order") or 0), ) db.add(requirement) @@ -97,22 +142,8 @@ def ensure_upload_path(job_id: int, file_name: str) -> Path: def build_result_response(job: ToolJob, extraction: SRSExtraction) -> Dict[str, Any]: - requirements: List[Dict[str, Any]] = [] - for item in extraction.requirements: - requirements.append( - { - "id": item.requirement_uid, - "title": item.title, - "description": item.description, - "priority": item.priority, - "acceptanceCriteria": item.acceptance_criteria or [], - "sourceField": item.source_field, - "sectionNumber": item.section_number, - "sectionTitle": item.section_title, - "requirementType": item.requirement_type, - "sortOrder": item.sort_order, - } - ) + requirements = [_requirement_model_to_payload(item) for item in extraction.requirements] + raw_output = _merge_updates_into_raw_output(extraction.raw_output, requirements, extraction.document_name) return { "jobId": job.id, @@ -120,10 +151,12 @@ def build_result_response(job: ToolJob, extraction: SRSExtraction) -> Dict[str, "generatedAt": extraction.generated_at.isoformat(), "statistics": extraction.statistics or {}, "requirements": requirements, + "rawOutput": raw_output, } def replace_requirements(db: Session, extraction: SRSExtraction, updates: List[Dict[str, Any]]) -> None: + normalized_updates = [_normalize_update_payload(item, index) for index, item in enumerate(updates)] existing = { req.requirement_uid: req for req in db.query(SRSRequirement) @@ -132,51 +165,95 @@ def replace_requirements(db: Session, extraction: SRSExtraction, updates: List[D } seen_ids = set() - for index, item in enumerate(updates): + for index, item in enumerate(normalized_updates): uid = item["id"] seen_ids.add(uid) req = existing.get(uid) if req is None: + description = item.get("description") if item.get("description") is not None else "" req = SRSRequirement( extraction_id=extraction.id, requirement_uid=uid, - title=item.get("title") or uid, - description=item.get("description") if item.get("description") is not None else "", + title=_build_internal_title(description, uid, int(item.get("sortOrder") or index)), + description=description, priority=item.get("priority") or "中", acceptance_criteria=item.get("acceptanceCriteria") or ["待补充验收标准"], source_field=item.get("sourceField") or "文档解析", + section_uid=item.get("sectionUid"), section_number=item.get("sectionNumber"), section_title=item.get("sectionTitle"), requirement_type=item.get("requirementType"), - sort_order=int(item.get("sortOrder") or index), + interface_name=item.get("interfaceName"), + interface_type=item.get("interfaceType"), + data_source=item.get("dataSource"), + data_destination=item.get("dataDestination"), + sort_order=int(item.get("sortOrder") or 0), ) db.add(req) continue - req.title = item.get("title", req.title) req.description = item.get("description", req.description) + req.title = _build_internal_title(req.description, req.requirement_uid, int(item.get("sortOrder") or index)) req.priority = item.get("priority", req.priority) req.acceptance_criteria = item.get("acceptanceCriteria", req.acceptance_criteria) req.source_field = item.get("sourceField", req.source_field) + req.section_uid = item.get("sectionUid", req.section_uid) req.section_number = item.get("sectionNumber", req.section_number) req.section_title = item.get("sectionTitle", req.section_title) req.requirement_type = item.get("requirementType", req.requirement_type) - req.sort_order = int(item.get("sortOrder", index)) + req.interface_name = item.get("interfaceName", req.interface_name) + req.interface_type = item.get("interfaceType", req.interface_type) + req.data_source = item.get("dataSource", req.data_source) + req.data_destination = item.get("dataDestination", req.data_destination) + req.sort_order = int(item.get("sortOrder", req.sort_order)) for uid, req in existing.items(): if uid not in seen_ids: db.delete(req) - extraction.total_requirements = len(updates) + extraction.total_requirements = len(normalized_updates) extraction.statistics = { - "total": len(updates), - "by_type": _count_requirement_types(updates), - } - extraction.raw_output = { - "document_name": extraction.document_name, - "generated_at": extraction.generated_at.isoformat(), - "requirements": updates, + "total": len(normalized_updates), + "by_type": _count_requirement_types(normalized_updates), } + extraction.raw_output = _merge_updates_into_raw_output( + extraction.raw_output, + normalized_updates, + extraction.document_name, + ) + + +def list_srs_history(db: Session, user_id: int) -> List[Dict[str, Any]]: + records: List[Tuple[ToolJob, SRSExtraction]] = ( + db.query(ToolJob, SRSExtraction) + .join(SRSExtraction, SRSExtraction.job_id == ToolJob.id) + .filter(ToolJob.user_id == user_id) + .order_by(ToolJob.created_at.desc()) + .all() + ) + items: List[Dict[str, Any]] = [] + for job, extraction in records: + items.append( + { + "jobId": job.id, + "documentName": extraction.document_name, + "generatedAt": extraction.generated_at.isoformat(), + "totalRequirements": extraction.total_requirements, + "status": job.status, + "createdAt": job.created_at.isoformat(), + "updatedAt": job.updated_at.isoformat(), + } + ) + return items + + +def delete_srs_job(db: Session, job: ToolJob) -> None: + db.delete(job) + db.commit() + + +def build_srs_upload_path(job_id: int) -> Path: + return Path("uploads") / "srs_jobs" / str(job_id) def _count_requirement_types(items: List[Dict[str, Any]]) -> Dict[str, int]: @@ -185,3 +262,210 @@ def _count_requirement_types(items: List[Dict[str, Any]]) -> Dict[str, int]: req_type = item.get("requirementType") or "functional" stats[req_type] = stats.get(req_type, 0) + 1 return stats + + +def _requirement_model_to_payload(item: SRSRequirement) -> Dict[str, Any]: + return { + "id": item.requirement_uid, + "description": item.description, + "priority": item.priority, + "acceptanceCriteria": item.acceptance_criteria or [], + "sourceField": item.source_field, + "sectionUid": item.section_uid, + "sectionNumber": item.section_number, + "sectionTitle": item.section_title, + "requirementType": item.requirement_type, + "interfaceName": item.interface_name, + "interfaceType": item.interface_type, + "dataSource": item.data_source, + "dataDestination": item.data_destination, + "sortOrder": item.sort_order, + } + + +def _normalize_update_payload(item: Dict[str, Any], index: int) -> Dict[str, Any]: + requirement_type = _normalize_requirement_type(item.get("requirementType")) + normalized = { + "id": str(item.get("id") or f"REQ-{index + 1:03d}"), + "description": str(item.get("description") or "").strip(), + "priority": item.get("priority") or "中", + "acceptanceCriteria": item.get("acceptanceCriteria") or ["待补充验收标准"], + "sourceField": item.get("sourceField") or "文档解析", + "sectionUid": item.get("sectionUid"), + "sectionNumber": item.get("sectionNumber"), + "sectionTitle": item.get("sectionTitle"), + "requirementType": requirement_type, + "interfaceName": item.get("interfaceName") or "", + "interfaceType": item.get("interfaceType") or "", + "dataSource": item.get("dataSource") or "", + "dataDestination": item.get("dataDestination") or "", + "sortOrder": int(item.get("sortOrder") or index), + } + + if requirement_type != "interface": + normalized["interfaceName"] = "" + normalized["interfaceType"] = "" + normalized["dataSource"] = "" + normalized["dataDestination"] = "" + + return normalized + + +def _merge_updates_into_raw_output( + raw_output: Dict[str, Any] | None, + updates: List[Dict[str, Any]], + document_name: str, +) -> Dict[str, Any]: + if not isinstance(raw_output, dict) or "需求内容" not in raw_output: + return _build_raw_output_from_flat(updates, document_name) + + result = deepcopy(raw_output) + content = result.get("需求内容") + if not isinstance(content, dict): + return _build_raw_output_from_flat(updates, document_name) + + updates_by_section = _group_updates_by_section(updates) + _rewrite_content_requirements(content, updates_by_section) + _append_unmatched_sections(content, updates_by_section) + _refresh_metadata(result, updates) + return result + + +def _group_updates_by_section(updates: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: + grouped: Dict[str, List[Dict[str, Any]]] = {} + for item in updates: + key = _section_key(item.get("sectionUid"), item.get("sectionNumber"), item.get("sectionTitle")) + grouped.setdefault(key, []).append(item) + + for values in grouped.values(): + values.sort(key=lambda value: int(value.get("sortOrder") or 0)) + return grouped + + +def _rewrite_content_requirements(content: Dict[str, Any], updates_by_section: Dict[str, List[Dict[str, Any]]]) -> None: + for section in content.values(): + if not isinstance(section, dict): + continue + + section_info = section.get("章节信息") or {} + section_key = _section_key( + section_info.get("章节UID"), + section_info.get("章节编号"), + section_info.get("章节标题"), + ) + if section_key in updates_by_section: + section["需求列表"] = [_to_raw_requirement_item(item) for item in updates_by_section.pop(section_key)] + elif "需求列表" in section: + section["需求列表"] = [] + + children = section.get("子章节") + if isinstance(children, dict): + _rewrite_content_requirements(children, updates_by_section) + + +def _append_unmatched_sections(content: Dict[str, Any], updates_by_section: Dict[str, List[Dict[str, Any]]]) -> None: + if not updates_by_section: + return + + orphan_key = "未归类章节" + orphan_section = content.get(orphan_key) + if not isinstance(orphan_section, dict): + orphan_section = { + "章节信息": { + "章节编号": "", + "章节标题": orphan_key, + "章节级别": 1, + }, + "需求列表": [], + } + content[orphan_key] = orphan_section + + all_reqs: List[Dict[str, Any]] = orphan_section.get("需求列表") or [] + for values in updates_by_section.values(): + for item in values: + all_reqs.append(_to_raw_requirement_item(item)) + orphan_section["需求列表"] = all_reqs + updates_by_section.clear() + + +def _refresh_metadata(raw_output: Dict[str, Any], updates: List[Dict[str, Any]]) -> None: + metadata = raw_output.get("文档元数据") + if not isinstance(metadata, dict): + metadata = {} + raw_output["文档元数据"] = metadata + + metadata["总需求数"] = len(updates) + metadata["生成时间"] = datetime.now().isoformat() + + type_stats: Dict[str, int] = {} + for item in updates: + req_type = item.get("requirementType") or "functional" + cn_type = TYPE_TO_CHINESE.get(req_type, "其他需求") + type_stats[cn_type] = type_stats.get(cn_type, 0) + 1 + metadata["需求类型统计"] = type_stats + + +def _to_raw_requirement_item(item: Dict[str, Any]) -> Dict[str, Any]: + req_type = item.get("requirementType") or "functional" + raw_item = { + "需求类型": TYPE_TO_CHINESE.get(req_type, "其他需求"), + "需求编号": item.get("id") or "", + "需求描述": item.get("description") or "", + "优先级": item.get("priority") or "中", + } + if req_type == "interface": + raw_item["接口名称"] = item.get("interfaceName") or "" + raw_item["接口类型"] = item.get("interfaceType") or "" + raw_item["数据来源"] = item.get("dataSource") or "" + raw_item["数据目的地"] = item.get("dataDestination") or "" + return raw_item + + +def _build_raw_output_from_flat(updates: List[Dict[str, Any]], document_name: str) -> Dict[str, Any]: + grouped = _group_updates_by_section(updates) + content: Dict[str, Any] = {} + for key, values in grouped.items(): + number, title = _parse_section_key(key) + section_title = title or "未归类章节" + display_key = f"{number} {section_title}".strip() + content[display_key] = { + "章节信息": { + "章节编号": number, + "章节标题": section_title, + "章节级别": max(len(number.split(".")), 1) if number else 1, + }, + "需求列表": [_to_raw_requirement_item(item) for item in values], + } + + raw_output = { + "文档元数据": { + "标题": document_name, + "生成时间": datetime.now().isoformat(), + "总需求数": len(updates), + "需求类型统计": {}, + }, + "需求内容": content, + } + _refresh_metadata(raw_output, updates) + return raw_output + + +def _section_key(section_uid: Any, section_number: Any, section_title: Any) -> str: + uid = str(section_uid or "").strip() + if uid: + return f"uid::{uid}" + number = str(section_number or "").strip() + title = str(section_title or "").strip() + return f"number::{number}::title::{title}" + + +def _parse_section_key(value: str) -> Tuple[str, str]: + if value.startswith("uid::"): + return "", "未归类章节" + number = "" + title = "" + parts = value.split("::") + if len(parts) >= 4: + number = parts[1] + title = parts[3] + return number, title diff --git a/rag-web-ui/backend/app/services/testing_generation_job_service.py b/rag-web-ui/backend/app/services/testing_generation_job_service.py new file mode 100644 index 0000000..2c59c8a --- /dev/null +++ b/rag-web-ui/backend/app/services/testing_generation_job_service.py @@ -0,0 +1,236 @@ +from __future__ import annotations + +import asyncio +from datetime import datetime +from typing import Any, Dict, List + +from sqlalchemy.orm import Session + +from app.core.config import settings +from app.db.session import SessionLocal +from app.models.knowledge import Document, KnowledgeBase +from app.models.tooling import TestingGeneration, ToolJob +from app.services.embedding.embedding_factory import EmbeddingsFactory +from app.services.retrieval.multi_kb_retriever import MultiKBRetriever, format_retrieval_context +from app.services.testing_pipeline import run_testing_pipeline +from app.services.vector_store import VectorStoreFactory + +def _flatten_record(value: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + items: List[Dict[str, Any]] = [] + for current in value.values(): + items.extend(current) + return items + + +def _build_kb_vector_stores(db: Session, knowledge_bases: List[KnowledgeBase]) -> List[Dict[str, Any]]: + embeddings = EmbeddingsFactory.create() + kb_vector_stores: List[Dict[str, Any]] = [] + + for kb in knowledge_bases: + documents = db.query(Document).filter(Document.knowledge_base_id == kb.id).all() + if not documents: + continue + + store = VectorStoreFactory.create( + store_type=settings.VECTOR_STORE_TYPE, + collection_name=f"kb_{kb.id}", + embedding_function=embeddings, + ) + kb_vector_stores.append({"kb_id": kb.id, "store": store}) + + return kb_vector_stores + + +def _resolve_knowledge_context( + db: Session, + *, + user_id: int, + requirement_text: str, + knowledge_base_id: int | None, +) -> str: + if knowledge_base_id is None: + return "" + + try: + knowledge_bases = ( + db.query(KnowledgeBase) + .filter( + KnowledgeBase.id == knowledge_base_id, + KnowledgeBase.user_id == user_id, + ) + .all() + ) + kb_vector_stores = _build_kb_vector_stores(db, knowledge_bases) + if not kb_vector_stores: + return "" + + retriever = MultiKBRetriever( + reranker_weight=settings.RERANKER_WEIGHT, + ) + rows = asyncio.run( + retriever.retrieve( + query=requirement_text, + kb_vector_stores=kb_vector_stores, + fetch_k_per_kb=16, + top_k=8, + ) + ) + if rows: + return format_retrieval_context(rows) + except Exception: + return "" + + return "" + + +def _build_generated_requirement(req: Dict[str, Any], pipeline_result: Dict[str, Any]) -> Dict[str, Any]: + test_items = [ + { + "id": item.get("id"), + "content": item.get("content"), + } + for item in _flatten_record(pipeline_result.get("test_items", {})) + ] + test_cases = [ + { + "id": item.get("id"), + "itemId": item.get("item_id"), + "testContent": item.get("test_content"), + "operationSteps": item.get("operation_steps", []), + "expectedResultPlaceholder": item.get("expected_result_placeholder"), + } + for item in _flatten_record(pipeline_result.get("test_cases", {})) + ] + expected_results = [ + { + "id": item.get("id"), + "caseId": item.get("case_id"), + "result": item.get("result"), + } + for item in _flatten_record(pipeline_result.get("expected_results", {})) + ] + + return { + **req, + "测试项": test_items, + "测试用例": test_cases, + "预期结果": expected_results, + } + + +def _mark_job_failed(job_id: int, error_message: str) -> None: + db = SessionLocal() + try: + job = db.query(ToolJob).filter(ToolJob.id == job_id).first() + if not job: + return + job.status = "failed" + job.completed_at = datetime.utcnow() + job.error_message = error_message[:2000] + db.commit() + finally: + db.close() + + +def run_testing_generation_job(job_id: int, payload: Dict[str, Any]) -> None: + db = SessionLocal() + try: + job = db.query(ToolJob).filter(ToolJob.id == job_id).first() + if not job: + return + + requirements = payload.get("requirements") or [] + source_document_name = str(payload.get("source_document_name") or job.input_file_name or "") + source_job_id = payload.get("source_job_id") + knowledge_base_id = payload.get("knowledge_base_id") + + job.status = "processing" + job.started_at = datetime.utcnow() + job.error_message = None + job.output_summary = { + "source_document_name": source_document_name, + "current_step": 0, + "total_steps": len(requirements), + } + db.commit() + + generated_requirements: List[Dict[str, Any]] = [] + + for index, req in enumerate(requirements): + req_id = str(req.get("id") or f"REQ-{index + 1:03d}") + job.output_summary = { + "source_document_name": source_document_name, + "current_step": index + 1, + "total_steps": len(requirements), + "current_requirement_id": req_id, + } + db.commit() + + description = str(req.get("description") or "").strip() + if not description: + generated_requirements.append( + { + **req, + "测试项": [], + "测试用例": [], + "预期结果": [], + } + ) + continue + + knowledge_context = _resolve_knowledge_context( + db, + user_id=job.user_id, + requirement_text=description, + knowledge_base_id=knowledge_base_id, + ) + + pipeline_result = run_testing_pipeline( + user_requirement_text=description, + requirement_type_input=req.get("requirementType"), + debug=False, + knowledge_context=knowledge_context, + use_model_generation=True, + max_items_per_group=12, + cases_per_item=2, + max_focus_points=6, + max_llm_calls=10, + ) + generated_requirements.append(_build_generated_requirement(req, pipeline_result)) + + generated_at = datetime.utcnow() + generated_file = { + "sourceDocument": source_document_name, + "sourceJobId": source_job_id, + "generatedAt": generated_at.isoformat(), + "totalRequirements": len(generated_requirements), + "knowledgeBaseId": knowledge_base_id, + "requirements": generated_requirements, + } + + generation = TestingGeneration( + job_id=job.id, + source_job_id=source_job_id, + source_document_name=source_document_name, + generated_at=generated_at, + total_requirements=len(generated_requirements), + knowledge_base_id=knowledge_base_id, + generated_file=generated_file, + ) + db.add(generation) + + job.status = "completed" + job.completed_at = datetime.utcnow() + job.output_summary = { + "source_document_name": source_document_name, + "current_step": len(generated_requirements), + "total_steps": len(generated_requirements), + "total_requirements": len(generated_requirements), + "knowledge_base_id": knowledge_base_id, + } + db.commit() + except Exception as exc: + db.rollback() + _mark_job_failed(job_id, str(exc)) + finally: + db.close() diff --git a/rag-web-ui/backend/app/services/testing_generation_service.py b/rag-web-ui/backend/app/services/testing_generation_service.py new file mode 100644 index 0000000..35df5cd --- /dev/null +++ b/rag-web-ui/backend/app/services/testing_generation_service.py @@ -0,0 +1,111 @@ +from datetime import datetime +from typing import Any, Dict, List, Tuple + +from sqlalchemy.orm import Session + +from app.models.tooling import TestingGeneration, ToolJob +from app.schemas.tooling import TestingGenerationSaveRequest + +TESTING_TOOL_NAME = "testing.case_generator" + + +def _resolve_total_requirements(generated_file: Dict[str, Any]) -> int: + requirements = generated_file.get("requirements") + if isinstance(requirements, list): + return len(requirements) + + total = generated_file.get("totalRequirements") + if isinstance(total, int) and total >= 0: + return total + + return 0 + + +def build_testing_generation_response(job: ToolJob, generation: TestingGeneration) -> Dict[str, Any]: + return { + "jobId": job.id, + "sourceJobId": generation.source_job_id, + "sourceDocumentName": generation.source_document_name, + "generatedAt": generation.generated_at.isoformat(), + "totalRequirements": generation.total_requirements, + "knowledgeBaseId": generation.knowledge_base_id, + "generatedFile": generation.generated_file or {}, + } + + +def create_testing_generation( + db: Session, + user_id: int, + payload: TestingGenerationSaveRequest, +) -> Dict[str, Any]: + now = datetime.utcnow() + total_requirements = _resolve_total_requirements(payload.generated_file) + + job = ToolJob( + user_id=user_id, + tool_name=TESTING_TOOL_NAME, + status="completed", + input_file_name=payload.source_document_name, + input_file_path="", + started_at=now, + completed_at=now, + output_summary={ + "source_document_name": payload.source_document_name, + "total_requirements": total_requirements, + "knowledge_base_id": payload.knowledge_base_id, + }, + ) + db.add(job) + db.flush() + + generation = TestingGeneration( + job_id=job.id, + source_job_id=payload.source_job_id, + source_document_name=payload.source_document_name, + generated_at=now, + total_requirements=total_requirements, + knowledge_base_id=payload.knowledge_base_id, + generated_file=payload.generated_file, + ) + db.add(generation) + db.commit() + db.refresh(job) + db.refresh(generation) + + return build_testing_generation_response(job, generation) + + +def list_testing_history(db: Session, user_id: int) -> List[Dict[str, Any]]: + rows: List[Tuple[ToolJob, TestingGeneration]] = ( + db.query(ToolJob, TestingGeneration) + .join(TestingGeneration, TestingGeneration.job_id == ToolJob.id) + .filter( + ToolJob.user_id == user_id, + ToolJob.tool_name == TESTING_TOOL_NAME, + ) + .order_by(ToolJob.created_at.desc()) + .all() + ) + + items: List[Dict[str, Any]] = [] + for job, generation in rows: + items.append( + { + "jobId": job.id, + "sourceJobId": generation.source_job_id, + "sourceDocumentName": generation.source_document_name, + "generatedAt": generation.generated_at.isoformat(), + "totalRequirements": generation.total_requirements, + "knowledgeBaseId": generation.knowledge_base_id, + "status": job.status, + "createdAt": job.created_at.isoformat(), + "updatedAt": job.updated_at.isoformat(), + } + ) + + return items + + +def delete_testing_generation(db: Session, job: ToolJob) -> None: + db.delete(job) + db.commit() diff --git a/rag-web-ui/backend/app/services/testing_pipeline/pipeline.py b/rag-web-ui/backend/app/services/testing_pipeline/pipeline.py index e2ce26a..23f14d9 100644 --- a/rag-web-ui/backend/app/services/testing_pipeline/pipeline.py +++ b/rag-web-ui/backend/app/services/testing_pipeline/pipeline.py @@ -4,7 +4,6 @@ from time import perf_counter from typing import Any, Dict, List, Optional from uuid import uuid4 -from app.services.llm.llm_factory import LLMFactory from app.services.testing_pipeline.tools import build_default_tool_chain @@ -42,6 +41,8 @@ def run_testing_pipeline( llm_model = None if use_model_generation: try: + from app.services.llm.llm_factory import LLMFactory + llm_model = LLMFactory.create(streaming=False) except Exception: llm_model = None diff --git a/rag-web-ui/backend/app/tools/srs_reqs_qwen/src/document_parser.py b/rag-web-ui/backend/app/tools/srs_reqs_qwen/src/document_parser.py index dd4505d..e9c9afc 100644 --- a/rag-web-ui/backend/app/tools/srs_reqs_qwen/src/document_parser.py +++ b/rag-web-ui/backend/app/tools/srs_reqs_qwen/src/document_parser.py @@ -137,6 +137,19 @@ class DocumentParser(ABC): chinese_numbers = '一二三四五六七八九十百千万' return text and all(c in chinese_numbers for c in text) + def _section_sort_key(self, section: 'Section') -> Tuple[int, List[int], str]: + number = (section.number or "").strip() + if number and re.match(r'^\d+(?:\.\d+)*$', number): + return (0, [int(part) for part in number.split('.')], section.title or "") + return (1, [section.level], section.title or "") + + def _sort_sections_by_number(self, sections: List['Section']) -> List['Section']: + ordered = sorted(sections, key=self._section_sort_key) + for section in ordered: + if section.children: + section.children = self._sort_sections_by_number(section.children) + return ordered + class DocxParser(DocumentParser): """DOCX格式文档解析器""" @@ -210,6 +223,7 @@ class DocxParser(DocumentParser): # 为没有编号的章节自动生成编号 self._auto_number_sections(self.sections) + self.sections = self._sort_sections_by_number(self.sections) logger.info(f"完成Docx解析,提取{len(self.sections)}个顶级章节") return self.sections @@ -236,12 +250,17 @@ class DocxParser(DocumentParser): """解析标题,返回(编号, 标题, 级别)""" style_name = paragraph.style.name if paragraph.style else "" is_heading_style = style_name.lower().startswith('heading') if style_name else False + + if self._is_calendar_line(text): + return None # 数字编号标题 - match = re.match(r'^(\d+(?:\.\d+)*)\s*[\.、]?\s*(.+)$', text) + match = re.match(r'^(\d+(?:\.\d+)*)\s*[\.、.))::\-_/]?\s*(.+)$', text) if match and self._is_valid_heading(match.group(2)): number = match.group(1) title = match.group(2).strip() + if not self._is_valid_numbered_heading(number, title): + return None level = len(number.split('.')) return number, title, level @@ -263,6 +282,31 @@ class DocxParser(DocumentParser): return None + def _is_calendar_line(self, text: str) -> bool: + value = (text or "").strip().replace(" ", "") + return bool(re.match(r'^\d{4}年\d{1,2}月(?:\d{1,2}日)?$', value)) + + def _is_valid_numbered_heading(self, number: str, title: str) -> bool: + parts = number.split('.') + if len(parts) > 6: + return False + + first = int(parts[0]) + if first < 1 or first > 30: + return False + + for part in parts[1:]: + if int(part) > 30: + return False + + if len(parts) == 1 and re.match(r'^年\d{1,2}月', title): + return False + + if title and title[0].isdigit(): + return False + + return True + def _iter_block_items(self, parent): """按文档顺序迭代段落和表格""" from docx.text.paragraph import Paragraph @@ -356,6 +400,7 @@ class PDFParser(DocumentParser): # 6. 为没有编号的章节自动生成编号 self._auto_number_sections(self.sections) + self.sections = self._sort_sections_by_number(self.sections) logger.info(f"完成PDF解析,提取{len(self.sections)}个顶级章节") return self.sections @@ -599,18 +644,6 @@ class PDFParser(DocumentParser): level = len(number.split('.')) top_level_number = int(number.split('.')[0]) - # 顶级章节序号大幅跳跃通常是误识别(如正文中的“8 表...”)。 - if level == 1 and last_top_level_number and top_level_number > last_top_level_number + 1: - if line and not self._is_noise(line): - content_buffer.append(line) - continue - - # 顶级章节编号倒退通常是正文枚举项被误识别(如“1 综合监控...”)。 - if level == 1 and last_top_level_number and top_level_number < last_top_level_number: - if line and not self._is_noise(line): - content_buffer.append(line) - continue - if level > 6: continue @@ -645,10 +678,6 @@ class PDFParser(DocumentParser): for l in list(section_stack.keys()): if l > level: del section_stack[l] - - # 若出现层级跳跃(如1->3),自动回退到父级+1。 - if level > 1 and (level - 1) not in section_stack: - section.level = max(section_stack.keys()) if section_stack else 1 current_section = section else: @@ -670,7 +699,10 @@ class PDFParser(DocumentParser): (章节编号, 章节标题) 或 None """ # 模式: "3.1 功能需求" / "3.1.2 电场..." - match = re.match(r'^(\d+(?:\.\d+)*)[\s、.))]*(.+)$', line) + if self._is_calendar_line(line): + return None + + match = re.match(r'^(\d+(?:\.\d+)*)[\s、..))::\-_/]*(.+)$', line) if not match: return None @@ -692,7 +724,7 @@ class PDFParser(DocumentParser): # 检查子部分是否合理 for part in parts[1:]: - if int(part) > 20: + if int(part) > 30: return None # 避免重复 @@ -747,6 +779,10 @@ class PDFParser(DocumentParser): return (number, title) + def _is_calendar_line(self, text: str) -> bool: + value = (text or "").strip().replace(" ", "") + return bool(re.match(r'^\d{4}年\d{1,2}月(?:\d{1,2}日)?$', value)) + def _looks_like_statement(self, title: str) -> bool: """判断标题是否更像正文语句而非章节名。""" if not title: diff --git a/rag-web-ui/backend/app/tools/srs_reqs_qwen/tool.py b/rag-web-ui/backend/app/tools/srs_reqs_qwen/tool.py index c5cdb04..fb2beaf 100644 --- a/rag-web-ui/backend/app/tools/srs_reqs_qwen/tool.py +++ b/rag-web-ui/backend/app/tools/srs_reqs_qwen/tool.py @@ -51,6 +51,8 @@ class SRSTool: "other": "低", } + UNKNOWN_INTERFACE_VALUES = {"", "未知", "unknown", "n/a", "-", "--", "无", "none", "null"} + def __init__(self) -> None: ToolRegistry.register(self.DEFINITION) @@ -90,24 +92,78 @@ class SRSTool: normalized: List[Dict[str, Any]] = [] for index, req in enumerate(extracted, start=1): description = (req.description or "").strip() - title = description[:40] if description else f"需求项 {index}" + title = self._build_short_title(description, index) + requirement_type = self._normalize_requirement_type( + req_type=getattr(req, "type", "functional"), + interface_name=getattr(req, "interface_name", ""), + interface_type=getattr(req, "interface_type", ""), + data_source=getattr(req, "source", ""), + data_destination=getattr(req, "destination", ""), + ) source_field = f"{req.section_number} {req.section_title}".strip() or "文档解析" normalized.append( { "id": req.id, "title": title, "description": description, - "priority": self.PRIORITY_BY_TYPE.get(req.type, "中"), + "priority": "中", "acceptance_criteria": [description] if description else ["待补充验收标准"], "source_field": source_field, + "section_uid": req.section_uid, "section_number": req.section_number, "section_title": req.section_title, - "requirement_type": req.type, + "requirement_type": requirement_type, + "interface_name": req.interface_name if requirement_type == "interface" else "", + "interface_type": req.interface_type if requirement_type == "interface" else "", + "data_source": req.source if requirement_type == "interface" else "", + "data_destination": req.destination if requirement_type == "interface" else "", "sort_order": index, } ) return normalized + def _normalize_requirement_type( + self, + req_type: Any, + interface_name: Any, + interface_type: Any, + data_source: Any, + data_destination: Any, + ) -> str: + raw_type = str(req_type or "").strip() + mapping = { + "功能需求": "functional", + "接口需求": "interface", + "性能需求": "performance", + "安全需求": "security", + "可靠性需求": "reliability", + "其他需求": "other", + } + normalized_type = mapping.get(raw_type, raw_type) + if normalized_type not in self.PRIORITY_BY_TYPE: + normalized_type = "functional" + + fields = [interface_name, interface_type, data_source, data_destination] + has_interface_fields = any( + str(value or "").strip().lower() not in self.UNKNOWN_INTERFACE_VALUES for value in fields + ) + + if normalized_type == "interface" or has_interface_fields: + return "interface" + return normalized_type + + def _build_short_title(self, description: str, index: int) -> str: + text = (description or "").strip() + if not text: + return f"需求项 {index}" + for separator in ("。", ";", "\n", ";", "."): + if separator in text: + text = text.split(separator, 1)[0].strip() + break + if len(text) <= 20: + return text + return f"{text[:20].rstrip()}" + def _load_config(self) -> Dict[str, Any]: config_path = Path(__file__).with_name("default_config.yaml") if config_path.exists(): diff --git a/rag-web-ui/frontend/src/app/dashboard/chat/new/page.tsx b/rag-web-ui/frontend/src/app/dashboard/chat/new/page.tsx index f1d3d6e..330b2bf 100644 --- a/rag-web-ui/frontend/src/app/dashboard/chat/new/page.tsx +++ b/rag-web-ui/frontend/src/app/dashboard/chat/new/page.tsx @@ -94,7 +94,7 @@ export default function NewChatPage() { 开始对话前,请先创建至少一个知识库。