diff --git a/README.md b/README.md index 76f09c1..6b6b6e7 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,8 @@ - **智能过滤**:自动过滤系统描述、重复需求等非需求内容 - **结构化输出**:按章节层次组织的JSON格式输出 - **表格需求识别**:支持从表格中提取功能/接口/其他需求 +- **PDF表格提取**:支持从PDF中提取表格并自动挂接到章节 +- **长句原子拆分**:自动将包含多个需求点的长句拆分为多个可验证需求项 ## 快速开始 @@ -20,6 +22,9 @@ pip install -r requirements.txt # 如果使用LLM功能,还需安装: pip install dashscope + +# 若需增强PDF表格提取能力(requirements.txt已包含) +pip install pdfplumber ``` ### 配置API密钥(LLM模式) @@ -41,7 +46,7 @@ llm: ```bash # LLM增强模式 -python main.py -i DC-SRS.pdf -o output.json +python main.py -i ".\input\DC-SRS.pdf" -o ".\output\output.json" # 纯规则模式(不使用LLM) python main.py -i DC-SRS.pdf -o output.json --no-llm diff --git a/config.yaml b/config.yaml index 7644bab..8fa4a5c 100644 --- a/config.yaml +++ b/config.yaml @@ -8,7 +8,7 @@ llm: # LLM提供商:qwen(阿里云千问) provider: "qwen" # 模型名称 - model: "qwen3-max" + model: "qwen3-max-2026-01-23" # API密钥(建议使用环境变量 DASHSCOPE_API_KEY) api_key: "sk-7097f7842f724f0c9e70c4bf3b16dacb" # 可选参数 @@ -66,6 +66,25 @@ extraction: prefix: "OR" keywords: ["约束", "资源", "适应性", "保密", "环境", "计算机", "质量", "设计", "人员", "培训", "保障", "验收", "交付"] priority: 6 + splitter: + enabled: true + max_sentence_len: 120 + min_clause_len: 12 + semantic_guard: + enabled: true + preserve_condition_action_chain: true + preserve_alarm_chain: true + table_strategy: + llm_semantic_enabled: true + sequence_table_merge: "single_requirement" + merge_time_series_rows_min: 3 + rewrite_policy: + llm_light_rewrite_enabled: true + preserve_ratio_min: 0.65 + max_length_growth_ratio: 1.25 + renumber_policy: + enabled: true + mode: "section_continuous" # 输出配置 output: diff --git a/requirements.txt b/requirements.txt index 669d9f3..52ea734 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ python-docx==0.8.11 PyPDF2==3.0.1 +pdfplumber==0.11.4 pyyaml==6.0 requests==2.31.0 dashscope==1.7.0 diff --git a/src/__init__.py b/src/__init__.py index dd95240..4eb5ca7 100644 --- a/src/__init__.py +++ b/src/__init__.py 
@@ -10,11 +10,17 @@ from .document_parser import DocumentParser from .llm_interface import LLMInterface, QwenLLM from .requirement_extractor import RequirementExtractor from .json_generator import JSONGenerator +from .settings import AppSettings +from .requirement_splitter import RequirementSplitter +from .requirement_id_generator import RequirementIDGenerator __all__ = [ 'DocumentParser', 'LLMInterface', 'QwenLLM', 'RequirementExtractor', - 'JSONGenerator' + 'JSONGenerator', + 'AppSettings', + 'RequirementSplitter', + 'RequirementIDGenerator', ] diff --git a/src/document_parser.py b/src/document_parser.py index b4c6190..859029a 100644 --- a/src/document_parser.py +++ b/src/document_parser.py @@ -7,8 +7,9 @@ import os import re import logging +import importlib from abc import ABC, abstractmethod -from typing import List, Dict, Tuple, Optional +from typing import List, Dict, Tuple, Optional, Any from pathlib import Path try: @@ -23,6 +24,8 @@ try: except ImportError: HAS_PDF = False +HAS_PDF_TABLE = importlib.util.find_spec("pdfplumber") is not None + logger = logging.getLogger(__name__) @@ -38,19 +41,28 @@ class Section: self.parent = None self.children = [] self.tables = [] + self.blocks = [] def add_child(self, child: 'Section') -> None: self.children.append(child) child.parent = self def add_content(self, text: str) -> None: + text = (text or "").strip() + if not text: + return if self.content: self.content += "\n" + text else: self.content = text + self.blocks.append({"type": "text", "text": text}) def add_table(self, table_data: List[List[str]]) -> None: + if not table_data: + return self.tables.append(table_data) + table_index = len(self.tables) - 1 + self.blocks.append({"type": "table", "table_index": table_index, "table": table_data}) def generate_auto_number(self, parent_number: str = "", sibling_index: int = 1) -> None: """ @@ -332,6 +344,7 @@ class PDFParser(DocumentParser): raise ImportError("PyPDF2库未安装,请运行: pip install PyPDF2") 
super().__init__(file_path) self.document_title = "SRS Document" + self._page_texts: List[str] = [] def parse(self) -> List[Section]: """解析PDF文档""" @@ -348,8 +361,20 @@ class PDFParser(DocumentParser): # 4. 使用LLM验证和清理章节(如果可用) if self.llm: self.sections = self._llm_validate_sections(self.sections) + + # 章节识别失败时,创建兜底章节避免后续表格数据丢失。 + if not self.sections: + fallback = Section(level=1, title="未命名章节", number="1", uid=self._next_uid()) + if cleaned_text: + fallback.add_content(cleaned_text) + self.sections = [fallback] + + # 5. 提取并挂接PDF表格到章节(若依赖可用) + pdf_tables = self._extract_pdf_tables() + if pdf_tables: + self._attach_pdf_tables_to_sections(pdf_tables) - # 5. 为没有编号的章节自动生成编号 + # 6. 为没有编号的章节自动生成编号 self._auto_number_sections(self.sections) logger.info(f"完成PDF解析,提取{len(self.sections)}个顶级章节") @@ -368,7 +393,98 @@ class PDFParser(DocumentParser): text = page.extract_text() if text: all_text.append(text) + self._page_texts = all_text return '\n'.join(all_text) + + def _extract_pdf_tables(self) -> List[Dict[str, Any]]: + """提取PDF中的表格数据。""" + if not HAS_PDF_TABLE: + logger.warning("未安装pdfplumber,跳过PDF表格提取。可执行: pip install pdfplumber") + return [] + + tables: List[Dict[str, Any]] = [] + try: + pdfplumber = importlib.import_module("pdfplumber") + with pdfplumber.open(self.file_path) as pdf: + for page_idx, page in enumerate(pdf.pages): + page_text = "" + if page_idx < len(self._page_texts): + page_text = self._page_texts[page_idx] + + extracted_tables = page.extract_tables() or [] + for table_idx, table in enumerate(extracted_tables): + cleaned_table: List[List[str]] = [] + for row in table or []: + cells = [re.sub(r'\s+', ' ', str(cell or '')).strip() for cell in row] + if any(cells): + cleaned_table.append(cells) + + if cleaned_table: + tables.append( + { + "page_idx": page_idx, + "table_idx": table_idx, + "page_text": page_text, + "data": cleaned_table, + } + ) + except Exception as e: + logger.warning(f"PDF表格提取失败,继续纯文本流程: {e}") + return [] + + 
logger.info(f"PDF表格提取完成,共{len(tables)}个表格") + return tables + + def _attach_pdf_tables_to_sections(self, tables: List[Dict[str, Any]]) -> None: + """将提取出的PDF表格挂接到最匹配的章节。""" + flat_sections = self._flatten_sections(self.sections) + if not flat_sections: + return + + last_section: Optional[Section] = None + for table in tables: + matched = self._match_table_section(table.get("page_text", ""), flat_sections) + target = matched or last_section or flat_sections[0] + target.add_table(table["data"]) + last_section = target + + def _flatten_sections(self, sections: List[Section]) -> List[Section]: + """按文档顺序拉平章节树。""" + result: List[Section] = [] + for section in sections: + result.append(section) + if section.children: + result.extend(self._flatten_sections(section.children)) + return result + + def _match_table_section(self, page_text: str, sections: List[Section]) -> Optional[Section]: + """基于页文本匹配表格归属章节。""" + normalized_page = re.sub(r"\s+", "", (page_text or "")).lower() + if not normalized_page: + return None + + matched: Optional[Section] = None + matched_score = -1 + for section in sections: + title = (section.title or "").strip() + if not title: + continue + + number = (section.number or "").strip() + candidates = [title] + if number: + candidates.append(f"{number}{title}") + candidates.append(f"{number} {title}") + + for candidate in candidates: + normalized_candidate = re.sub(r"\s+", "", candidate).lower() + if normalized_candidate and normalized_candidate in normalized_page: + score = len(normalized_candidate) + if score > matched_score: + matched = section + matched_score = score + + return matched def _clean_text(self, text: str) -> str: """清洗PDF提取的文本""" @@ -494,11 +610,7 @@ class PDFParser(DocumentParser): if len(title) > 60 or len(title) < 2: return None - # 标题必须包含中文 - if not re.search(r'[\u4e00-\u9fa5]', title): - return None - - # 放宽标题关键词要求(非严格GJB结构) + # 放宽标题字符要求(兼容部分PDF字体导致中文抽取异常的情况) if not re.search(r'[\u4e00-\u9fa5A-Za-z]', title): return None diff 
--git a/src/json_generator.py b/src/json_generator.py index 0eca52c..1bc46a3 100644 --- a/src/json_generator.py +++ b/src/json_generator.py @@ -10,6 +10,7 @@ from datetime import datetime from typing import List, Dict, Any, Optional from .document_parser import Section from .requirement_extractor import Requirement +from .settings import AppSettings logger = logging.getLogger(__name__) @@ -17,25 +18,9 @@ logger = logging.getLogger(__name__) class JSONGenerator: """JSON输出生成器""" - # 需求类型中文映射 - TYPE_CHINESE = { - 'functional': '功能需求', - 'interface': '接口需求', - 'performance': '其他需求', - 'security': '其他需求', - 'reliability': '其他需求', - 'other': '其他需求' - } - - # 非需求章节(不输出到JSON) - NON_REQUIREMENT_SECTIONS = [ - '标识', '系统概述', '文档概述', '引用文档', - '合格性规定', '需求可追踪性', '注释', '附录', - '范围', '概述' - ] - def __init__(self, config: Dict = None): self.config = config or {} + self.settings = AppSettings(self.config) def generate(self, sections: List[Section], requirements: List[Requirement], document_title: str = "SRS Document") -> Dict[str, Any]: @@ -84,7 +69,7 @@ class JSONGenerator: """计算需求类型统计""" stats = {} for req in requirements: - type_chinese = self.TYPE_CHINESE.get(req.type, '其他需求') + type_chinese = self.settings.type_chinese.get(req.type, '其他需求') if type_chinese not in stats: stats[type_chinese] = 0 stats[type_chinese] += 1 @@ -92,12 +77,7 @@ class JSONGenerator: def _should_include_section(self, section: Section) -> bool: """判断章节是否应该包含在输出中""" - # 排除非需求章节 - for keyword in self.NON_REQUIREMENT_SECTIONS: - if keyword in section.title: - return False - - return True + return not self.settings.is_non_requirement_section(section.title) def _build_requirement_content(self, sections: List[Section], reqs_by_section: Dict[str, List[Requirement]]) -> Dict[str, Any]: @@ -151,11 +131,12 @@ class JSONGenerator: # 添加当前章节需求 reqs = reqs_by_section.get(section.uid or section.number or 'unknown', []) + reqs = sorted(reqs, key=lambda r: getattr(r, 'source_order', 0)) if reqs: result["需求列表"] = [] for 
req in reqs: # 需求类型放在最前面 - type_chinese = self.TYPE_CHINESE.get(req.type, '功能需求') + type_chinese = self.settings.type_chinese.get(req.type, '功能需求') req_dict = { "需求类型": type_chinese, "需求编号": req.id, @@ -188,8 +169,11 @@ class JSONGenerator: file_path: 输出文件路径 """ try: + output_cfg = self.config.get("output", {}) + indent = output_cfg.get("indent", 2) + pretty = output_cfg.get("pretty_print", True) with open(file_path, 'w', encoding='utf-8') as f: - json.dump(output, f, ensure_ascii=False, indent=2) + json.dump(output, f, ensure_ascii=False, indent=indent if pretty else None) logger.info(f"成功保存JSON到: {file_path}") except Exception as e: logger.error(f"保存JSON文件失败: {e}") diff --git a/src/requirement_extractor.py b/src/requirement_extractor.py index 6a9a1a0..dbfef14 100644 --- a/src/requirement_extractor.py +++ b/src/requirement_extractor.py @@ -9,6 +9,9 @@ import json import logging from typing import List, Dict, Optional, Tuple, Any from .document_parser import Section +from .settings import AppSettings +from .requirement_id_generator import RequirementIDGenerator +from .requirement_splitter import RequirementSplitter logger = logging.getLogger(__name__) @@ -20,7 +23,9 @@ class Requirement: section_number: str = "", section_title: str = "", interface_name: str = "", interface_type: str = "", section_uid: str = "", - source: str = "", destination: str = ""): + source: str = "", destination: str = "", + source_type: str = "text", source_order: int = 0, + source_table_index: int = -1, source_row_span: str = ""): self.id = req_id self.description = description self.type = req_type @@ -32,6 +37,10 @@ class Requirement: self.interface_type = interface_type self.source = source self.destination = destination + self.source_type = source_type + self.source_order = source_order + self.source_table_index = source_table_index + self.source_row_span = source_row_span def to_dict(self) -> Dict: result = { @@ -53,35 +62,20 @@ class Requirement: class RequirementExtractor: """需求提取器 - 
LLM增强版""" - # 需求类型前缀映射 - TYPE_PREFIX = { - 'functional': 'FR', - 'interface': 'IR', - 'performance': 'PR', - 'security': 'SR', - 'reliability': 'RR', - 'other': 'OR' - } - - # 中文类型到英文的映射 - TYPE_MAPPING = { - '功能需求': 'functional', - '接口需求': 'interface', - '其他需求': 'other' - } - - # 非需求章节(应该跳过的) - NON_REQUIREMENT_SECTIONS = [ - '标识', '系统概述', '文档概述', '引用文档', - '合格性规定', '需求可追踪性', '注释', '附录', - '范围', '概述' - ] - def __init__(self, config: Dict = None, llm=None): self.config = config or {} self.llm = llm + self.settings = AppSettings(self.config) + self.id_generator = RequirementIDGenerator(self.settings.type_prefix) + self.splitter = None + if self.settings.splitter_enabled: + self.splitter = RequirementSplitter( + max_sentence_len=self.settings.splitter_max_sentence_len, + min_clause_len=self.settings.splitter_min_clause_len, + ) self.requirements: List[Requirement] = [] self._req_counters: Dict[str, Dict[str, int]] = {} # {section_number: {type: count}} + self._global_order = 0 def extract_from_sections(self, sections: List[Section]) -> List[Requirement]: """ @@ -95,9 +89,14 @@ class RequirementExtractor: """ self.requirements = [] self._req_counters = {} + self._global_order = 0 for section in sections: self._process_section(section) + + # 去重后统一连续重编号,避免出现跳号。 + if self.settings.renumber_enabled: + self.requirements = self._renumber_requirements_continuous(self.requirements) logger.info(f"共提取 {len(self.requirements)} 个需求项") return self.requirements @@ -121,10 +120,8 @@ class RequirementExtractor: def _should_skip_section(self, section: Section) -> bool: """判断是否应该跳过此章节""" - # 检查标题是否包含非需求关键词 - for keyword in self.NON_REQUIREMENT_SECTIONS: - if keyword in section.title: - return True + if self.settings.is_non_requirement_section(section.title): + return True # 检查是否是系统描述章节(如3.1.1通常是系统描述) if self._is_system_description(section): @@ -169,22 +166,96 @@ class RequirementExtractor: return '是' in response def _extract_requirements_from_section(self, section: Section) -> 
List[Requirement]: - """从单个章节提取需求""" - requirements = [] - - # 获取需求类型 + """从单个章节按文档顺序提取需求。""" + requirements: List[Requirement] = [] req_type = self._identify_requirement_type(section.title, section.content) - - if self.llm: - # 使用LLM提取需求 - reqs = self._llm_extract_requirements(section, req_type) - requirements.extend(reqs) - else: - # 使用规则提取 - reqs = self._rule_extract_requirements(section, req_type) - requirements.extend(reqs) - - return requirements + + blocks = self._iter_section_blocks(section) + for block in blocks: + block_type = block.get("type", "text") + block_order = int(block.get("order", 0)) + + temp_section = Section( + level=section.level, + title=section.title, + number=section.number, + content="", + uid=section.uid, + ) + + if block_type == "text": + temp_section.content = block.get("text", "") + if self.llm: + block_reqs = self._llm_extract_requirements(temp_section, req_type) + else: + block_reqs = self._rule_extract_requirements(temp_section, req_type) + table_index = -1 + else: + table_data = block.get("table", []) + temp_section.tables = [table_data] if table_data else [] + table_index = int(block.get("table_index", -1)) + if self.llm and self.settings.table_llm_semantic_enabled: + block_reqs = self._llm_extract_table_requirements(temp_section, req_type) + else: + block_reqs = self._rule_extract_requirements(temp_section, req_type) + + for req in block_reqs: + self._global_order += 1 + req.source_type = block_type + req.source_order = self._global_order + req.source_table_index = table_index + req.source_row_span = block.get("row_span", "") + req.description = self._maybe_light_rewrite(req.description, block_type) + requirements.append(req) + + requirements = self._semantic_integrity_postprocess(requirements) + return self._deduplicate_requirements(requirements) + + def _iter_section_blocks(self, section: Section) -> List[Dict[str, Any]]: + """返回章节中的顺序块(文本/表格)。""" + blocks: List[Dict[str, Any]] = [] + if getattr(section, "blocks", None): + 
for idx, block in enumerate(section.blocks, 1): + block_type = block.get("type") + if block_type == "text": + text = (block.get("text") or "").strip() + if text: + blocks.append({"type": "text", "text": text, "order": idx}) + elif block_type == "table": + table = block.get("table") + table_index = int(block.get("table_index", -1)) + if table_index >= 0 and table_index < len(section.tables): + table = section.tables[table_index] + if table: + blocks.append( + { + "type": "table", + "table": table, + "table_index": table_index, + "order": idx, + } + ) + + if blocks: + return blocks + + # 兼容旧解析结果:无顺序块时退化为文本后表格。 + fallback_order = 1 + text = (section.content or "").strip() + if text: + blocks.append({"type": "text", "text": text, "order": fallback_order}) + fallback_order += 1 + for table_index, table in enumerate(section.tables): + blocks.append( + { + "type": "table", + "table": table, + "table_index": table_index, + "order": fallback_order, + } + ) + fallback_order += 1 + return blocks def _llm_extract_requirements(self, section: Section, req_type: str) -> List[Requirement]: """使用LLM提取需求""" @@ -242,8 +313,8 @@ class RequirementExtractor: JSON输出:""" else: - # 功能需求、其他需求:保留原文描述,不改写润色 - prompt = f"""请从以下SRS文档章节中提取具体的软件需求。保持原文描述,不要改写或润色。 + # 功能需求、其他需求:以原文为主,允许轻微扩写补全 + prompt = f"""请从以下SRS文档章节中提取具体的软件需求。以原文为主,允许轻微扩写补全语义。 章节编号:{section.number} 章节标题:{section.title} @@ -256,11 +327,14 @@ JSON输出:""" 提取要求: 1. 同时提取正文与表格中的具体、可验证的软件需求 2. 不要提取系统描述、背景说明等非需求内容 -3. 保持原文描述,不要对需求进行改写、润色或重新组织 -4. 去除原文中的多余换行符和表格格式符号,但保留语句内容 +3. 需求描述应保留原文大部分词语(建议保留率>=70%),仅做轻微补充以增强语义完整性 +4. 严禁改变任何数值、阈值、状态名、信号名和逻辑条件 +5. 去除原文中的多余换行符和表格格式符号,但保留语句内容 5. 每条需求应该是完整的句子 6. 如果有多条需求,请分别列出 -7. 如果一段需求描述内有多条需求,请尽量拆分成独立的需求项 +7. 如果一段需求描述内有多条需求点,必须拆分成多个独立需求项 +8. 拆分判定:出现“并/并且/同时/然后/且/以及”,或一条句子中出现多个动作(如判断+监测+发送)时必须拆分 +9. 每条需求尽量满足“单一动作、可单独验证” 8. 过滤重复或过于相似的需求,只保留独特的需求 9. 
若原文给出需求编号,请优先使用原文编号(req_id) @@ -300,44 +374,273 @@ JSON输出:""" if desc and len(desc) > 5: # 清理描述中的多余换行符和表格符号 desc = self._clean_description(desc) + split_descs = self._split_requirement_description(desc) + if not split_descs: + split_descs = [desc] # 需求ID优先使用文档给出的编号 doc_req_id = self._normalize_req_id(req_data.get('req_id', '') or req_data.get('id', '')) if not doc_req_id: doc_req_id, desc = self._extract_requirement_id_from_text(desc) - - # 生成最终的需求ID(三级优先级) - req_id = self._generate_requirement_id(req_type, section.number, i, doc_req_id, parent_req_id) - - # 接口需求提取额外字段 - interface_name = "" - interface_type = "" - source = "" - destination = "" - if req_type == 'interface': - interface_name = req_data.get('interface_name', '未知').strip() - interface_type = req_data.get('interface_type', '未知').strip() - source = req_data.get('source', '未知').strip() - destination = req_data.get('destination', '未知').strip() - - req = Requirement( + + for split_idx, split_desc in enumerate(split_descs, 1): + # 生成最终的需求ID(支持拆分后后缀) + req_id = self._generate_requirement_id( + req_type, + section.number, + i, + doc_req_id, + parent_req_id, + split_idx, + len(split_descs), + ) + + # 接口需求提取额外字段 + interface_name = "" + interface_type = "" + source = "" + destination = "" + if req_type == 'interface': + interface_name = req_data.get('interface_name', '未知').strip() + interface_type = req_data.get('interface_type', '未知').strip() + source = req_data.get('source', '未知').strip() + destination = req_data.get('destination', '未知').strip() + + req = Requirement( + req_id=req_id, + description=split_desc, + req_type=req_type, + section_number=section.number, + section_title=section.title, + section_uid=section.uid, + interface_name=interface_name, + interface_type=interface_type, + source=source, + destination=destination + ) + requirements.append(req) + except Exception as e: + logger.warning(f"LLM提取需求失败: {e},使用规则提取") + return self._rule_extract_requirements(section, req_type) + + return requirements + + 
def _build_table_requirements_rule(self, section: Section, req_type: str, start_index: int) -> List[Requirement]: + """仅从表格构建规则需求,用于LLM模式补充召回。""" + requirements: List[Requirement] = [] + table_requirements = self._extract_requirements_from_tables_rule(section.tables) + if not table_requirements: + return requirements + + parent_req_id = "" + complete_id_pattern = r'^[A-Za-z0-9]{2,10}[-_].+$' + for temp_id, _ in table_requirements: + if temp_id and re.match(complete_id_pattern, temp_id): + parent_req_id = temp_id.replace('_', '-') + break + + index = start_index + for doc_req_id, desc in table_requirements: + split_descs = self._split_requirement_description(desc) + if not split_descs: + split_descs = [desc] + + for split_idx, split_desc in enumerate(split_descs, 1): + req_id = self._generate_requirement_id( + req_type=req_type, + section_number=section.number, + index=index, + doc_req_id=doc_req_id, + parent_req_id=parent_req_id, + split_index=split_idx, + split_total=len(split_descs), + ) + requirements.append( + Requirement( + req_id=req_id, + description=split_desc, + req_type=req_type, + section_number=section.number, + section_title=section.title, + section_uid=section.uid, + ) + ) + index += 1 + + return requirements + + def _llm_extract_table_requirements(self, section: Section, req_type: str) -> List[Requirement]: + """使用LLM语义化提取表格需求。""" + if not self.llm or not section.tables: + return self._rule_extract_requirements(section, req_type) + + table = section.tables[0] + is_sequence_table = self._is_time_series_table(table) + table_text = self._format_tables_for_prompt([table]) + merge_hint = "是" if is_sequence_table and self.settings.sequence_table_merge == "single_requirement" else "否" + + prompt = f"""请从下列表格中提取并组织软件需求,要求以语义完整的需求句输出。 + +章节编号:{section.number} +章节标题:{section.title} +需求类型:{req_type} +该表是否按时间序列指令组织:{merge_hint} + +表格内容: +{table_text} + +提取规则: +1. 不是简单逐字抄表格,请结合列含义组织成完整需求句。 +2. 保留原文大部分关键词、阈值、数值、状态名,不得改变逻辑和数值。 +3. 允许轻微补充主语或上下文,使语义更完整。 +4. 
若为时间序列指令表,优先合并为1条需求,描述完整执行序列。 +5. 若有明显独立语义点,可输出多条需求。 + +请输出JSON: +{{ + "requirements": [ + {{"req_id": "可为空", "description": "完整需求描述"}} + ] +}}""" + + try: + response = self.llm.call(prompt) + data = self._parse_llm_json_response(response) + requirements: List[Requirement] = [] + if data and isinstance(data.get("requirements"), list): + for i, req_data in enumerate(data["requirements"], 1): + desc = self._clean_description(req_data.get("description", "")) + if not desc: + continue + doc_req_id = self._normalize_req_id(req_data.get("req_id", "")) + req_id = self._generate_requirement_id(req_type, section.number, i, doc_req_id, "") + requirements.append( + Requirement( req_id=req_id, description=desc, req_type=req_type, section_number=section.number, section_title=section.title, section_uid=section.uid, - interface_name=interface_name, - interface_type=interface_type, - source=source, - destination=destination + source_type="table", ) - requirements.append(req) + ) + + if not requirements: + return self._rule_extract_requirements(section, req_type) + return requirements except Exception as e: - logger.warning(f"LLM提取需求失败: {e},使用规则提取") + logger.warning(f"LLM表格语义化提取失败,回退规则模式: {e}") return self._rule_extract_requirements(section, req_type) - - return requirements + + def _maybe_light_rewrite(self, description: str, source_type: str) -> str: + """仅在LLM模式做轻微扩写,且通过保真校验。""" + description = self._clean_description(description) + if not description: + return description + if not self.llm or not self.settings.llm_light_rewrite_enabled: + return description + + need_rewrite = source_type == "table" or len(description) < 28 + if not need_rewrite: + return description + + prompt = f"""请对下面需求做轻微扩写,使语义更完整。 + +原文:{description} + +要求: +1. 保留原文大部分表述,不改变核心语义。 +2. 不得修改任何数值、阈值、状态名称、信号名称。 +3. 只允许补充必要主语/宾语,长度尽量控制在原文的1.25倍以内。 +4. 
仅返回改写后的单句文本。""" + + try: + rewritten = self._clean_description(self.llm.call(prompt)) + if not rewritten: + return description + + preserve_ratio = self._calculate_preserve_ratio(description, rewritten) + growth_ratio = len(rewritten) / max(len(description), 1) + if preserve_ratio < self.settings.preserve_ratio_min: + return description + if growth_ratio > self.settings.max_length_growth_ratio: + return description + if not self._numbers_consistent(description, rewritten): + return description + return rewritten + except Exception: + return description + + def _calculate_preserve_ratio(self, original: str, rewritten: str) -> float: + original_tokens = [c for c in re.sub(r"\s+", "", original) if c] + rewritten_tokens = set(c for c in re.sub(r"\s+", "", rewritten) if c) + if not original_tokens: + return 1.0 + hit = sum(1 for c in original_tokens if c in rewritten_tokens) + return hit / max(len(original_tokens), 1) + + def _numbers_consistent(self, original: str, rewritten: str) -> bool: + pattern = r"[<>≤≥]?\d+(?:\.\d+)?(?:[A-Za-z%]*)" + orig_nums = set(re.findall(pattern, original)) + rewrite_nums = set(re.findall(pattern, rewritten)) + return orig_nums.issubset(rewrite_nums) + + def _semantic_integrity_postprocess(self, requirements: List[Requirement]) -> List[Requirement]: + """语义完整性后处理:合并被误拆的紧耦合需求链。""" + if not self.settings.semantic_guard_enabled or not requirements: + return requirements + + merged: List[Requirement] = [requirements[0]] + for req in requirements[1:]: + prev = merged[-1] + if self._should_merge_semantic(prev, req): + prev.description = self._clean_description( + f"{prev.description.rstrip(';;。')};{req.description.lstrip(';;。')}" + ) + else: + merged.append(req) + return merged + + def _should_merge_semantic(self, prev: Requirement, curr: Requirement) -> bool: + if prev.section_uid != curr.section_uid or prev.type != curr.type: + return False + + prev_desc = prev.description + curr_desc = curr.description + + if curr_desc.startswith(("该", "其", 
"上述", "此", "该报警", "该信号")): + return True + if self.settings.preserve_alarm_chain and ("报警" in prev_desc and "持续" in curr_desc): + return True + if self.settings.preserve_condition_action_chain: + if "进入整星安全模式" in prev_desc and ("过放电模式" in curr_desc or "发送" in curr_desc): + return True + if "若蓄电池充电" in prev_desc and ( + "退出低功耗模式" in curr_desc or "热控" in curr_desc or "姿控" in curr_desc + ): + return True + if ("产生" in prev_desc and "报警" in prev_desc and "持续" in curr_desc): + return True + return False + + def _renumber_requirements_continuous(self, requirements: List[Requirement]) -> List[Requirement]: + """按文档顺序对去重后的需求重新连续编号。""" + if not requirements: + return requirements + + ordered = sorted(requirements, key=lambda r: (r.source_order, r.section_number or "")) + counters: Dict[Tuple[str, str], int] = {} + + for req in ordered: + section_key = req.section_uid or req.section_number or "NA" + prefix = self.settings.type_prefix.get(req.type, "FR") + counter_key = (section_key, prefix) + counters[counter_key] = counters.get(counter_key, 0) + 1 + section_part = req.section_number if req.section_number else "NA" + req.id = f"{prefix}-{section_part}-{counters[counter_key]}" + + return ordered def _rule_extract_requirements(self, section: Section, req_type: str) -> List[Requirement]: """使用规则提取需求(备用方法)""" @@ -352,7 +655,7 @@ JSON输出:""" if not descriptions: # 如果没有列表项,将整个内容作为一个需求 desc = self._clean_description(content) - if len(desc) > 5: + if len(desc) > 5 and not section.tables: descriptions = [f"{section.title}:{desc}"] # 表格需求 @@ -379,31 +682,55 @@ JSON输出:""" desc = self._clean_description(desc) if len(desc) > 5: doc_req_id, cleaned_desc = self._extract_requirement_id_from_text(desc) - # 生成最终的需求ID(三级优先级) - req_id = self._generate_requirement_id(req_type, section.number, index, doc_req_id, parent_req_id) + split_descs = self._split_requirement_description(cleaned_desc) + if not split_descs: + split_descs = [cleaned_desc] + + for split_idx, split_desc in 
enumerate(split_descs, 1): + req_id = self._generate_requirement_id( + req_type, + section.number, + index, + doc_req_id, + parent_req_id, + split_idx, + len(split_descs), + ) + req = Requirement( + req_id=req_id, + description=split_desc, + req_type=req_type, + section_number=section.number, + section_title=section.title, + section_uid=section.uid + ) + requirements.append(req) + index += 1 + + for doc_req_id, desc in table_requirements: + split_descs = self._split_requirement_description(desc) + if not split_descs: + split_descs = [desc] + + for split_idx, split_desc in enumerate(split_descs, 1): + req_id = self._generate_requirement_id( + req_type, + section.number, + index, + doc_req_id, + parent_req_id, + split_idx, + len(split_descs), + ) req = Requirement( req_id=req_id, - description=cleaned_desc, + description=split_desc, req_type=req_type, section_number=section.number, section_title=section.title, section_uid=section.uid ) requirements.append(req) - index += 1 - - for doc_req_id, desc in table_requirements: - # 生成最终的需求ID(三级优先级) - req_id = self._generate_requirement_id(req_type, section.number, index, doc_req_id, parent_req_id) - req = Requirement( - req_id=req_id, - description=desc, - req_type=req_type, - section_number=section.number, - section_title=section.title, - section_uid=section.uid - ) - requirements.append(req) index += 1 return requirements @@ -440,21 +767,11 @@ JSON输出:""" 注意:不能仅靠标题判断是否为功能需求,若无法识别具体类型,默认为功能需求 """ - title_lower = title.lower() - content_lower = (content or "").lower()[:500] # 只检查前500字符 - combined_text = title_lower + " " + content_lower - - # 优先识别接口需求,根据具体文件情况修改关键词 - interface_keywords = ['接口', 'interface', 'api', '串口', '通信协议', '数据交换'] - for keyword in interface_keywords: - if keyword in combined_text: - return 'interface' - - # 默认为功能需求(不能仅靠标题判断,无法识别时默认为功能需求) - return 'functional' + return self.settings.detect_requirement_type(title, content) - def _generate_requirement_id(self, req_type: str, section_number: str, index: int, 
- doc_req_id: str = "", parent_req_id: str = "") -> str: + def _generate_requirement_id(self, req_type: str, section_number: str, index: int, + doc_req_id: str = "", parent_req_id: str = "", + split_index: int = 1, split_total: int = 1) -> str: """ 生成需求ID(三级优先级) @@ -473,29 +790,19 @@ JSON输出:""" doc_req_id: 文档中提取的编号/代号 parent_req_id: 父需求编号(用于子需求) """ - # 优先级1:合法的完整编号(以2-10个字母或数字开头,后跟分隔符) - if doc_req_id: - # 检查是否为合法的完整编号格式:2-10个字母或数字开头 + 分隔符 + 其他字符 - # 例如: NY01-01、FR-3.1.2-1、AIRSAT07-GD03-04 - complete_id_pattern = r'^[A-Za-z0-9]{2,10}[-_].+$' - if re.match(complete_id_pattern, doc_req_id): - return doc_req_id.replace('_', '-') - - # 优先级2:代号/序号 + 父需求编号 - if doc_req_id and parent_req_id: - return f"{parent_req_id}-{doc_req_id}" - - # 优先级3:自动生成(保留章节号中的点号) - prefix = self.TYPE_PREFIX.get(req_type, 'FR') # 默认FR(功能需求) - section_part = section_number if section_number else "NA" - return f"{prefix}-{section_part}-{index}" + return self.id_generator.generate( + req_type=req_type, + section_number=section_number, + index=index, + doc_req_id=doc_req_id, + parent_req_id=parent_req_id, + split_index=split_index, + split_total=split_total, + ) def _normalize_req_id(self, req_id: str) -> str: """规范化需求编号""" - if not req_id: - return "" - req_id = str(req_id).strip() - return req_id + return self.id_generator.normalize(req_id) def _clean_description(self, text: str) -> str: """清理需求描述""" @@ -533,29 +840,28 @@ JSON输出:""" 1. 完整编号:NY01-01、FR-3.1.2-1 2. 
代号/序号:K101、D61、a)、1) """ + return self.id_generator.extract_from_text(text) + + def _split_requirement_description(self, text: str) -> List[str]: if not text: - return None, text - - # 模式1:完整需求编号(如 NY01-01、FR-3.1.2-1) - pattern1 = r'^\s*([A-Za-z]{2,6}[-_]\d+(?:[-.\d]+)*)\s*[::\)\]】]?\s*(.+)$' - match = re.match(pattern1, text) - if match: - return match.group(1).strip(), match.group(2).strip() - - # 模式2:代号(如 K101、D61) - pattern2 = r'^\s*([A-Za-z]\d+)\s*[::\)\]】]?\s*(.+)$' - match = re.match(pattern2, text) - if match: - return match.group(1).strip(), match.group(2).strip() - - # 模式3:序号(如 a)、1)) - pattern3 = r'^\s*([a-z0-9]{1,2}[\))])\s*(.+)$' - match = re.match(pattern3, text) - if match: - code = match.group(1).strip().rstrip('))') - return code, match.group(2).strip() - - return None, text + return [] + if "时间序列" in text and "执行指令" in text: + return [text] + if not self.splitter: + return [text] + return self.splitter.split(text) + + def _deduplicate_requirements(self, requirements: List[Requirement]) -> List[Requirement]: + seen = set() + deduped: List[Requirement] = [] + for req in requirements: + normalized_desc = re.sub(r'\s+', ' ', req.description).strip().lower() + key = (req.type, normalized_desc) + if key in seen: + continue + seen.add(key) + deduped.append(req) + return deduped def _extract_requirements_from_tables_rule(self, tables: List[List[List[str]]]) -> List[Tuple[Optional[str], str]]: """从表格中提取需求(规则方式)""" @@ -569,6 +875,13 @@ JSON输出:""" for table in tables: if not table: continue + + if self._is_time_series_table(table) and self.settings.sequence_table_merge == "single_requirement": + merged_desc = self._build_sequence_table_requirement(table) + if merged_desc: + results.append((None, merged_desc)) + continue + header = table[0] if table else [] header_lower = [h.lower() for h in header] id_idx = None @@ -605,6 +918,58 @@ JSON输出:""" results.append((req_id, desc)) return results + + def _is_time_series_table(self, table: List[List[str]]) -> bool: + 
class RequirementIDGenerator:
    """Extract requirement IDs embedded in text and build output requirement IDs.

    ``type_prefix`` maps an internal requirement-type key to the prefix used
    when an ID has to be synthesized from section number and index.
    """

    # Full requirement number such as "FR-3.1.2-1": 2-10 letters, "-" or "_",
    # then a digit followed by any run of digits, dots and dashes.
    # The tail is spelled ``\d[-.\d]*`` rather than ``\d+(?:[-.\d]+)*``: both
    # accept exactly the same strings and produce the same captures, but the
    # nested quantifier backtracks exponentially whenever the remainder of the
    # pattern cannot match (for instance multi-line text after a long digit run).
    # NOTE(review): prefixes that contain digits (e.g. "NY01-01") are not matched
    # here although _generate_base accepts them as complete IDs - confirm intent.
    _FULL_ID_RE = re.compile(r"^\s*([A-Za-z]{2,10}[-_]\d[-.\d]*)\s*[::\)\]】]?\s*(.+)$")
    # Short code: one letter followed by digits, e.g. "K101".
    _CODE_RE = re.compile(r"^\s*([A-Za-z]\d+)\s*[::\)\]】]?\s*(.+)$")
    # List ordinal: one or two lowercase letters/digits and a closing bracket, e.g. "a)".
    _ORDINAL_RE = re.compile(r"^\s*([a-z0-9]{1,2}[\))])\s*(.+)$")
    # A document-supplied ID that can be used as-is (after "_" -> "-").
    _COMPLETE_ID_RE = re.compile(r"^[A-Za-z0-9]{2,10}[-_].+$")

    def __init__(self, type_prefix: Dict[str, str]):
        self.type_prefix = type_prefix

    def normalize(self, req_id: str) -> str:
        """Return ``req_id`` as a stripped string; falsy input becomes ``""``."""
        if not req_id:
            return ""
        return str(req_id).strip()

    def extract_from_text(self, text: str) -> Tuple[Optional[str], str]:
        """Split a leading requirement identifier off ``text``.

        The patterns are tried in order: full requirement number, short code,
        list ordinal. For an ordinal the trailing bracket is removed from the
        returned identifier.

        Args:
            text: Raw requirement text, possibly starting with an identifier.

        Returns:
            ``(identifier, remaining_text)`` when a pattern matches, both
            stripped; otherwise ``(None, text)`` with ``text`` unchanged
            (this includes empty or ``None`` input).
        """
        if not text:
            return None, text

        for pattern in (self._FULL_ID_RE, self._CODE_RE):
            match = pattern.match(text)
            if match:
                return match.group(1).strip(), match.group(2).strip()

        match = self._ORDINAL_RE.match(text)
        if match:
            code = match.group(1).strip().rstrip("))")
            return code, match.group(2).strip()

        return None, text

    def generate(
        self,
        req_type: str,
        section_number: str,
        index: int,
        doc_req_id: str = "",
        parent_req_id: str = "",
        split_index: int = 1,
        split_total: int = 1,
    ) -> str:
        """Build the final requirement ID.

        Args:
            req_type: Internal requirement-type key used to look up the prefix.
            section_number: Section number placed in a synthesized ID.
            index: Running index placed in a synthesized ID.
            doc_req_id: Identifier found in the document, if any.
            parent_req_id: Identifier of the parent requirement, if any.
            split_index: Position of this fragment among the split fragments.
            split_total: Number of fragments the source sentence was split into.

        Returns:
            The base ID, suffixed with ``-S<split_index>`` when
            ``split_total`` is greater than 1.
        """
        base_id = self._generate_base(req_type, section_number, index, doc_req_id, parent_req_id)
        if split_total > 1:
            return f"{base_id}-S{split_index}"
        return base_id

    def _generate_base(
        self,
        req_type: str,
        section_number: str,
        index: int,
        doc_req_id: str,
        parent_req_id: str,
    ) -> str:
        """Choose the base ID: document ID, parent-qualified ID, or synthesized ID."""
        if doc_req_id:
            # A complete document ID is reused, with "_" unified to "-".
            if self._COMPLETE_ID_RE.match(doc_req_id):
                return doc_req_id.replace("_", "-")
            # A partial document ID (e.g. an ordinal) is qualified by its parent.
            if parent_req_id:
                return f"{parent_req_id}-{doc_req_id}"

        # Nothing usable from the document: synthesize prefix-section-index.
        prefix = self.type_prefix.get(req_type, "FR")
        section_part = section_number if section_number else "NA"
        return f"{prefix}-{section_part}-{index}"
+ ACTION_HINTS = [ + "产生", + "发送", + "设置", + "进入", + "退出", + "关闭", + "开启", + "监测", + "判断", + "记录", + "上传", + "重启", + "恢复", + "关断", + "断电", + "加电", + "执行", + "进行", + ] + + CONNECTOR_HINTS = ["并", "并且", "同时", "然后", "且", "以及", "及"] + CONDITIONAL_HINTS = ["如果", "当", "若", "在", "其中", "此时", "满足"] + CONTEXT_PRONOUN_HINTS = ["该", "其", "上述", "此", "这些", "那些"] + + def __init__(self, max_sentence_len: int = 120, min_clause_len: int = 12): + self.max_sentence_len = max_sentence_len + self.min_clause_len = min_clause_len + + def split(self, text: str) -> List[str]: + cleaned = self._clean(text) + if not cleaned: + return [] + + if self._contains_strong_semantic_chain(cleaned): + return [cleaned] + + # 先按强分隔符切分为主片段。 + base_parts = self._split_by_strong_punctuation(cleaned) + + result: List[str] = [] + for part in base_parts: + if len(part) <= self.max_sentence_len: + result.append(part) + continue + + # 对超长片段进一步基于逗号和连接词拆分。 + refined = self._split_long_clause(part) + result.extend(refined) + + result = self._merge_semantic_chain(result) + result = self._merge_too_short(result) + return self._deduplicate(result) + + def _contains_strong_semantic_chain(self, text: str) -> bool: + # 条件-动作链完整时,避免强拆。 + has_conditional = any(h in text for h in ["如果", "若", "当"]) + has_result = "则" in text or "时" in text + action_count = sum(1 for h in self.ACTION_HINTS if h in text) + if has_conditional and has_result and action_count >= 2: + return True + return False + + def _clean(self, text: str) -> str: + text = re.sub(r"\s+", " ", text or "") + return text.strip(" ;;。") + + def _split_by_strong_punctuation(self, text: str) -> List[str]: + chunks = re.split(r"[;;。]", text) + return [c.strip(" ,,") for c in chunks if c and c.strip(" ,,")] + + def _split_long_clause(self, clause: str) -> List[str]: + if self._contains_strong_semantic_chain(clause): + return [clause] + + raw_parts = [x.strip() for x in re.split(r"[,,]", clause) if x.strip()] + if len(raw_parts) <= 1: + return [clause] + + assembled: 
List[str] = [] + current = raw_parts[0] + + for fragment in raw_parts[1:]: + if self._should_split(current, fragment): + assembled.append(current.strip()) + current = fragment + else: + current = f"{current},{fragment}" + + if current.strip(): + assembled.append(current.strip()) + + return assembled + + def _should_split(self, current: str, fragment: str) -> bool: + if len(current) < self.min_clause_len: + return False + + # 指代承接片段通常是语义延续,不应切断。 + if any(fragment.startswith(h) for h in self.CONTEXT_PRONOUN_HINTS): + return False + + # 条件链中带“则/并/同时”的后继片段,优先保持在同一需求中。 + if self._contains_strong_semantic_chain(current + "," + fragment): + return False + + frag_starts_with_condition = any(fragment.startswith(h) for h in self.CONDITIONAL_HINTS) + if frag_starts_with_condition: + return False + + has_connector = any(fragment.startswith(h) for h in self.CONNECTOR_HINTS) + has_action = any(h in fragment for h in self.ACTION_HINTS) + current_has_action = any(h in current for h in self.ACTION_HINTS) + + # 连接词 + 动作词,且当前片段已经包含动作,优先拆分。 + if has_connector and has_action and current_has_action: + return True + + # 无连接词但出现新的动作片段且整体过长,也拆分。 + if has_action and current_has_action and len(current) >= self.max_sentence_len // 2: + return True + + return False + + def _merge_semantic_chain(self, parts: List[str]) -> List[str]: + if not parts: + return [] + + merged: List[str] = [parts[0]] + for part in parts[1:]: + prev = merged[-1] + if self._should_merge(prev, part): + merged[-1] = f"{prev};{part}" + else: + merged.append(part) + return merged + + def _should_merge(self, prev: str, current: str) -> bool: + # 指代开头:如“该报警信号...”。 + if any(current.startswith(h) for h in self.CONTEXT_PRONOUN_HINTS): + return True + + # 报警触发后的持续条件与动作属于同一链。 + if ("报警" in prev and "持续" in current) or ("产生" in prev and "报警" in prev and "持续" in current): + return True + + # 状态迁移 + 后续控制动作保持合并。 + if ("进入" in prev or "设置" in prev or "发送" in prev) and ("则" in current or "连续" in current): + return True + + # 
@dataclass
class RequirementTypeRule:
    """One requirement-type classification rule."""

    key: str  # internal type key, e.g. "functional"
    chinese_name: str  # type name as written in the config
    prefix: str  # ID prefix used for this type
    keywords: List[str]  # substrings that select this type
    priority: int  # lower values are checked first


class AppSettings:
    """Read the parsed config mapping and expose settings as attributes.

    Missing keys fall back to built-in defaults. A section or requirement-type
    entry that is present but null (an empty YAML mapping) is treated as empty
    instead of raising ``AttributeError``.
    """

    # Chinese type name used in the config -> internal type key.
    TYPE_NAME_MAP = {
        "功能需求": "functional",
        "接口需求": "interface",
        "性能需求": "performance",
        "安全需求": "security",
        "可靠性需求": "reliability",
        "其他需求": "other",
    }

    # Section-title keywords marking sections that hold no requirements.
    DEFAULT_NON_REQUIREMENT_SECTIONS = [
        "标识",
        "系统概述",
        "文档概述",
        "引用文档",
        "合格性规定",
        "需求可追踪性",
        "注释",
        "附录",
        "范围",
        "概述",
    ]

    # Internal type key -> Chinese name used when the config does not name the type.
    DEFAULT_TYPE_CHINESE = {
        "functional": "功能需求",
        "interface": "接口需求",
        "performance": "其他需求",
        "security": "其他需求",
        "reliability": "其他需求",
        "other": "其他需求",
    }

    # Internal type key -> default ID prefix.
    DEFAULT_PREFIX = {
        "functional": "FR",
        "interface": "IR",
        "performance": "PR",
        "security": "SR",
        "reliability": "RR",
        "other": "OR",
    }

    def __init__(self, config: Dict[str, Any] = None):
        """Populate every setting from ``config`` (``None`` means all defaults)."""
        self.config = config or {}

        document_cfg = self._section(self.config, "document")
        sections = document_cfg.get("non_requirement_sections")
        if sections is None:
            sections = self.DEFAULT_NON_REQUIREMENT_SECTIONS
        # Copy so that changing the instance list never alters the class default.
        self.non_requirement_sections = list(sections)

        extraction_cfg = self._section(self.config, "extraction")
        req_types_cfg = self._section(extraction_cfg, "requirement_types")

        self.requirement_rules = self._build_rules(req_types_cfg)
        self.type_prefix = self._build_type_prefix(req_types_cfg)
        self.type_chinese = self._build_type_chinese(req_types_cfg)

        splitter_cfg = self._section(extraction_cfg, "splitter")
        self.splitter_max_sentence_len = int(splitter_cfg.get("max_sentence_len", 120))
        self.splitter_min_clause_len = int(splitter_cfg.get("min_clause_len", 12))
        self.splitter_enabled = bool(splitter_cfg.get("enabled", True))

        semantic_cfg = self._section(extraction_cfg, "semantic_guard")
        self.semantic_guard_enabled = bool(semantic_cfg.get("enabled", True))
        self.preserve_condition_action_chain = bool(
            semantic_cfg.get("preserve_condition_action_chain", True)
        )
        self.preserve_alarm_chain = bool(semantic_cfg.get("preserve_alarm_chain", True))

        table_cfg = self._section(extraction_cfg, "table_strategy")
        self.table_llm_semantic_enabled = bool(table_cfg.get("llm_semantic_enabled", True))
        self.sequence_table_merge = table_cfg.get("sequence_table_merge", "single_requirement")
        self.merge_time_series_rows_min = int(table_cfg.get("merge_time_series_rows_min", 3))

        rewrite_cfg = self._section(extraction_cfg, "rewrite_policy")
        self.llm_light_rewrite_enabled = bool(rewrite_cfg.get("llm_light_rewrite_enabled", True))
        self.preserve_ratio_min = float(rewrite_cfg.get("preserve_ratio_min", 0.65))
        self.max_length_growth_ratio = float(rewrite_cfg.get("max_length_growth_ratio", 1.25))

        renumber_cfg = self._section(extraction_cfg, "renumber_policy")
        self.renumber_enabled = bool(renumber_cfg.get("enabled", True))
        self.renumber_mode = renumber_cfg.get("mode", "section_continuous")

    @staticmethod
    def _section(mapping: Dict[str, Any], name: str) -> Dict[str, Any]:
        """Return ``mapping[name]``, treating a missing or null entry as ``{}``."""
        return mapping.get(name) or {}

    def _build_rules(self, req_types_cfg: Dict[str, Dict[str, Any]]) -> List[RequirementTypeRule]:
        """Build the classification rules, sorted by ascending priority.

        With no configured types, two built-in rules (interface, then
        functional) are returned to stay compatible with the old behavior.
        Type names absent from ``TYPE_NAME_MAP`` are filed under ``"other"``.
        """
        if not req_types_cfg:
            return [
                RequirementTypeRule(
                    key="interface",
                    chinese_name="接口需求",
                    prefix="IR",
                    keywords=["接口", "interface", "api", "串口", "通信", "CAN", "以太网"],
                    priority=1,
                ),
                RequirementTypeRule(
                    key="functional",
                    chinese_name="功能需求",
                    prefix="FR",
                    keywords=["功能", "控制", "处理", "监测", "显示"],
                    priority=2,
                ),
            ]

        rules: List[RequirementTypeRule] = []
        for zh_name, item in req_types_cfg.items():
            item = item or {}  # a type listed with no body uses every default
            key = self.TYPE_NAME_MAP.get(zh_name, "other")
            rules.append(
                RequirementTypeRule(
                    key=key,
                    chinese_name=zh_name,
                    prefix=item.get("prefix", self.DEFAULT_PREFIX.get(key, "FR")),
                    keywords=item.get("keywords") or [],
                    priority=int(item.get("priority", 99)),
                )
            )

        return sorted(rules, key=lambda x: x.priority)

    def _build_type_prefix(self, req_types_cfg: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
        """Return type key -> ID prefix, with configured prefixes overriding defaults."""
        mapping = dict(self.DEFAULT_PREFIX)
        for zh_name, key in self.TYPE_NAME_MAP.items():
            if zh_name in req_types_cfg:
                entry = req_types_cfg[zh_name] or {}
                mapping[key] = entry.get("prefix", mapping[key])
        return mapping

    def _build_type_chinese(self, req_types_cfg: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
        """Return type key -> Chinese name; a configured type keeps its own name."""
        mapping = dict(self.DEFAULT_TYPE_CHINESE)
        for zh_name, key in self.TYPE_NAME_MAP.items():
            if zh_name in req_types_cfg:
                mapping[key] = zh_name
        return mapping

    def is_non_requirement_section(self, title: str) -> bool:
        """Return True when ``title`` contains any non-requirement section keyword.

        An empty or ``None`` title never matches, and empty keywords are
        ignored because an empty string is a substring of every title.
        """
        if not title:
            return False
        return any(keyword and keyword in title for keyword in self.non_requirement_sections)

    def detect_requirement_type(self, title: str, content: str) -> str:
        """Classify a section by keyword, checking rules in priority order.

        The title plus the first 500 characters of ``content`` are searched
        case-insensitively. Empty keywords are skipped so that a blank config
        entry cannot match every text.

        Returns:
            The key of the first rule with a matching keyword, or
            ``"functional"`` when nothing matches.
        """
        combined_text = f"{title or ''} {(content or '')[:500]}".lower()
        for rule in self.requirement_rules:
            for keyword in rule.keywords:
                if keyword and str(keyword).lower() in combined_text:
                    return rule.key
        return "functional"