完善了识别逻辑,允许轻微改动需求描述

This commit is contained in:
2026-04-12 21:45:55 +08:00
parent aa8fd4d84a
commit f01ddf045d
10 changed files with 1099 additions and 183 deletions

View File

@@ -10,6 +10,8 @@
- **智能过滤**:自动过滤系统描述、重复需求等非需求内容
- **结构化输出**:按章节层次组织的JSON格式输出
- **表格需求识别**:支持从表格中提取功能/接口/其他需求
- **PDF表格提取**:支持从PDF中提取表格并自动挂接到章节
- **长句原子拆分**:自动将包含多个需求点的长句拆分为多个可验证需求项
## 快速开始
@@ -20,6 +22,9 @@ pip install -r requirements.txt
# 如果使用LLM功能还需安装
pip install dashscope
# 若需增强PDF表格提取能力requirements.txt已包含
pip install pdfplumber
```
### 配置API密钥LLM模式
@@ -41,7 +46,7 @@ llm:
```bash
# LLM增强模式
python main.py -i DC-SRS.pdf -o output.json
python main.py -i ".\input\DC-SRS.pdf" -o ".\output\output.json"
# 纯规则模式不使用LLM
python main.py -i DC-SRS.pdf -o output.json --no-llm

View File

@@ -8,7 +8,7 @@ llm:
# LLM提供商qwen阿里云千问
provider: "qwen"
# 模型名称
model: "qwen3-max"
model: "qwen3-max-2026-01-23"
# API密钥建议使用环境变量 DASHSCOPE_API_KEY
api_key: ""  # leaked key removed from the committed config — supply it via the DASHSCOPE_API_KEY environment variable instead
# 可选参数
@@ -66,6 +66,25 @@ extraction:
prefix: "OR"
keywords: ["约束", "资源", "适应性", "保密", "环境", "计算机", "质量", "设计", "人员", "培训", "保障", "验收", "交付"]
priority: 6
splitter:
enabled: true
max_sentence_len: 120
min_clause_len: 12
semantic_guard:
enabled: true
preserve_condition_action_chain: true
preserve_alarm_chain: true
table_strategy:
llm_semantic_enabled: true
sequence_table_merge: "single_requirement"
merge_time_series_rows_min: 3
rewrite_policy:
llm_light_rewrite_enabled: true
preserve_ratio_min: 0.65
max_length_growth_ratio: 1.25
renumber_policy:
enabled: true
mode: "section_continuous"
# 输出配置
output:

View File

@@ -1,5 +1,6 @@
python-docx==0.8.11
PyPDF2==3.0.1
pdfplumber==0.11.4
pyyaml==6.0
requests==2.31.0
dashscope==1.7.0

View File

@@ -10,11 +10,17 @@ from .document_parser import DocumentParser
from .llm_interface import LLMInterface, QwenLLM
from .requirement_extractor import RequirementExtractor
from .json_generator import JSONGenerator
from .settings import AppSettings
from .requirement_splitter import RequirementSplitter
from .requirement_id_generator import RequirementIDGenerator
__all__ = [
'DocumentParser',
'LLMInterface',
'QwenLLM',
'RequirementExtractor',
'JSONGenerator'
'JSONGenerator',
'AppSettings',
'RequirementSplitter',
'RequirementIDGenerator',
]

View File

@@ -7,8 +7,9 @@
import os
import re
import logging
import importlib
from abc import ABC, abstractmethod
from typing import List, Dict, Tuple, Optional
from typing import List, Dict, Tuple, Optional, Any
from pathlib import Path
try:
@@ -23,6 +24,8 @@ try:
except ImportError:
HAS_PDF = False
HAS_PDF_TABLE = importlib.util.find_spec("pdfplumber") is not None
logger = logging.getLogger(__name__)
@@ -38,19 +41,28 @@ class Section:
self.parent = None
self.children = []
self.tables = []
self.blocks = []
def add_child(self, child: 'Section') -> None:
self.children.append(child)
child.parent = self
def add_content(self, text: str) -> None:
    """Append a stripped text paragraph to this section.

    Whitespace-only input is ignored. The paragraph is appended to the
    accumulated ``content`` (newline-separated) and also recorded as an
    ordered "text" block so later extraction can replay document order.
    """
    stripped = (text or "").strip()
    if not stripped:
        return
    self.content = stripped if not self.content else self.content + "\n" + stripped
    self.blocks.append({"type": "text", "text": stripped})
def add_table(self, table_data: List[List[str]]) -> None:
    """Attach a parsed table to this section.

    Empty tables are ignored. The table is stored in ``tables`` and also
    recorded as an ordered "table" block carrying its index, so extraction
    can interleave tables with text in document order.
    """
    if not table_data:
        return
    new_index = len(self.tables)
    self.tables.append(table_data)
    self.blocks.append(
        {"type": "table", "table_index": new_index, "table": table_data}
    )
def generate_auto_number(self, parent_number: str = "", sibling_index: int = 1) -> None:
"""
@@ -332,6 +344,7 @@ class PDFParser(DocumentParser):
raise ImportError("PyPDF2库未安装请运行: pip install PyPDF2")
super().__init__(file_path)
self.document_title = "SRS Document"
self._page_texts: List[str] = []
def parse(self) -> List[Section]:
"""解析PDF文档"""
@@ -348,8 +361,20 @@ class PDFParser(DocumentParser):
# 4. 使用LLM验证和清理章节如果可用
if self.llm:
self.sections = self._llm_validate_sections(self.sections)
# 章节识别失败时,创建兜底章节避免后续表格数据丢失。
if not self.sections:
fallback = Section(level=1, title="未命名章节", number="1", uid=self._next_uid())
if cleaned_text:
fallback.add_content(cleaned_text)
self.sections = [fallback]
# 5. 提取并挂接PDF表格到章节若依赖可用
pdf_tables = self._extract_pdf_tables()
if pdf_tables:
self._attach_pdf_tables_to_sections(pdf_tables)
# 5. 为没有编号的章节自动生成编号
# 6. 为没有编号的章节自动生成编号
self._auto_number_sections(self.sections)
logger.info(f"完成PDF解析提取{len(self.sections)}个顶级章节")
@@ -368,7 +393,98 @@ class PDFParser(DocumentParser):
text = page.extract_text()
if text:
all_text.append(text)
self._page_texts = all_text
return '\n'.join(all_text)
def _extract_pdf_tables(self) -> List[Dict[str, Any]]:
    """Extract table data from the PDF via pdfplumber (best-effort).

    Returns one dict per non-empty table:
    ``{"page_idx", "table_idx", "page_text", "data"}``, where ``data`` holds
    the cleaned rows (whitespace collapsed, fully-empty rows dropped) and
    ``page_text`` is the PyPDF2-extracted text of the same page, used later
    to match the table to a section. Returns [] when pdfplumber is missing
    or any extraction error occurs, so the text-only pipeline continues.
    """
    if not HAS_PDF_TABLE:
        logger.warning("未安装pdfplumber跳过PDF表格提取。可执行: pip install pdfplumber")
        return []
    tables: List[Dict[str, Any]] = []
    try:
        # Imported lazily so the module still loads without pdfplumber.
        pdfplumber = importlib.import_module("pdfplumber")
        with pdfplumber.open(self.file_path) as pdf:
            for page_idx, page in enumerate(pdf.pages):
                # Pair each page with the text captured earlier by PyPDF2;
                # _page_texts may be shorter than pdf.pages, hence the guard.
                page_text = ""
                if page_idx < len(self._page_texts):
                    page_text = self._page_texts[page_idx]
                extracted_tables = page.extract_tables() or []
                for table_idx, table in enumerate(extracted_tables):
                    cleaned_table: List[List[str]] = []
                    for row in table or []:
                        # None cells become ""; internal runs of whitespace collapse.
                        cells = [re.sub(r'\s+', ' ', str(cell or '')).strip() for cell in row]
                        if any(cells):
                            cleaned_table.append(cells)
                    if cleaned_table:
                        tables.append(
                            {
                                "page_idx": page_idx,
                                "table_idx": table_idx,
                                "page_text": page_text,
                                "data": cleaned_table,
                            }
                        )
    except Exception as e:
        # Best-effort: a table-extraction failure must not break text parsing.
        logger.warning(f"PDF表格提取失败继续纯文本流程: {e}")
        return []
    logger.info(f"PDF表格提取完成{len(tables)}个表格")
    return tables
def _attach_pdf_tables_to_sections(self, tables: List[Dict[str, Any]]) -> None:
"""将提取出的PDF表格挂接到最匹配的章节。"""
flat_sections = self._flatten_sections(self.sections)
if not flat_sections:
return
last_section: Optional[Section] = None
for table in tables:
matched = self._match_table_section(table.get("page_text", ""), flat_sections)
target = matched or last_section or flat_sections[0]
target.add_table(table["data"])
last_section = target
def _flatten_sections(self, sections: List[Section]) -> List[Section]:
    """Flatten the section tree into a pre-order (document-order) list."""
    ordered: List[Section] = []
    pending = list(sections)
    while pending:
        node = pending.pop(0)
        ordered.append(node)
        if node.children:
            # Children go to the front so traversal stays depth-first.
            pending = list(node.children) + pending
    return ordered
def _match_table_section(self, page_text: str, sections: List[Section]) -> Optional[Section]:
    """Pick the section whose title best matches the table's page text.

    Candidate strings per section are the bare title plus, when a section
    number exists, "number+title" with and without a space. All strings are
    whitespace-stripped and lowercased before substring matching; the
    longest matching candidate wins. Returns None when nothing matches or
    the page text is empty.
    """
    haystack = re.sub(r"\s+", "", (page_text or "")).lower()
    if not haystack:
        return None
    best = None
    best_len = -1
    for sec in sections:
        title = (sec.title or "").strip()
        if not title:
            continue
        number = (sec.number or "").strip()
        names = [title]
        if number:
            names.append(f"{number}{title}")
            names.append(f"{number} {title}")
        for name in names:
            probe = re.sub(r"\s+", "", name).lower()
            if probe and probe in haystack and len(probe) > best_len:
                best = sec
                best_len = len(probe)
    return best
def _clean_text(self, text: str) -> str:
"""清洗PDF提取的文本"""
@@ -494,11 +610,7 @@ class PDFParser(DocumentParser):
if len(title) > 60 or len(title) < 2:
return None
# 标题必须包含中文
if not re.search(r'[\u4e00-\u9fa5]', title):
return None
# 放宽标题关键词要求非严格GJB结构
# 放宽标题字符要求兼容部分PDF字体导致中文抽取异常的情况
if not re.search(r'[\u4e00-\u9fa5A-Za-z]', title):
return None

View File

@@ -10,6 +10,7 @@ from datetime import datetime
from typing import List, Dict, Any, Optional
from .document_parser import Section
from .requirement_extractor import Requirement
from .settings import AppSettings
logger = logging.getLogger(__name__)
@@ -17,25 +18,9 @@ logger = logging.getLogger(__name__)
class JSONGenerator:
"""JSON输出生成器"""
# 需求类型中文映射
TYPE_CHINESE = {
'functional': '功能需求',
'interface': '接口需求',
'performance': '其他需求',
'security': '其他需求',
'reliability': '其他需求',
'other': '其他需求'
}
# 非需求章节不输出到JSON
NON_REQUIREMENT_SECTIONS = [
'标识', '系统概述', '文档概述', '引用文档',
'合格性规定', '需求可追踪性', '注释', '附录',
'范围', '概述'
]
def __init__(self, config: Dict = None):
self.config = config or {}
self.settings = AppSettings(self.config)
def generate(self, sections: List[Section], requirements: List[Requirement],
document_title: str = "SRS Document") -> Dict[str, Any]:
@@ -84,7 +69,7 @@ class JSONGenerator:
"""计算需求类型统计"""
stats = {}
for req in requirements:
type_chinese = self.TYPE_CHINESE.get(req.type, '其他需求')
type_chinese = self.settings.type_chinese.get(req.type, '其他需求')
if type_chinese not in stats:
stats[type_chinese] = 0
stats[type_chinese] += 1
@@ -92,12 +77,7 @@ class JSONGenerator:
def _should_include_section(self, section: Section) -> bool:
"""判断章节是否应该包含在输出中"""
# 排除非需求章节
for keyword in self.NON_REQUIREMENT_SECTIONS:
if keyword in section.title:
return False
return True
return not self.settings.is_non_requirement_section(section.title)
def _build_requirement_content(self, sections: List[Section],
reqs_by_section: Dict[str, List[Requirement]]) -> Dict[str, Any]:
@@ -151,11 +131,12 @@ class JSONGenerator:
# 添加当前章节需求
reqs = reqs_by_section.get(section.uid or section.number or 'unknown', [])
reqs = sorted(reqs, key=lambda r: getattr(r, 'source_order', 0))
if reqs:
result["需求列表"] = []
for req in reqs:
# 需求类型放在最前面
type_chinese = self.TYPE_CHINESE.get(req.type, '功能需求')
type_chinese = self.settings.type_chinese.get(req.type, '功能需求')
req_dict = {
"需求类型": type_chinese,
"需求编号": req.id,
@@ -188,8 +169,11 @@ class JSONGenerator:
file_path: 输出文件路径
"""
try:
output_cfg = self.config.get("output", {})
indent = output_cfg.get("indent", 2)
pretty = output_cfg.get("pretty_print", True)
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(output, f, ensure_ascii=False, indent=2)
json.dump(output, f, ensure_ascii=False, indent=indent if pretty else None)
logger.info(f"成功保存JSON到: {file_path}")
except Exception as e:
logger.error(f"保存JSON文件失败: {e}")

View File

@@ -9,6 +9,9 @@ import json
import logging
from typing import List, Dict, Optional, Tuple, Any
from .document_parser import Section
from .settings import AppSettings
from .requirement_id_generator import RequirementIDGenerator
from .requirement_splitter import RequirementSplitter
logger = logging.getLogger(__name__)
@@ -20,7 +23,9 @@ class Requirement:
section_number: str = "", section_title: str = "",
interface_name: str = "", interface_type: str = "",
section_uid: str = "",
source: str = "", destination: str = ""):
source: str = "", destination: str = "",
source_type: str = "text", source_order: int = 0,
source_table_index: int = -1, source_row_span: str = ""):
self.id = req_id
self.description = description
self.type = req_type
@@ -32,6 +37,10 @@ class Requirement:
self.interface_type = interface_type
self.source = source
self.destination = destination
self.source_type = source_type
self.source_order = source_order
self.source_table_index = source_table_index
self.source_row_span = source_row_span
def to_dict(self) -> Dict:
result = {
@@ -53,35 +62,20 @@ class Requirement:
class RequirementExtractor:
"""需求提取器 - LLM增强版"""
# 需求类型前缀映射
TYPE_PREFIX = {
'functional': 'FR',
'interface': 'IR',
'performance': 'PR',
'security': 'SR',
'reliability': 'RR',
'other': 'OR'
}
# 中文类型到英文的映射
TYPE_MAPPING = {
'功能需求': 'functional',
'接口需求': 'interface',
'其他需求': 'other'
}
# 非需求章节(应该跳过的)
NON_REQUIREMENT_SECTIONS = [
'标识', '系统概述', '文档概述', '引用文档',
'合格性规定', '需求可追踪性', '注释', '附录',
'范围', '概述'
]
def __init__(self, config: Dict = None, llm=None):
self.config = config or {}
self.llm = llm
self.settings = AppSettings(self.config)
self.id_generator = RequirementIDGenerator(self.settings.type_prefix)
self.splitter = None
if self.settings.splitter_enabled:
self.splitter = RequirementSplitter(
max_sentence_len=self.settings.splitter_max_sentence_len,
min_clause_len=self.settings.splitter_min_clause_len,
)
self.requirements: List[Requirement] = []
self._req_counters: Dict[str, Dict[str, int]] = {} # {section_number: {type: count}}
self._global_order = 0
def extract_from_sections(self, sections: List[Section]) -> List[Requirement]:
"""
@@ -95,9 +89,14 @@ class RequirementExtractor:
"""
self.requirements = []
self._req_counters = {}
self._global_order = 0
for section in sections:
self._process_section(section)
# 去重后统一连续重编号,避免出现跳号。
if self.settings.renumber_enabled:
self.requirements = self._renumber_requirements_continuous(self.requirements)
logger.info(f"共提取 {len(self.requirements)} 个需求项")
return self.requirements
@@ -121,10 +120,8 @@ class RequirementExtractor:
def _should_skip_section(self, section: Section) -> bool:
"""判断是否应该跳过此章节"""
# 检查标题是否包含非需求关键词
for keyword in self.NON_REQUIREMENT_SECTIONS:
if keyword in section.title:
return True
if self.settings.is_non_requirement_section(section.title):
return True
# 检查是否是系统描述章节如3.1.1通常是系统描述)
if self._is_system_description(section):
@@ -169,22 +166,96 @@ class RequirementExtractor:
return '' in response
def _extract_requirements_from_section(self, section: Section) -> List[Requirement]:
"""从单个章节提取需求"""
requirements = []
# 获取需求类型
"""从单个章节按文档顺序提取需求"""
requirements: List[Requirement] = []
req_type = self._identify_requirement_type(section.title, section.content)
if self.llm:
# 使用LLM提取需求
reqs = self._llm_extract_requirements(section, req_type)
requirements.extend(reqs)
else:
# 使用规则提取
reqs = self._rule_extract_requirements(section, req_type)
requirements.extend(reqs)
return requirements
blocks = self._iter_section_blocks(section)
for block in blocks:
block_type = block.get("type", "text")
block_order = int(block.get("order", 0))
temp_section = Section(
level=section.level,
title=section.title,
number=section.number,
content="",
uid=section.uid,
)
if block_type == "text":
temp_section.content = block.get("text", "")
if self.llm:
block_reqs = self._llm_extract_requirements(temp_section, req_type)
else:
block_reqs = self._rule_extract_requirements(temp_section, req_type)
table_index = -1
else:
table_data = block.get("table", [])
temp_section.tables = [table_data] if table_data else []
table_index = int(block.get("table_index", -1))
if self.llm and self.settings.table_llm_semantic_enabled:
block_reqs = self._llm_extract_table_requirements(temp_section, req_type)
else:
block_reqs = self._rule_extract_requirements(temp_section, req_type)
for req in block_reqs:
self._global_order += 1
req.source_type = block_type
req.source_order = self._global_order
req.source_table_index = table_index
req.source_row_span = block.get("row_span", "")
req.description = self._maybe_light_rewrite(req.description, block_type)
requirements.append(req)
requirements = self._semantic_integrity_postprocess(requirements)
return self._deduplicate_requirements(requirements)
def _iter_section_blocks(self, section: Section) -> List[Dict[str, Any]]:
    """Return the section's content as ordered blocks (text/table).

    Preferred path: replay ``section.blocks`` as recorded by the parser so
    text and tables keep their original document order. Each returned dict
    has ``type`` ("text" or "table"), an ``order`` index, and either the
    text payload or the table payload plus ``table_index``.

    Fallback path (older parse results without ``blocks``): emit the whole
    ``section.content`` first, then every table in ``section.tables`` order.
    """
    blocks: List[Dict[str, Any]] = []
    if getattr(section, "blocks", None):
        for idx, block in enumerate(section.blocks, 1):
            block_type = block.get("type")
            if block_type == "text":
                text = (block.get("text") or "").strip()
                if text:
                    blocks.append({"type": "text", "text": text, "order": idx})
            elif block_type == "table":
                table = block.get("table")
                table_index = int(block.get("table_index", -1))
                # Prefer the authoritative copy in section.tables when the
                # recorded index is still valid.
                if table_index >= 0 and table_index < len(section.tables):
                    table = section.tables[table_index]
                if table:
                    blocks.append(
                        {
                            "type": "table",
                            "table": table,
                            "table_index": table_index,
                            "order": idx,
                        }
                    )
    if blocks:
        return blocks
    # Legacy parse results: no ordered blocks — fall back to text first, then tables.
    fallback_order = 1
    text = (section.content or "").strip()
    if text:
        blocks.append({"type": "text", "text": text, "order": fallback_order})
        fallback_order += 1
    for table_index, table in enumerate(section.tables):
        blocks.append(
            {
                "type": "table",
                "table": table,
                "table_index": table_index,
                "order": fallback_order,
            }
        )
        fallback_order += 1
    return blocks
def _llm_extract_requirements(self, section: Section, req_type: str) -> List[Requirement]:
"""使用LLM提取需求"""
@@ -242,8 +313,8 @@ class RequirementExtractor:
JSON输出"""
else:
# 功能需求、其他需求:保留原文描述,不改写润色
prompt = f"""请从以下SRS文档章节中提取具体的软件需求。保持原文描述,不要改写或润色
# 功能需求、其他需求:以原文为主,允许轻微扩写补全
prompt = f"""请从以下SRS文档章节中提取具体的软件需求。以原文为主,允许轻微扩写补全语义
章节编号:{section.number}
章节标题:{section.title}
@@ -256,11 +327,14 @@ JSON输出"""
提取要求:
1. 同时提取正文与表格中的具体、可验证的软件需求
2. 不要提取系统描述、背景说明等非需求内容
3. 保持原文描述,不要对需求进行改写、润色或重新组织
4. 去除原文中的多余换行符和表格格式符号,但保留语句内容
3. 需求描述应保留原文大部分词语(建议保留率>=70%),仅做轻微补充以增强语义完整性
4. 严禁改变任何数值、阈值、状态名、信号名和逻辑条件
5. 去除原文中的多余换行符和表格格式符号,但保留语句内容
5. 每条需求应该是完整的句子
6. 如果有多条需求,请分别列出
7. 如果一段需求描述内有多条需求,请尽量拆分成独立需求项
7. 如果一段需求描述内有多条需求点,必须拆分成多个独立需求项
8. 拆分判定:出现“并/并且/同时/然后/且/以及”,或一条句子中出现多个动作(如判断+监测+发送)时必须拆分
9. 每条需求尽量满足“单一动作、可单独验证”
8. 过滤重复或过于相似的需求,只保留独特的需求
9. 若原文给出需求编号请优先使用原文编号req_id
@@ -300,44 +374,273 @@ JSON输出"""
if desc and len(desc) > 5:
# 清理描述中的多余换行符和表格符号
desc = self._clean_description(desc)
split_descs = self._split_requirement_description(desc)
if not split_descs:
split_descs = [desc]
# 需求ID优先使用文档给出的编号
doc_req_id = self._normalize_req_id(req_data.get('req_id', '') or req_data.get('id', ''))
if not doc_req_id:
doc_req_id, desc = self._extract_requirement_id_from_text(desc)
# 生成最终的需求ID三级优先级
req_id = self._generate_requirement_id(req_type, section.number, i, doc_req_id, parent_req_id)
# 接口需求提取额外字段
interface_name = ""
interface_type = ""
source = ""
destination = ""
if req_type == 'interface':
interface_name = req_data.get('interface_name', '未知').strip()
interface_type = req_data.get('interface_type', '未知').strip()
source = req_data.get('source', '未知').strip()
destination = req_data.get('destination', '未知').strip()
req = Requirement(
for split_idx, split_desc in enumerate(split_descs, 1):
# 生成最终的需求ID支持拆分后后缀
req_id = self._generate_requirement_id(
req_type,
section.number,
i,
doc_req_id,
parent_req_id,
split_idx,
len(split_descs),
)
# 接口需求提取额外字段
interface_name = ""
interface_type = ""
source = ""
destination = ""
if req_type == 'interface':
interface_name = req_data.get('interface_name', '未知').strip()
interface_type = req_data.get('interface_type', '未知').strip()
source = req_data.get('source', '未知').strip()
destination = req_data.get('destination', '未知').strip()
req = Requirement(
req_id=req_id,
description=split_desc,
req_type=req_type,
section_number=section.number,
section_title=section.title,
section_uid=section.uid,
interface_name=interface_name,
interface_type=interface_type,
source=source,
destination=destination
)
requirements.append(req)
except Exception as e:
logger.warning(f"LLM提取需求失败: {e},使用规则提取")
return self._rule_extract_requirements(section, req_type)
return requirements
def _build_table_requirements_rule(self, section: Section, req_type: str, start_index: int) -> List[Requirement]:
    """Build rule-based requirements from the section's tables only.

    Used in LLM mode as a recall supplement: the LLM handles the prose while
    the rule pipeline still mines the tables. ``start_index`` continues the
    section's requirement numbering. Returns [] when the tables yield nothing.
    """
    requirements: List[Requirement] = []
    table_requirements = self._extract_requirements_from_tables_rule(section.tables)
    if not table_requirements:
        return requirements
    # Use the first complete-looking table ID (e.g. "NY01-01") as the parent
    # so short row codes can be anchored to it by the ID generator.
    parent_req_id = ""
    complete_id_pattern = r'^[A-Za-z0-9]{2,10}[-_].+$'
    for temp_id, _ in table_requirements:
        if temp_id and re.match(complete_id_pattern, temp_id):
            parent_req_id = temp_id.replace('_', '-')
            break
    index = start_index
    for doc_req_id, desc in table_requirements:
        # Compound descriptions are split into atomic items; each fragment
        # gets a split suffix via split_index/split_total.
        split_descs = self._split_requirement_description(desc)
        if not split_descs:
            split_descs = [desc]
        for split_idx, split_desc in enumerate(split_descs, 1):
            req_id = self._generate_requirement_id(
                req_type=req_type,
                section_number=section.number,
                index=index,
                doc_req_id=doc_req_id,
                parent_req_id=parent_req_id,
                split_index=split_idx,
                split_total=len(split_descs),
            )
            requirements.append(
                Requirement(
                    req_id=req_id,
                    description=split_desc,
                    req_type=req_type,
                    section_number=section.number,
                    section_title=section.title,
                    section_uid=section.uid,
                )
            )
        # One index per source row; split fragments share it.
        index += 1
    return requirements
def _llm_extract_table_requirements(self, section: Section, req_type: str) -> List[Requirement]:
"""使用LLM语义化提取表格需求。"""
if not self.llm or not section.tables:
return self._rule_extract_requirements(section, req_type)
table = section.tables[0]
is_sequence_table = self._is_time_series_table(table)
table_text = self._format_tables_for_prompt([table])
merge_hint = "" if is_sequence_table and self.settings.sequence_table_merge == "single_requirement" else ""
prompt = f"""请从下列表格中提取并组织软件需求,要求以语义完整的需求句输出。
章节编号:{section.number}
章节标题:{section.title}
需求类型:{req_type}
该表是否按时间序列指令组织:{merge_hint}
表格内容:
{table_text}
提取规则:
1. 不是简单逐字抄表格,请结合列含义组织成完整需求句。
2. 保留原文大部分关键词、阈值、数值、状态名,不得改变逻辑和数值。
3. 允许轻微补充主语或上下文,使语义更完整。
4. 若为时间序列指令表优先合并为1条需求描述完整执行序列。
5. 若有明显独立语义点,可输出多条需求。
请输出JSON
{{
"requirements": [
{{"req_id": "可为空", "description": "完整需求描述"}}
]
}}"""
try:
response = self.llm.call(prompt)
data = self._parse_llm_json_response(response)
requirements: List[Requirement] = []
if data and isinstance(data.get("requirements"), list):
for i, req_data in enumerate(data["requirements"], 1):
desc = self._clean_description(req_data.get("description", ""))
if not desc:
continue
doc_req_id = self._normalize_req_id(req_data.get("req_id", ""))
req_id = self._generate_requirement_id(req_type, section.number, i, doc_req_id, "")
requirements.append(
Requirement(
req_id=req_id,
description=desc,
req_type=req_type,
section_number=section.number,
section_title=section.title,
section_uid=section.uid,
interface_name=interface_name,
interface_type=interface_type,
source=source,
destination=destination
source_type="table",
)
requirements.append(req)
)
if not requirements:
return self._rule_extract_requirements(section, req_type)
return requirements
except Exception as e:
logger.warning(f"LLM提取需求失败: {e},使用规则提取")
logger.warning(f"LLM表格语义化提取失败,回退规则模式: {e}")
return self._rule_extract_requirements(section, req_type)
return requirements
def _maybe_light_rewrite(self, description: str, source_type: str) -> str:
    """Lightly expand a requirement description via the LLM, with fidelity gates.

    Only runs in LLM mode with ``llm_light_rewrite_enabled``; only
    table-derived or very short (<28 chars) descriptions are candidates.
    The rewrite is accepted only if it keeps enough original characters
    (``preserve_ratio_min``), does not grow past ``max_length_growth_ratio``,
    and preserves every numeric token; otherwise the cleaned original is
    returned unchanged. Any LLM failure also falls back to the original.
    """
    description = self._clean_description(description)
    if not description:
        return description
    if not self.llm or not self.settings.llm_light_rewrite_enabled:
        return description
    # Table rows and very short fragments tend to lack subjects/context.
    need_rewrite = source_type == "table" or len(description) < 28
    if not need_rewrite:
        return description
    prompt = f"""请对下面需求做轻微扩写,使语义更完整。
原文:{description}
要求:
1. 保留原文大部分表述,不改变核心语义。
2. 不得修改任何数值、阈值、状态名称、信号名称。
3. 只允许补充必要主语/宾语长度尽量控制在原文的1.25倍以内。
4. 仅返回改写后的单句文本。"""
    try:
        rewritten = self._clean_description(self.llm.call(prompt))
        if not rewritten:
            return description
        # Fidelity gates: character recall, length growth, numeric consistency.
        preserve_ratio = self._calculate_preserve_ratio(description, rewritten)
        growth_ratio = len(rewritten) / max(len(description), 1)
        if preserve_ratio < self.settings.preserve_ratio_min:
            return description
        if growth_ratio > self.settings.max_length_growth_ratio:
            return description
        if not self._numbers_consistent(description, rewritten):
            return description
        return rewritten
    except Exception:
        # Best-effort feature: never let a rewrite failure lose the original.
        return description
def _calculate_preserve_ratio(self, original: str, rewritten: str) -> float:
original_tokens = [c for c in re.sub(r"\s+", "", original) if c]
rewritten_tokens = set(c for c in re.sub(r"\s+", "", rewritten) if c)
if not original_tokens:
return 1.0
hit = sum(1 for c in original_tokens if c in rewritten_tokens)
return hit / max(len(original_tokens), 1)
def _numbers_consistent(self, original: str, rewritten: str) -> bool:
pattern = r"[<>≤≥]?\d+(?:\.\d+)?(?:[A-Za-z%]*)"
orig_nums = set(re.findall(pattern, original))
rewrite_nums = set(re.findall(pattern, rewritten))
return orig_nums.issubset(rewrite_nums)
def _semantic_integrity_postprocess(self, requirements: List[Requirement]) -> List[Requirement]:
    """Merge adjacent requirements that form one tightly-coupled semantic chain.

    Walks the list in order; whenever ``_should_merge_semantic`` says the
    current item continues the previous one, its description is folded into
    the previous requirement instead of being kept as a separate entry.
    No-op when the semantic guard is disabled or the list is empty.
    """
    if not requirements or not self.settings.semantic_guard_enabled:
        return requirements
    result: List[Requirement] = [requirements[0]]
    for current in requirements[1:]:
        head = result[-1]
        if not self._should_merge_semantic(head, current):
            result.append(current)
            continue
        head.description = self._clean_description(
            f"{head.description.rstrip(';。')}{current.description.lstrip(';。')}"
        )
    return result
def _should_merge_semantic(self, prev: Requirement, curr: Requirement) -> bool:
    """Decide whether *curr* continues *prev* and the two should be merged.

    Only requirements from the same section and of the same type are ever
    merged. Beyond that the decision is keyword-heuristic: anaphoric
    openings ("上述", "该报警", ...), alarm/duration chains, and a few
    condition-action chains specific to the target SRS domain.
    """
    # Never merge across sections or across requirement types.
    if prev.section_uid != curr.section_uid or prev.type != curr.type:
        return False
    prev_desc = prev.description
    curr_desc = curr.description
    # NOTE(review): the empty strings "" in this prefix tuple make
    # startswith() always True, so every same-section/same-type pair would
    # merge — they look like characters lost in transit; confirm the
    # intended anaphoric prefixes.
    if curr_desc.startswith(("", "", "上述", "", "该报警", "该信号")):
        return True
    # Alarm chain: "...报警" followed by a "持续..." duration clause.
    if self.settings.preserve_alarm_chain and ("报警" in prev_desc and "持续" in curr_desc):
        return True
    if self.settings.preserve_condition_action_chain:
        # Domain-specific condition->action chains that must stay together.
        if "进入整星安全模式" in prev_desc and ("过放电模式" in curr_desc or "发送" in curr_desc):
            return True
        if "若蓄电池充电" in prev_desc and (
            "退出低功耗模式" in curr_desc or "热控" in curr_desc or "姿控" in curr_desc
        ):
            return True
        if ("产生" in prev_desc and "报警" in prev_desc and "持续" in curr_desc):
            return True
    return False
def _renumber_requirements_continuous(self, requirements: List[Requirement]) -> List[Requirement]:
    """Re-assign sequential IDs per (section, type prefix) in document order.

    Sorts by ``source_order`` (tie-broken by section number), then rebuilds
    every ``id`` as ``<prefix>-<section>-<n>`` with ``n`` counting
    continuously within each section/prefix pair, so deduplication never
    leaves gaps in the numbering.
    """
    if not requirements:
        return requirements
    in_doc_order = sorted(requirements, key=lambda r: (r.source_order, r.section_number or ""))
    running: Dict[Tuple[str, str], int] = {}
    for item in in_doc_order:
        section_key = item.section_uid or item.section_number or "NA"
        prefix = self.settings.type_prefix.get(item.type, "FR")
        key = (section_key, prefix)
        next_no = running.get(key, 0) + 1
        running[key] = next_no
        item.id = f"{prefix}-{item.section_number or 'NA'}-{next_no}"
    return in_doc_order
def _rule_extract_requirements(self, section: Section, req_type: str) -> List[Requirement]:
"""使用规则提取需求(备用方法)"""
@@ -352,7 +655,7 @@ JSON输出"""
if not descriptions:
# 如果没有列表项,将整个内容作为一个需求
desc = self._clean_description(content)
if len(desc) > 5:
if len(desc) > 5 and not section.tables:
descriptions = [f"{section.title}{desc}"]
# 表格需求
@@ -379,31 +682,55 @@ JSON输出"""
desc = self._clean_description(desc)
if len(desc) > 5:
doc_req_id, cleaned_desc = self._extract_requirement_id_from_text(desc)
# 生成最终的需求ID三级优先级
req_id = self._generate_requirement_id(req_type, section.number, index, doc_req_id, parent_req_id)
split_descs = self._split_requirement_description(cleaned_desc)
if not split_descs:
split_descs = [cleaned_desc]
for split_idx, split_desc in enumerate(split_descs, 1):
req_id = self._generate_requirement_id(
req_type,
section.number,
index,
doc_req_id,
parent_req_id,
split_idx,
len(split_descs),
)
req = Requirement(
req_id=req_id,
description=split_desc,
req_type=req_type,
section_number=section.number,
section_title=section.title,
section_uid=section.uid
)
requirements.append(req)
index += 1
for doc_req_id, desc in table_requirements:
split_descs = self._split_requirement_description(desc)
if not split_descs:
split_descs = [desc]
for split_idx, split_desc in enumerate(split_descs, 1):
req_id = self._generate_requirement_id(
req_type,
section.number,
index,
doc_req_id,
parent_req_id,
split_idx,
len(split_descs),
)
req = Requirement(
req_id=req_id,
description=cleaned_desc,
description=split_desc,
req_type=req_type,
section_number=section.number,
section_title=section.title,
section_uid=section.uid
)
requirements.append(req)
index += 1
for doc_req_id, desc in table_requirements:
# 生成最终的需求ID三级优先级
req_id = self._generate_requirement_id(req_type, section.number, index, doc_req_id, parent_req_id)
req = Requirement(
req_id=req_id,
description=desc,
req_type=req_type,
section_number=section.number,
section_title=section.title,
section_uid=section.uid
)
requirements.append(req)
index += 1
return requirements
@@ -440,21 +767,11 @@ JSON输出"""
注意:不能仅靠标题判断是否为功能需求,若无法识别具体类型,默认为功能需求
"""
title_lower = title.lower()
content_lower = (content or "").lower()[:500] # 只检查前500字符
combined_text = title_lower + " " + content_lower
# 优先识别接口需求,根据具体文件情况修改关键词
interface_keywords = ['接口', 'interface', 'api', '串口', '通信协议', '数据交换']
for keyword in interface_keywords:
if keyword in combined_text:
return 'interface'
# 默认为功能需求(不能仅靠标题判断,无法识别时默认为功能需求)
return 'functional'
return self.settings.detect_requirement_type(title, content)
def _generate_requirement_id(self, req_type: str, section_number: str, index: int,
doc_req_id: str = "", parent_req_id: str = "") -> str:
def _generate_requirement_id(self, req_type: str, section_number: str, index: int,
doc_req_id: str = "", parent_req_id: str = "",
split_index: int = 1, split_total: int = 1) -> str:
"""
生成需求ID三级优先级
@@ -473,29 +790,19 @@ JSON输出"""
doc_req_id: 文档中提取的编号/代号
parent_req_id: 父需求编号(用于子需求)
"""
# 优先级1合法的完整编号以2-10个字母或数字开头后跟分隔符
if doc_req_id:
# 检查是否为合法的完整编号格式2-10个字母或数字开头 + 分隔符 + 其他字符
# 例如: NY01-01、FR-3.1.2-1、AIRSAT07-GD03-04
complete_id_pattern = r'^[A-Za-z0-9]{2,10}[-_].+$'
if re.match(complete_id_pattern, doc_req_id):
return doc_req_id.replace('_', '-')
# 优先级2代号/序号 + 父需求编号
if doc_req_id and parent_req_id:
return f"{parent_req_id}-{doc_req_id}"
# 优先级3自动生成保留章节号中的点号
prefix = self.TYPE_PREFIX.get(req_type, 'FR') # 默认FR功能需求
section_part = section_number if section_number else "NA"
return f"{prefix}-{section_part}-{index}"
return self.id_generator.generate(
req_type=req_type,
section_number=section_number,
index=index,
doc_req_id=doc_req_id,
parent_req_id=parent_req_id,
split_index=split_index,
split_total=split_total,
)
def _normalize_req_id(self, req_id: str) -> str:
"""规范化需求编号"""
if not req_id:
return ""
req_id = str(req_id).strip()
return req_id
return self.id_generator.normalize(req_id)
def _clean_description(self, text: str) -> str:
"""清理需求描述"""
@@ -533,29 +840,28 @@ JSON输出"""
1. 完整编号NY01-01、FR-3.1.2-1
2. 代号/序号K101、D61、a)、1)
"""
return self.id_generator.extract_from_text(text)
def _split_requirement_description(self, text: str) -> List[str]:
if not text:
return None, text
# 模式1完整需求编号如 NY01-01、FR-3.1.2-1
pattern1 = r'^\s*([A-Za-z]{2,6}[-_]\d+(?:[-.\d]+)*)\s*[:\)\]】]?\s*(.+)$'
match = re.match(pattern1, text)
if match:
return match.group(1).strip(), match.group(2).strip()
# 模式2代号如 K101、D61
pattern2 = r'^\s*([A-Za-z]\d+)\s*[:\)\]】]?\s*(.+)$'
match = re.match(pattern2, text)
if match:
return match.group(1).strip(), match.group(2).strip()
# 模式3序号如 a)、1)
pattern3 = r'^\s*([a-z0-9]{1,2}[\)])\s*(.+)$'
match = re.match(pattern3, text)
if match:
code = match.group(1).strip().rstrip(')')
return code, match.group(2).strip()
return None, text
return []
if "时间序列" in text and "执行指令" in text:
return [text]
if not self.splitter:
return [text]
return self.splitter.split(text)
def _deduplicate_requirements(self, requirements: List[Requirement]) -> List[Requirement]:
    """Keep only the first requirement for each (type, normalized description).

    Descriptions are compared with runs of whitespace collapsed, trimmed,
    and lowercased; later duplicates are dropped, order is preserved.
    """
    unique: List[Requirement] = []
    seen_keys = set()
    for item in requirements:
        fingerprint = (item.type, re.sub(r'\s+', ' ', item.description).strip().lower())
        if fingerprint in seen_keys:
            continue
        seen_keys.add(fingerprint)
        unique.append(item)
    return unique
def _extract_requirements_from_tables_rule(self, tables: List[List[List[str]]]) -> List[Tuple[Optional[str], str]]:
"""从表格中提取需求(规则方式)"""
@@ -569,6 +875,13 @@ JSON输出"""
for table in tables:
if not table:
continue
if self._is_time_series_table(table) and self.settings.sequence_table_merge == "single_requirement":
merged_desc = self._build_sequence_table_requirement(table)
if merged_desc:
results.append((None, merged_desc))
continue
header = table[0] if table else []
header_lower = [h.lower() for h in header]
id_idx = None
@@ -605,6 +918,58 @@ JSON输出"""
results.append((req_id, desc))
return results
def _is_time_series_table(self, table: List[List[str]]) -> bool:
if not table:
return False
header = " ".join(cell for cell in table[0] if cell)
header_has_time = any(k in header for k in ["时间", "时刻", "time", "TIME", "T0"])
header_has_action = any(k in header for k in ["指令", "动作", "行为", "操作", "名称"])
time_pattern = re.compile(r"^T\s*0(?:\s*[+-]\s*\d+\s*[sS秒]?)?$")
data_rows = table[1:] if len(table) > 1 else []
time_like_rows = 0
for row in data_rows:
if not row:
continue
first_cell = (row[0] or "").strip() if row else ""
if time_pattern.match(first_cell):
time_like_rows += 1
return (header_has_time and header_has_action) or (time_like_rows >= self.settings.merge_time_series_rows_min)
def _build_sequence_table_requirement(self, table: List[List[str]]) -> str:
if not table or len(table) < 2:
return ""
header = table[0]
time_idx = 0
action_idx = 1 if len(header) > 1 else 0
for i, col in enumerate(header):
col_text = (col or "")
if any(k in col_text for k in ["时间", "时刻", "time", "TIME"]):
time_idx = i
if any(k in col_text for k in ["指令", "动作", "行为", "操作", "名称"]):
action_idx = i
sequence_parts = []
for row in table[1:]:
if not row:
continue
row = [self._clean_description(c) for c in row]
if not any(row):
continue
t = row[time_idx] if time_idx < len(row) else ""
a = row[action_idx] if action_idx < len(row) else ""
if t and a:
sequence_parts.append(f"{t}执行{a}")
elif a:
sequence_parts.append(a)
if not sequence_parts:
return ""
return "系统应按以下时间序列依次执行指令:" + "".join(sequence_parts)
def _parse_llm_json_response(self, response: str) -> Optional[Dict]:
"""解析LLM的JSON响应"""

View File

@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
"""
需求编号生成与提取工具。
"""
import re
from typing import Optional, Tuple, Dict
class RequirementIDGenerator:
    """Builds and extracts requirement identifiers.

    ``type_prefix`` maps a requirement-type key (e.g. ``"functional"``)
    to the ID prefix used when synthesising new IDs (e.g. ``"FR"``).
    """

    # Document-style IDs such as "FR-001" / "SRS_12.3" at line start.
    _DOC_ID_RE = re.compile(r"^\s*([A-Za-z]{2,10}[-_]\d+(?:[-.\d]+)*)\s*[:\)\]】]?\s*(.+)$")
    # Short letter+digits IDs such as "A1".
    _SHORT_ID_RE = re.compile(r"^\s*([A-Za-z]\d+)\s*[:\)\]】]?\s*(.+)$")
    # List-item markers such as "a)" / "12)".
    _LIST_MARK_RE = re.compile(r"^\s*([a-z0-9]{1,2}[\)])\s*(.+)$")
    # IDs already containing a "-"/"_" separator are considered complete.
    _COMPLETE_ID_RE = re.compile(r"^[A-Za-z0-9]{2,10}[-_].+$")

    def __init__(self, type_prefix: Dict[str, str]):
        self.type_prefix = type_prefix

    def normalize(self, req_id: str) -> str:
        """Return the ID with surrounding whitespace removed ("" if falsy)."""
        return str(req_id).strip() if req_id else ""

    def extract_from_text(self, text: str) -> Tuple[Optional[str], str]:
        """Split *text* into ``(id, remainder)``; id is ``None`` when absent."""
        if not text:
            return None, text
        for pattern in (self._DOC_ID_RE, self._SHORT_ID_RE):
            m = pattern.match(text)
            if m:
                return m.group(1).strip(), m.group(2).strip()
        m = self._LIST_MARK_RE.match(text)
        if m:
            # Drop the trailing ")" from list markers like "a)".
            return m.group(1).strip().rstrip(")"), m.group(2).strip()
        return None, text

    def generate(
        self,
        req_type: str,
        section_number: str,
        index: int,
        doc_req_id: str = "",
        parent_req_id: str = "",
        split_index: int = 1,
        split_total: int = 1,
    ) -> str:
        """Produce the final ID, appending "-S<n>" for split requirements."""
        base = self._generate_base(req_type, section_number, index, doc_req_id, parent_req_id)
        return f"{base}-S{split_index}" if split_total > 1 else base

    def _generate_base(
        self,
        req_type: str,
        section_number: str,
        index: int,
        doc_req_id: str,
        parent_req_id: str,
    ) -> str:
        """Choose between the document's own ID, a parent-scoped ID, or a synthesised one."""
        if doc_req_id and self._COMPLETE_ID_RE.match(doc_req_id):
            # Document already supplies a complete ID; normalise "_" to "-".
            return doc_req_id.replace("_", "-")
        if doc_req_id and parent_req_id:
            return f"{parent_req_id}-{doc_req_id}"
        prefix = self.type_prefix.get(req_type, "FR")
        return f"{prefix}-{section_number or 'NA'}-{index}"

188
src/requirement_splitter.py Normal file
View File

@@ -0,0 +1,188 @@
# -*- coding: utf-8 -*-
"""
需求长句拆分器。
将复合长句拆分为可验证的原子需求片段。
"""
import re
from typing import List
class RequirementSplitter:
    """Splits compound requirement sentences into atomic, verifiable items.

    NOTE(review): several single-character Chinese hint tokens were lost in
    a previous encoding pass — they appeared as empty strings, which match
    every string via ``startswith("")`` / ``"" in text`` and silently broke
    all heuristics.  They are reconstructed below with the most common
    connective/conditional characters; confirm against the original
    requirement corpus.
    """

    # Verbs that usually mark an independent, testable action.
    ACTION_HINTS = [
        "产生", "发送", "设置", "进入", "退出", "关闭", "开启", "监测",
        "判断", "记录", "上传", "重启", "恢复", "关断", "断电", "加电",
        "执行", "进行",
    ]
    # Words that connect two actions inside one sentence.
    CONNECTOR_HINTS = ["并", "并且", "同时", "然后", "且", "以及", "和"]
    # Words that open a conditional / qualifying clause.
    CONDITIONAL_HINTS = ["如果", "若", "当", "则", "其中", "此时", "满足"]
    # Anaphoric openers: the fragment refers back to the previous clause.
    CONTEXT_PRONOUN_HINTS = ["该", "其", "上述", "此", "这些", "那些"]

    def __init__(self, max_sentence_len: int = 120, min_clause_len: int = 12):
        # Fragments longer than this get further comma-level splitting.
        self.max_sentence_len = max_sentence_len
        # Fragments shorter than this are merged into their predecessor.
        self.min_clause_len = min_clause_len

    def split(self, text: str) -> List[str]:
        """Split *text* into atomic requirement fragments.

        Strong punctuation (;;。) defines the primary fragments; overlong
        fragments are further split on commas, then semantically coupled
        or too-short pieces are merged back and duplicates removed.
        """
        cleaned = self._clean(text)
        if not cleaned:
            return []
        if self._contains_strong_semantic_chain(cleaned):
            # A complete condition->action chain must stay as one item.
            return [cleaned]
        result: List[str] = []
        for part in self._split_by_strong_punctuation(cleaned):
            if len(part) <= self.max_sentence_len:
                result.append(part)
            else:
                result.extend(self._split_long_clause(part))
        result = self._merge_semantic_chain(result)
        result = self._merge_too_short(result)
        return self._deduplicate(result)

    def _contains_strong_semantic_chain(self, text: str) -> bool:
        """True when *text* forms a full condition->consequence chain with >=2 actions."""
        has_conditional = any(h in text for h in ("如果", "若", "当"))
        has_result = "则" in text or "应" in text
        action_count = sum(1 for h in self.ACTION_HINTS if h in text)
        return has_conditional and has_result and action_count >= 2

    def _clean(self, text: str) -> str:
        """Collapse whitespace and strip leading/trailing punctuation."""
        text = re.sub(r"\s+", " ", text or "")
        return text.strip(" ,,;;。")

    def _split_by_strong_punctuation(self, text: str) -> List[str]:
        """Split on full/half-width semicolons and the full stop."""
        chunks = re.split(r"[;;。]", text)
        return [c.strip(" ,,") for c in chunks if c and c.strip(" ,,")]

    def _split_long_clause(self, clause: str) -> List[str]:
        """Split an overlong clause on commas, re-assembling coupled pieces."""
        if self._contains_strong_semantic_chain(clause):
            return [clause]
        fragments = [f.strip() for f in re.split(r"[,,]", clause) if f.strip()]
        if len(fragments) <= 1:
            return [clause]
        assembled: List[str] = []
        current = fragments[0]
        for fragment in fragments[1:]:
            if self._should_split(current, fragment):
                assembled.append(current.strip())
                current = fragment
            else:
                current = f"{current},{fragment}"
        if current.strip():
            assembled.append(current.strip())
        return assembled

    def _should_split(self, current: str, fragment: str) -> bool:
        """Decide whether *fragment* starts a new atomic requirement."""
        if len(current) < self.min_clause_len:
            return False
        # Anaphoric fragments continue the previous clause — keep together.
        if any(fragment.startswith(h) for h in self.CONTEXT_PRONOUN_HINTS):
            return False
        # If the re-joined text forms a full condition->action chain, keep it whole.
        if self._contains_strong_semantic_chain(current + "," + fragment):
            return False
        if any(fragment.startswith(h) for h in self.CONDITIONAL_HINTS):
            return False
        has_connector = any(fragment.startswith(h) for h in self.CONNECTOR_HINTS)
        has_action = any(h in fragment for h in self.ACTION_HINTS)
        current_has_action = any(h in current for h in self.ACTION_HINTS)
        # Connector + a fresh action after an action-bearing clause: split.
        if has_connector and has_action and current_has_action:
            return True
        # A second action in an already-long clause also splits.
        if has_action and current_has_action and len(current) >= self.max_sentence_len // 2:
            return True
        return False

    def _merge_semantic_chain(self, parts: List[str]) -> List[str]:
        """Merge adjacent parts that belong to one semantic chain."""
        if not parts:
            return []
        merged: List[str] = [parts[0]]
        for part in parts[1:]:
            if self._should_merge(merged[-1], part):
                merged[-1] = f"{merged[-1]},{part}"
            else:
                merged.append(part)
        return merged

    def _should_merge(self, prev: str, current: str) -> bool:
        """Decide whether *current* semantically continues *prev*."""
        # Anaphoric opener such as "该报警信号...".
        if any(current.startswith(h) for h in self.CONTEXT_PRONOUN_HINTS):
            return True
        # An alarm plus its persistence condition/action belong to one chain.
        if ("报警" in prev and "持续" in current) or (
            "产生" in prev and "报警" in prev and "持续" in current
        ):
            return True
        # A state transition followed by its follow-up control action.
        if ("进入" in prev or "设置" in prev or "发送" in prev) and (
            "后" in current or "连续" in current
        ):
            return True
        # Fragments that re-form a condition chain are merged back.
        if self._contains_strong_semantic_chain(prev + "," + current):
            return True
        return False

    def _merge_too_short(self, parts: List[str]) -> List[str]:
        """Append fragments below ``min_clause_len`` to their predecessor."""
        if not parts:
            return []
        merged: List[str] = []
        for part in parts:
            if merged and len(part) < self.min_clause_len:
                merged[-1] = f"{merged[-1]},{part}"
            else:
                merged.append(part)
        return merged

    def _deduplicate(self, parts: List[str]) -> List[str]:
        """Remove duplicates, comparing whitespace-insensitively, keeping order."""
        seen = set()
        unique: List[str] = []
        for part in parts:
            key = re.sub(r"\s+", "", part)
            if key and key not in seen:
                seen.add(key)
                unique.append(part)
        return unique

162
src/settings.py Normal file
View File

@@ -0,0 +1,162 @@
# -*- coding: utf-8 -*-
"""
统一配置与映射模块。
将需求类型、章节过滤、输出映射和拆分参数收敛到单一入口。
"""
from dataclasses import dataclass
from typing import Dict, List, Any
@dataclass
class RequirementTypeRule:
    """One classification rule for a requirement type.

    Rules are evaluated in ascending ``priority`` order; the first rule
    whose keyword appears in a section's title/content wins.
    """

    key: str  # internal type key, e.g. "functional" / "interface"
    chinese_name: str  # display name as written in the config, e.g. "功能需求"
    prefix: str  # requirement-ID prefix for this type, e.g. "FR"
    keywords: List[str]  # match keywords (lowercased by the matcher before comparison)
    priority: int  # lower value = matched earlier
class AppSettings:
    """Unified accessor over the configuration dict.

    Collapses requirement-type rules, section filters, output mappings and
    splitter/table/rewrite parameters into one entry point with safe
    defaults, so the rest of the pipeline never touches the raw config.
    """

    # Chinese type names (as used in config files) -> internal type keys.
    TYPE_NAME_MAP = {
        "功能需求": "functional",
        "接口需求": "interface",
        "性能需求": "performance",
        "安全需求": "security",
        "可靠性需求": "reliability",
        "其他需求": "other",
    }

    # Section titles that are treated as non-requirement content.
    DEFAULT_NON_REQUIREMENT_SECTIONS = [
        "标识",
        "系统概述",
        "文档概述",
        "引用文档",
        "合格性规定",
        "需求可追踪性",
        "注释",
        "附录",
        "范围",
        "概述",
    ]

    # Internal type key -> Chinese display name used in the output.
    DEFAULT_TYPE_CHINESE = {
        "functional": "功能需求",
        "interface": "接口需求",
        "performance": "其他需求",
        "security": "其他需求",
        "reliability": "其他需求",
        "other": "其他需求",
    }

    # Internal type key -> requirement-ID prefix.
    DEFAULT_PREFIX = {
        "functional": "FR",
        "interface": "IR",
        "performance": "PR",
        "security": "SR",
        "reliability": "RR",
        "other": "OR",
    }

    def __init__(self, config: Dict[str, Any] = None):
        """Materialise settings from *config* (``None``/empty dict is valid).

        Every attribute falls back to a default when the corresponding
        config key is absent.
        """
        self.config = config or {}
        document_cfg = self.config.get("document", {})
        self.non_requirement_sections = document_cfg.get(
            "non_requirement_sections", self.DEFAULT_NON_REQUIREMENT_SECTIONS
        )
        extraction_cfg = self.config.get("extraction", {})
        req_types_cfg = extraction_cfg.get("requirement_types", {})
        self.requirement_rules = self._build_rules(req_types_cfg)
        self.type_prefix = self._build_type_prefix(req_types_cfg)
        self.type_chinese = self._build_type_chinese(req_types_cfg)
        # Long-sentence splitter parameters.
        splitter_cfg = extraction_cfg.get("splitter", {})
        self.splitter_max_sentence_len = int(splitter_cfg.get("max_sentence_len", 120))
        self.splitter_min_clause_len = int(splitter_cfg.get("min_clause_len", 12))
        self.splitter_enabled = bool(splitter_cfg.get("enabled", True))
        # Guards that keep condition->action / alarm chains in one requirement.
        semantic_cfg = extraction_cfg.get("semantic_guard", {})
        self.semantic_guard_enabled = bool(semantic_cfg.get("enabled", True))
        self.preserve_condition_action_chain = bool(
            semantic_cfg.get("preserve_condition_action_chain", True)
        )
        self.preserve_alarm_chain = bool(semantic_cfg.get("preserve_alarm_chain", True))
        # Table-extraction strategy (time-sequence merging, LLM semantics).
        table_cfg = extraction_cfg.get("table_strategy", {})
        self.table_llm_semantic_enabled = bool(table_cfg.get("llm_semantic_enabled", True))
        self.sequence_table_merge = table_cfg.get("sequence_table_merge", "single_requirement")
        self.merge_time_series_rows_min = int(table_cfg.get("merge_time_series_rows_min", 3))
        # LLM light-rewrite constraints (how much text may change / grow).
        rewrite_cfg = extraction_cfg.get("rewrite_policy", {})
        self.llm_light_rewrite_enabled = bool(rewrite_cfg.get("llm_light_rewrite_enabled", True))
        self.preserve_ratio_min = float(rewrite_cfg.get("preserve_ratio_min", 0.65))
        self.max_length_growth_ratio = float(rewrite_cfg.get("max_length_growth_ratio", 1.25))
        # Requirement renumbering policy.
        renumber_cfg = extraction_cfg.get("renumber_policy", {})
        self.renumber_enabled = bool(renumber_cfg.get("enabled", True))
        self.renumber_mode = renumber_cfg.get("mode", "section_continuous")

    def _build_rules(self, req_types_cfg: Dict[str, Dict[str, Any]]) -> List[RequirementTypeRule]:
        """Build the rule list from config, sorted by ascending priority."""
        rules: List[RequirementTypeRule] = []
        if not req_types_cfg:
            # Fall back to two built-in rule types to stay compatible with old behaviour.
            return [
                RequirementTypeRule(
                    key="interface",
                    chinese_name="接口需求",
                    prefix="IR",
                    keywords=["接口", "interface", "api", "串口", "通信", "CAN", "以太网"],
                    priority=1,
                ),
                RequirementTypeRule(
                    key="functional",
                    chinese_name="功能需求",
                    prefix="FR",
                    keywords=["功能", "控制", "处理", "监测", "显示"],
                    priority=2,
                ),
            ]
        for zh_name, item in req_types_cfg.items():
            # Unknown Chinese names classify as "other".
            key = self.TYPE_NAME_MAP.get(zh_name, "other")
            rules.append(
                RequirementTypeRule(
                    key=key,
                    chinese_name=zh_name,
                    prefix=item.get("prefix", self.DEFAULT_PREFIX.get(key, "FR")),
                    keywords=item.get("keywords", []),
                    priority=int(item.get("priority", 99)),
                )
            )
        return sorted(rules, key=lambda x: x.priority)

    def _build_type_prefix(self, req_types_cfg: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
        """Overlay config-supplied prefixes on the defaults."""
        mapping = dict(self.DEFAULT_PREFIX)
        for zh_name, key in self.TYPE_NAME_MAP.items():
            if zh_name in req_types_cfg:
                mapping[key] = req_types_cfg[zh_name].get("prefix", mapping[key])
        return mapping

    def _build_type_chinese(self, req_types_cfg: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
        """Overlay config-declared Chinese names on the default display names."""
        mapping = dict(self.DEFAULT_TYPE_CHINESE)
        for zh_name, key in self.TYPE_NAME_MAP.items():
            if zh_name in req_types_cfg:
                mapping[key] = zh_name
        return mapping

    def is_non_requirement_section(self, title: str) -> bool:
        """True when *title* contains any configured non-requirement keyword."""
        return any(keyword in title for keyword in self.non_requirement_sections)

    def detect_requirement_type(self, title: str, content: str) -> str:
        """Classify a section by keyword match; defaults to "functional".

        Only the first 500 characters of *content* are inspected, and
        matching is case-insensitive in ascending rule priority.
        """
        combined_text = f"{title} {(content or '')[:500]}".lower()
        for rule in self.requirement_rules:
            for keyword in rule.keywords:
                if keyword.lower() in combined_text:
                    return rule.key
        return "functional"