commit bf730377ebe6bb82b477335dcdc0ab827c9079d6 Author: junlan <15167915727@163.com> Date: Tue Feb 3 22:48:22 2026 +0800 project init. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ff5aae0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +.vscode/ +__pycache__/ +*.pyc +*.pyo +*.pyd +.env +.venv +venv/ +*.log +.DS_Store \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..10b5e0b --- /dev/null +++ b/README.md @@ -0,0 +1,102 @@ +# SRS需求文档解析工具 + +一个智能的SRS(软件需求规格说明书)文档解析工具,支持PDF和Docx格式,能够自动提取需求并生成结构化JSON输出。 + +## 特性 + +- **LLM增强**:集成阿里云千问大模型,智能识别和提取需求 +- **多格式支持**:支持PDF和Docx格式的SRS文档 +- **非严格GJB结构**:支持不完全遵循GJB 438B标准的文档结构 +- **智能过滤**:自动过滤系统描述、重复需求等非需求内容 +- **结构化输出**:按章节层次组织的JSON格式输出 +- **灵活模式**:支持纯规则提取和LLM增强两种模式 +- **表格需求识别**:支持从表格中提取功能/接口/其他需求 + +## 快速开始 + +### 安装依赖 + +```bash +pip install -r requirements.txt + +# 如果使用LLM功能,还需安装: +pip install dashscope +``` + +### 配置API密钥(LLM模式) + +```bash +# 方式1:环境变量(推荐) +# Linux/Mac +export DASHSCOPE_API_KEY="your-api-key" + +# Windows PowerShell +$env:DASHSCOPE_API_KEY="your-api-key" + +# 方式2:在config.yaml中配置 +# llm: +# api_key: "your-api-key" +``` + +### 运行 + +```bash +# LLM增强模式 +python main.py -i DC-SRS.pdf -o output.json + +# 纯规则模式(不使用LLM) +python main.py -i DC-SRS.pdf -o output.json --no-llm +``` + + + +## 需求类型说明 + +工具统计**三类**需求类型: + +| 类型 | 描述 | +|------|------| +| **功能需求** | 系统应该提供的功能和行为 | +| **接口需求** | 系统的输入/输出接口规范、通信协议等 | +| **其他需求** | 性能、安全、可靠性等非功能性需求 | + +### 接口需求扩展字段 + +接口需求除了基本的"需求编号"和"需求描述"外,还包含以下字段: + +| 字段 | 说明 | +|------|------| +| **接口名称** | 接口的名称 +| **接口类型** | 接口的类型(如:CAN总线接口、以太网接口、串口等) | +| **来源** | 数据或信号的来源/发送方 | +| **目的地** | 数据或信号的目的地/接收方 | + +### 需求描述规则 + +- **功能需求**:保持原文描述,不改写润色 +- **接口需求**:允许改写润色,确保描述清晰完整 +- **其他需求**:保持原文描述,不改写润色 + +## 目录结构 + +``` +SRS_reqs_qwen/ +├── main.py # 主程序入口 +├── config.yaml # 配置文件 +├── requirements.txt # 依赖 +├── src/ +│ ├── document_parser.py # 文档解析器 +│ ├── requirement_extractor.py # 需求提取器 +│ ├── json_generator.py # JSON生成器 +│ ├── llm_interface.py # LLM接口 +│ └── utils.py # 工具函数 +├── docs/ +│ ├── README.md # 项目说明 +│ ├── ARCHITECTURE.md # 架构文档 +│ └── USAGE.md # 使用指南 +└── tests/ # 测试文件 +``` + diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..7644bab --- /dev/null +++ b/config.yaml @@ -0,0 +1,83 @@ +# 配置文件 - SRS 需求文档解析工具 (LLM增强版) +# Configuration file for SRS Requirement Document Parser (LLM Enhanced Version) + +# LLM配置 - 阿里云千问 +llm: + # 是否启用LLM(设为false则使用纯规则提取) + enabled: true + # LLM提供商:qwen(阿里云千问) + provider: "qwen" + # 模型名称 + model: "qwen3-max" + # API密钥(建议使用环境变量 DASHSCOPE_API_KEY) + api_key: "sk-7097f7842f724f0c9e70c4bf3b16dacb" + # 可选参数 + temperature: 0.3 + max_tokens: 1024 + +# 文档解析配置 +document: + supported_formats: + - ".pdf" + - ".docx" + # 标题识别的样式列表 + heading_styles: + - "Heading 1" + - "Heading 2" + - "Heading 3" + - "Heading 4" + - "Heading 5" + # 需要过滤的非需求章节(GJB438B标准) + non_requirement_sections: + - "标识" + - "系统概述" + - "文档概述" + - "引用文档" + - "合格性规定" + - "需求可追踪性" + - "注释" + - "附录" + +# 需求提取配置 +extraction: + # 需求类型关键字(用于自动判断需求类型) + requirement_types: + 功能需求: + prefix: "FR" + keywords: ["功能", "feature", "requirement", "CSCI组成", "控制", "处理", "监测", "显示"] + priority: 1 + 接口需求: + prefix: "IR" + keywords: ["接口", "interface", "api", "外部接口", "内部接口", "CAN", "以太网", "通信"] + priority: 2 + 性能需求: + prefix: "PR" + keywords: ["性能", "performance", "速度", "响应时间", "吞吐量"] + priority: 3 + 安全需求: + prefix: "SR" + keywords: ["安全", "security", "安全性", "报警"] + priority: 4 + 可靠性需求: + prefix: "RR" + keywords: ["可靠", "reliability", "容错", "恢复", "冗余"] + priority: 5 + 其他需求: + prefix: "OR" + keywords: ["约束", "资源", "适应性", "保密", "环境", "计算机", "质量", "设计", "人员", "培训", "保障", "验收", "交付"] + priority: 6 + +# 输出配置 +output: + format: "json" + indent: 2 + # 是否美化输出(格式化) + pretty_print: true + # 是否包含元数据 + include_metadata: true + +# 日志配置 +logging: + level: "INFO" # DEBUG, INFO, WARNING, ERROR + format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + file: "srs_parser.log" diff --git a/input/test.docx b/input/test.docx new file mode 100644 index 0000000..0062709 Binary files /dev/null and b/input/test.docx differ diff --git a/json_to_excel.py b/json_to_excel.py new file mode 100644 index 0000000..d662b1f --- /dev/null +++ b/json_to_excel.py @@ -0,0 +1,202 @@ +# -*- coding: utf-8 -*- +""" +将需求JSON文件转换为Excel格式 +""" + +import json +import argparse +from openpyxl import Workbook +from openpyxl.styles import Font, PatternFill, Alignment, Border, Side +from openpyxl.utils import get_column_letter + + +def parse_requirements_from_json(json_data, parent_section=""): + """ + 递归解析JSON,提取所有需求 + + Args: + json_data: JSON数据(章节或子章节) + parent_section: 父章节路径 + + Returns: + 需求列表 + """ + requirements = [] + + if isinstance(json_data, dict): + # 提取章节信息 + section_info = json_data.get("章节信息", {}) + section_number = section_info.get("章节编号", "") + section_title = section_info.get("章节标题", "") + section_level = section_info.get("章节级别", "") + + current_section = f"{section_number} {section_title}".strip() if section_number else section_title + + # 提取需求列表 + req_list = json_data.get("需求列表", []) + for req in req_list: + req_data = { + "章节编号": section_number, + "章节标题": section_title, + "章节级别": section_level, + "章节完整路径": current_section, + "需求类型": req.get("需求类型", ""), + "需求编号": req.get("需求编号", ""), + "需求描述": req.get("需求描述", ""), + "接口名称": req.get("接口名称", ""), + "接口类型": req.get("接口类型", ""), + "来源": req.get("来源", ""), + "目的地": req.get("目的地", "") + } + requirements.append(req_data) + + # 递归处理子章节 + subsections = json_data.get("子章节", {}) + for subsection_name, subsection_data in subsections.items(): + sub_reqs = parse_requirements_from_json(subsection_data, current_section) + requirements.extend(sub_reqs) + + return requirements + + +def create_excel(json_file, output_file): + """ + 将JSON文件转换为Excel + + Args: + json_file: 输入的JSON文件路径 + output_file: 输出的Excel文件路径 + """ + # 读取JSON文件 + with open(json_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + # 创建工作簿 + wb = Workbook() + + # 创建元数据工作表 + ws_meta = wb.active + ws_meta.title = "文档元数据" + + metadata = data.get("文档元数据", {}) + ws_meta['A1'] = "文档标题" + ws_meta['B1'] = metadata.get("标题", "") + ws_meta['A2'] = "生成时间" + ws_meta['B2'] = metadata.get("生成时间", "") + ws_meta['A3'] = "总需求数" + ws_meta['B3'] = metadata.get("总需求数", 0) + + # 需求类型统计 + stats = metadata.get("需求类型统计", {}) + row = 5 + ws_meta['A5'] = "需求类型统计" + ws_meta['A5'].font = Font(bold=True) + for req_type, count in stats.items(): + row += 1 + ws_meta[f'A{row}'] = req_type + ws_meta[f'B{row}'] = count + + # 设置元数据工作表样式 + for row in ws_meta.iter_rows(min_row=1, max_row=row, min_col=1, max_col=1): + for cell in row: + cell.font = Font(bold=True) + cell.fill = PatternFill(start_color="CCE5FF", end_color="CCE5FF", fill_type="solid") + + # 创建需求列表工作表 + ws_reqs = wb.create_sheet(title="需求列表") + + # 定义表头(按用户要求的顺序) + headers = [ + "章节编号", "章节标题", "需求类型", "需求编号", "需求描述", + "接口名称", "接口类型", "来源", "目的地" + ] + + # 写入表头 + for col, header in enumerate(headers, 1): + cell = ws_reqs.cell(row=1, column=col, value=header) + cell.font = Font(bold=True, size=11, color="FFFFFF") + cell.fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") + cell.alignment = Alignment(horizontal="center", vertical="center") + + # 解析所有需求 + all_requirements = [] + need_content = data.get("需求内容", {}) + for section_name, section_data in need_content.items(): + reqs = parse_requirements_from_json(section_data) + all_requirements.extend(reqs) + + # 写入需求数据 + for row_idx, req in enumerate(all_requirements, 2): + for col_idx, header in enumerate(headers, 1): + # 获取字段值,如果字段不存在或为空字符串,则为空 + value = req.get(header, "") + if value is None: + value = "" + + cell = ws_reqs.cell(row=row_idx, column=col_idx, value=value) + cell.alignment = Alignment(horizontal="left", vertical="top", wrap_text=True) + + # 根据需求类型设置颜色 + if header == "需求类型" and value: + if value == "接口需求": + cell.fill = PatternFill(start_color="FFF2CC", end_color="FFF2CC", fill_type="solid") + elif value == "功能需求": + cell.fill = PatternFill(start_color="E2EFDA", end_color="E2EFDA", fill_type="solid") + else: + cell.fill = PatternFill(start_color="FCE4D6", end_color="FCE4D6", fill_type="solid") + + # 设置列宽 + column_widths = { + 'A': 12, # 章节编号 + 'B': 25, # 章节标题 + 'C': 12, # 需求类型 + 'D': 18, # 需求编号 + 'E': 80, # 需求描述 + 'F': 25, # 接口名称 + 'G': 25, # 接口类型 + 'H': 25, # 来源 + 'I': 25 # 目的地 + } + + for col, width in column_widths.items(): + ws_reqs.column_dimensions[col].width = width + + # 设置所有单元格边框 + thin_border = Border( + left=Side(style='thin'), + right=Side(style='thin'), + top=Side(style='thin'), + bottom=Side(style='thin') + ) + + for row in ws_reqs.iter_rows(min_row=1, max_row=len(all_requirements)+1, min_col=1, max_col=len(headers)): + for cell in row: + cell.border = thin_border + + # 冻结首行 + ws_reqs.freeze_panes = "A2" + + # 保存Excel文件 + wb.save(output_file) + print(f"成功将 {len(all_requirements)} 条需求导出到 Excel 文件: {output_file}") + print(f"工作表: '文档元数据' - 包含文档基本信息和统计") + print(f"工作表: '需求列表' - 包含所有需求的详细信息") + + +def main(): + parser = argparse.ArgumentParser(description='将需求JSON文件转换为Excel格式') + parser.add_argument('-i', '--input', required=True, help='输入的JSON文件路径') + parser.add_argument('-o', '--output', required=True, help='输出的Excel文件路径') + + args = parser.parse_args() + + try: + create_excel(args.input, args.output) + except Exception as e: + print(f"转换失败: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + main() diff --git a/main.py b/main.py new file mode 100644 index 0000000..7b496c7 --- /dev/null +++ b/main.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +SRS 解析工具 - 主程序入口 +LLM 增强版 - 默认阿里云千问大模型 +""" + +import argparse +import os +import sys +import logging +from pathlib import Path + +# 添加当前目录到Python路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from src.utils import load_config, setup_logging, validate_file_path, ensure_directory_exists, get_env_or_config +from src.document_parser import create_parser +from src.requirement_extractor import RequirementExtractor +from src.json_generator import JSONGenerator + +logger = logging.getLogger(__name__) + + +def create_llm(config: dict): + """ + 创建LLM实例 + + Args: + config: 配置字典 + + Returns: + LLM实例或None + """ + llm_config = config.get('llm', {}) + + # 检查是否启用LLM + if not llm_config.get('enabled', True): + logger.info("LLM已禁用,使用纯规则提取模式") + return None + + provider = llm_config.get('provider', 'qwen') + + # 获取API密钥(优先使用环境变量) + api_key = get_env_or_config('DASHSCOPE_API_KEY', llm_config.get('api_key')) + + if not api_key: + logger.warning("未配置API密钥,请使用纯规则提取模式") + logger.warning("请设置环境变量 DASHSCOPE_API_KEY 或在 config.yaml 中配置 llm.api_key") + return None + + try: + from src.llm_interface import QwenLLM + + model = llm_config.get('model', 'qwen-plus') + temperature = llm_config.get('temperature', 0.3) + max_tokens = llm_config.get('max_tokens', 1024) + + llm = QwenLLM( + api_key=api_key, + model=model, + temperature=temperature, + max_tokens=max_tokens + ) + + logger.info(f"成功创建LLM实例: {provider} ({model})") + return llm + + except ImportError as e: + logger.warning(f"无法导入LLM模块: {e}") + logger.warning("请运行: pip install dashscope") + return None + except Exception as e: + logger.warning(f"创建LLM实例失败: {e}") + return None + + +def main(): + """主程序入口""" + + # 解析命令行参数 + parser = argparse.ArgumentParser( + description='SRS需求文档解析工具', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +示例用法: + python main.py --input sample.pdf --output output.json + python main.py -i requirements.docx -o output.json --verbose + python main.py -i DC-SRS.pdf -o output.json --no-llm # 禁用LLM + """ + ) + + parser.add_argument( + '--input', '-i', + type=str, + required=True, + help='输入的SRS文档路径(支持.docx和.pdf)' + ) + + parser.add_argument( + '--output', '-o', + type=str, + default='output.json', + help='输出JSON文件路径(默认:output.json)' + ) + + parser.add_argument( + '--config', '-c', + type=str, + default=None, + help='配置文件路径(默认:./config.yaml)' + ) + + parser.add_argument( + '--verbose', '-v', + action='store_true', + help='输出详细日志' + ) + + parser.add_argument( + '--no-llm', + action='store_true', + help='禁用LLM,使用纯规则提取' + ) + + # 解析命令行参数 + args = parser.parse_args() + + # 加载配置 + config = load_config(args.config) + + # 命令行参数覆盖配置 + if args.no_llm: + config.setdefault('llm', {})['enabled'] = False + + # 设置日志 + if args.verbose: + config.setdefault('logging', {})['level'] = 'DEBUG' + setup_logging(config) + + logger.info("=" * 60) + logger.info("SRS需求文档解析工具启动(LLM增强版)") + logger.info("=" * 60) + + try: + # 验证输入文件 + if not validate_file_path(args.input, ['.pdf', '.docx']): + logger.error(f"输入文件验证失败: {args.input}") + return False + + logger.info(f"输入文件: {args.input}") + + # 创建输出目录 + output_dir = os.path.dirname(args.output) or '.' + if output_dir != '.' and not ensure_directory_exists(output_dir): + logger.error(f"无法创建输出目录: {output_dir}") + return False + + logger.info(f"输出文件: {args.output}") + + # 创建LLM实例 + llm = create_llm(config) + if llm: + logger.info("LLM增强模式已启用") + else: + logger.info("使用纯规则提取模式") + + # 步骤1:解析文档 + logger.info("\n" + "=" * 60) + logger.info("步骤1:解析文档") + logger.info("=" * 60) + + doc_parser = create_parser(args.input) + if llm: + doc_parser.set_llm(llm) + + sections = doc_parser.parse() + document_title = doc_parser.get_document_title() + + logger.info(f"成功解析文档,提取{len(sections)}个顶级章节") + + # 打印章节结构 + def print_sections(sections, indent=0): + for section in sections: + logger.info(" " * indent + f"- {section.number} {section.title}") + if section.children: + print_sections(section.children, indent + 1) + + if args.verbose: + logger.info("章节结构:") + print_sections(sections) + + # 步骤2:提取需求 + logger.info("\n" + "=" * 60) + if llm: + logger.info("步骤2:提取需求(LLM增强模式)") + else: + logger.info("步骤2:提取需求(规则匹配模式)") + logger.info("=" * 60) + + extractor = RequirementExtractor(config, llm=llm) + requirements = extractor.extract_from_sections(sections) + + # 统计需求信息 + stats = extractor.get_statistics() + logger.info(f"\n需求统计:") + for req_type, count in stats['by_type'].items(): + logger.info(f" {req_type}: {count}项") + logger.info(f" 总计: {stats['total']}项") + + # 步骤3:生成JSON + logger.info("\n" + "=" * 60) + logger.info("步骤3:生成JSON") + logger.info("=" * 60) + + generator = JSONGenerator(config) + json_output = generator.generate( + sections, + requirements, + document_title + ) + + logger.info(f"JSON结构生成完成") + + # 步骤4:保存文件 + logger.info("\n" + "=" * 60) + logger.info("步骤4:保存结果") + logger.info("=" * 60) + + generator.save_to_file(json_output, args.output) + logger.info(f"成功保存JSON文件到: {args.output}") + + # 打印输出文件大小 + if os.path.exists(args.output): + file_size = os.path.getsize(args.output) + logger.info(f"文件大小: {file_size} 字节") + + logger.info("\n" + "=" * 60) + logger.info("SRS需求文档解析完成!") + logger.info("=" * 60) + + return True + + except Exception as e: + logger.error(f"处理过程中出现错误: {e}", exc_info=True) + return False + + +if __name__ == '__main__': + success = main() + sys.exit(0 if success else 1) diff --git a/output/output_llm.json b/output/output_llm.json new file mode 100644 index 0000000..d72c229 --- /dev/null +++ b/output/output_llm.json @@ -0,0 +1,682 @@ +{ + "文档元数据": { + "标题": "SRS Document", + "生成时间": "2026-01-12T10:47:07.196787", + "总需求数": 83, + "需求类型统计": { + "功能需求": 18, + "接口需求": 30, + "其他需求": 35 + } + }, + "需求内容": { + "3 需求": { + "章节信息": { + "章节编号": "3", + "章节标题": "需求", + "章节级别": 1 + }, + "子章节": { + "3.1 功能需求": { + "章节信息": { + "章节编号": "3.1", + "章节标题": "功能需求", + "章节级别": 2 + }, + "子章节": { + "3.1.2 电场综合防护系统软件功能要求": { + "章节信息": { + "章节编号": "3.1.2", + "章节标题": "电场综合防护系统软件功能要求", + "章节级别": 3 + }, + "子章节": { + "3.1.2.1 综合监控装置显控软件功能要求": { + "章节信息": { + "章节编号": "3.1.2.1", + "章节标题": "综合监控装置显控软件功能要求", + "章节级别": 4 + }, + "需求列表": [ + { + "需求编号": "FR-3.1.2.1-1", + "需求描述": "根据作战需求,对电场综合防护设备的静电场防护进行启停控制,轴频电场防护出航即开启,同时对外加电流阴极保护设备进行远程控制。" + }, + { + "需求编号": "FR-3.1.2.1-2", + "需求描述": "对电场综合防护设备的工作状态进行实时监测。" + }, + { + "需求编号": "FR-3.1.2.1-3", + "需求描述": "通过CAN总线,实时接收由艏侧推和主推防护装置送来的运行故障信息,在人机界面上显示。" + }, + { + "需求编号": "FR-3.1.2.1-4", + "需求描述": "通过CAN总线,实时接收由艏侧推和主推防护装置送来的运行状态参数,如轴地电压、轴电流和补偿电流等信息,并在人机界面上显示。" + }, + { + "需求编号": "FR-3.1.2.1-5", + "需求描述": "当故障来临时,能对电场综合防护设备的故障来临时间和故障类别进行记录,形成故障日志,故障记录不少于500条。" + }, + { + "需求编号": "FR-3.1.2.1-6", + "需求描述": "通过以太网,将电场综合防护设备的运行参数实时上传至上层监控平台网。" + } + ] + }, + "3.1.2.2 艏侧推电场防护装置控制软件功能要求": { + "章节信息": { + "章节编号": "3.1.2.2", + "章节标题": "艏侧推电场防护装置控制软件功能要求", + "章节级别": 4 + }, + "需求列表": [ + { + "需求编号": "FR-3.1.2.2-1", + "需求描述": "通过485总线,本地人机界面实时接收主控板传送的设备运行状态参数和防护参数。" + }, + { + "需求编号": "FR-3.1.2.2-2", + "需求描述": "通过实时获取艏侧推附近的参比电位信号,控制艏侧推附近的补偿阳极输出静电场补偿电流,降低由艏侧推产生的静电场信号。" + }, + { + "需求编号": "FR-3.1.2.2-3", + "需求描述": "通过CAN总线,远程接收综合监控装置的电场防护启停指令。" + }, + { + "需求编号": "FR-3.1.2.2-4", + "需求描述": "通过485总线,本地接收人机界面的电场防护启停指令,进行静电场防护功能的启停控制。" + }, + { + "需求编号": "FR-3.1.2.2-5", + "需求描述": "设备发生过温、过流故障时发出声光报警。" + }, + { + "需求编号": "FR-3.1.2.2-6", + "需求描述": "通过CAN总线,实时将工作状态参数及电场防护参数上传至综合监控装置。" + } + ] + }, + "3.1.2.3 主推电场防护装置控制软件功能要求": { + "章节信息": { + "章节编号": "3.1.2.3", + "章节标题": "主推电场防护装置控制软件功能要求", + "章节级别": 4 + }, + "需求列表": [ + { + "需求编号": "FR-3.1.2.3-1", + "需求描述": "通过485总线,本地人机界面实时接收主控板传送的设备运行状态参数和防护参数;" + }, + { + "需求编号": "FR-3.1.2.3-2", + "需求描述": "通过实时获取的轴电流信号,向辅助阳极输出静电场补偿电流,降低静电场信号;" + }, + { + "需求编号": "FR-3.1.2.3-3", + "需求描述": "通过CAN总线,远程接收综合监控装置的电场防护启停指令;" + }, + { + "需求编号": "FR-3.1.2.3-4", + "需求描述": "通过485总线,本地接收人机界面的电场防护启停指令,进行静电场和轴频防护功能的启停控制;" + }, + { + "需求编号": "FR-3.1.2.3-5", + "需求描述": "设备发生过温、过流故障时发出声光报警;" + }, + { + "需求编号": "FR-3.1.2.3-6", + "需求描述": "通过CAN总线,实时将工作状态参数及电场防护参数上传至综合监控装置。" + } + ] + } + } + } + } + }, + "3.3 CSCI外部接口需求": { + "章节信息": { + "章节编号": "3.3", + "章节标题": "CSCI外部接口需求", + "章节级别": 2 + }, + "子章节": { + "3.3.2 外部接口描述": { + "章节信息": { + "章节编号": "3.3.2", + "章节标题": "外部接口描述", + "章节级别": 3 + }, + "需求列表": [ + { + "需求编号": "IR-3.3.2-1", + "需求描述": "主推电场防护装置控制软件应通过CAN接口向综合监控装置显控软件发送包含轴地电压、轴电流、轴频电场补偿电压、轴频电场补偿电流、静电场补偿电流及报警信号的状态信息。", + "接口名称": "主推电场防护装置状态信息输入", + "接口类型": "CAN", + "来源": "主推电场防护装置控制软件", + "目的地": "综合监控装置显控软件" + }, + { + "需求编号": "IR-3.3.2-2", + "需求描述": "艏侧推电场防护装置控制软件应通过CAN接口向综合监控装置显控软件发送包含艏侧推参比电位、报警信号和艏侧推补偿电流的状态信息。", + "接口名称": "艏侧推电场防护装置状态信息输入", + "接口类型": "CAN", + "来源": "艏侧推电场防护装置控制软件", + "目的地": "综合监控装置显控软件" + }, + { + "需求编号": "IR-3.3.2-3", + "需求描述": "外加电流阴极保护设备应通过模拟量输入(AI)接口向综合监控装置显控软件提供船体参比电位数据,数据格式为2字节无符号整数,单位为mV,取值范围为0至999。", + "接口名称": "船体参比电位模拟量输入", + "接口类型": "AI", + "来源": "外加电流阴极保护设备", + "目的地": "综合监控装置显控软件" + }, + { + "需求编号": "IR-3.3.2-4", + "需求描述": "综合监控装置显控软件应通过CAN接口向主推电场防护装置控制软件发送启停控制指令,包括轴频电场防护启停信号和静电场防护启停信号,每个信号为1字节无符号整数,取值为0或1。", + "接口名称": "启停信号开关量输出", + "接口类型": "CAN", + "来源": "综合监控装置显控软件", + "目的地": "主推电场防护装置控制软件" + }, + { + "需求编号": "IR-3.3.2-5", + "需求描述": "综合监控装置显控软件应通过CAN接口向艏侧推电场防护装置控制软件发送启停控制指令及船体参比电位数据,其中启停信号为1字节无符号整数(0/1),船体参比电位为2字节无符号整数(单位mV,范围0–999)。", + "接口名称": "启停信号、船体参比电位输出", + "接口类型": "CAN", + "来源": "综合监控装置显控软件", + "目的地": "艏侧推电场防护装置控制软件" + }, + { + "需求编号": "IR-3.3.2-6", + "需求描述": "综合监控装置显控软件应通过开关量输出(DO)接口向外加电流阴极保护设备发送启停信号,信号格式为1字节无符号整数,取值为0或1。", + "接口名称": "启停信号开关量输出", + "接口类型": "DO", + "来源": "综合监控装置显控软件", + "目的地": "外加电流阴极保护设备" + }, + { + "需求编号": "IR-3.3.2-7", + "需求描述": "综合监控装置显控软件应通过以太网接口向上层监控平台发送系统运行参数,参数内容封装在标识为JK-SCPT-YXCS的数据元素中。", + "接口名称": "系统运行参数输出", + "接口类型": "以太网", + "来源": "综合监控装置显控软件", + "目的地": "上层监控平台" + } + ] + }, + "3.4.2 内部接口描述": { + "章节信息": { + "章节编号": "3.4.2", + "章节标题": "内部接口描述", + "章节级别": 3 + }, + "子章节": { + "3.4.2.1 显控软件内部接口描述": { + "章节信息": { + "章节编号": "3.4.2.1", + "章节标题": "显控软件内部接口描述", + "章节级别": 4 + }, + "需求列表": [ + { + "需求编号": "IR-3.4.2.1-1", + "需求描述": "显控软件应通过CAN接口接收来自CAN通讯单元的工况转换控制指令,并将其传输至显示屏进行显示。", + "接口名称": "工况转换控制指令", + "接口类型": "CAN接口", + "来源": "CAN通讯单元", + "目的地": "显示屏" + }, + { + "需求编号": "IR-3.4.2.1-2", + "需求描述": "显控软件应通过CAN接口将设备运行状态参数从CAN通讯单元传输至显示屏进行实时显示。", + "接口名称": "设备运行状态参数", + "接口类型": "CAN接口", + "来源": "CAN通讯单元", + "目的地": "显示屏" + }, + { + "需求编号": "IR-3.4.2.1-3", + "需求描述": "显控软件应通过CAN接口将报警信息从CAN通讯单元传输至显示屏,用于及时提示用户。", + "接口名称": "报警信息", + "接口类型": "CAN接口", + "来源": "CAN通讯单元", + "目的地": "显示屏" + }, + { + "需求编号": "IR-3.4.2.1-4", + "需求描述": "显控软件应通过以太网接口将系统运行参数从CAN通讯单元传输至以太网通信单元,用于外部系统访问或远程监控。", + "接口名称": "系统运行参数", + "接口类型": "以太网接口", + "来源": "CAN通讯单元", + "目的地": "以太网通信单元" + } + ] + }, + "3.4.2.2 艏侧推电场防护装置控制软件内部接口描述": { + "章节信息": { + "章节编号": "3.4.2.2", + "章节标题": "艏侧推电场防护装置控制软件内部接口描述", + "章节级别": 4 + }, + "需求列表": [ + { + "需求编号": "IR-3.4.2.2-1", + "需求描述": "控制软件应通过CAN接口接收设备参数和状态信息,并将其上传至CAN通讯单元。", + "接口名称": "参数上传状态信息", + "接口类型": "CAN接口", + "来源": "CSSC-CAN-SCTXX", + "目的地": "CAN通讯单元" + }, + { + "需求编号": "IR-3.4.2.2-2", + "需求描述": "控制软件应通过模拟量输入(AI)接口接收静电场补偿电流信号,用于静电场防护计算模块的运行参数上传。", + "接口名称": "静电场补偿电流", + "接口类型": "模拟量输入(AI)", + "来源": "SCTJFH-CSSC-BCDL", + "目的地": "静电场防护计算" + }, + { + "需求编号": "IR-3.4.2.2-3", + "需求描述": "控制软件应通过CAN接口接收工况控制指令(如启停信号),并将该指令传递至电流采样模块。", + "接口名称": "工况控制指令", + "接口类型": "CAN接口", + "来源": "SCTCAN-CY-KZZL", + "目的地": "电流采样" + }, + { + "需求编号": "IR-3.4.2.2-4", + "需求描述": "控制软件应通过模拟量输入(AI)接口采集报警监控相关的温度与电流数据,并将报警信息发送至故障诊断与报警监控模块。", + "接口名称": "报警监控", + "接口类型": "模拟量输入(AI)", + "来源": "SCTTC-GZZD-BJJK", + "目的地": "故障诊断与报警监控" + } + ] + }, + "3.4.2.3 主推电场防护装置控制软件内部接口描述": { + "章节信息": { + "章节编号": "3.4.2.3", + "章节标题": "主推电场防护装置控制软件内部接口描述", + "章节级别": 4 + }, + "需求列表": [ + { + "需求编号": "IR-3.4.2.3-1", + "需求描述": "主推电场防护装置控制软件应通过CAN接口将设备参数和状态信息上传至CAN通讯单元。", + "接口名称": "参数上传状态信息", + "接口类型": "CAN接口", + "来源": "CSSC-CAN-ZTXX", + "目的地": "CAN通讯单元" + }, + { + "需求编号": "IR-3.4.2.3-2", + "需求描述": "静电场防护控制软件应接收来自静电场补偿电流模块的运行参数,并执行静电场防护计算。", + "接口名称": "静电场防护控制计算", + "接口类型": "AI接口", + "来源": "ZTJFH-CSSC-BCDL", + "目的地": "静电场防护计算" + }, + { + "需求编号": "IR-3.4.2.3-3", + "需求描述": "主推电场防护装置控制软件应通过CAN接口接收工况控制指令(如启停信号),并将其传递至电流采样和轴地电压采样模块。", + "接口名称": "工况控制指令", + "接口类型": "CAN接口", + "来源": "ZTCAN-CY-KZZL", + "目的地": "电流采样、轴地电压采样" + }, + { + "需求编号": "IR-3.4.2.3-4", + "需求描述": "报警监控模块应通过AI接口接收温度采集和电流采样数据,用于故障诊断与报警监控,并生成相应的报警信息。", + "接口名称": "报警监控", + "接口类型": "AI接口", + "来源": "ZTTC-GZZD-BJJK", + "目的地": "故障诊断与报警监控" + } + ] + }, + "3.4.3.1 显控软件内部接口输入输出数据详细信息": { + "章节信息": { + "章节编号": "3.4.3.1", + "章节标题": "显控软件内部接口输入输出数据详细信息", + "章节级别": 4 + }, + "需求列表": [ + { + "需求编号": "IR-3.4.3.1-1", + "需求描述": "显控软件应通过CAN接口接收来自显示屏的工况转换控制指令,该指令包含左侧艏侧推、右侧艏侧推、左轴主推、右轴主推的电场防护启停信号以及外加电流阴极保护启停信号。", + "接口名称": "VIEW-CAN-KZZL", + "接口类型": "CAN接口", + "来源": "显示屏", + "目的地": "CAN通讯单元" + }, + { + "需求编号": "IR-3.4.3.1-2", + "需求描述": "CAN通讯单元应通过CAN接口向显示屏发送设备运行状态参数,包括左右两侧艏侧推的参比电位、船体电位和补偿电流,以及左右主推的轴地电压、轴电流、轴频电场补偿电压与电流、静电场补偿电流和报警信号。", + "接口名称": "CAN-VIEW-ZTCS", + "接口类型": "CAN接口", + "来源": "CAN通讯单元", + "目的地": "显示屏" + }, + { + "需求编号": "IR-3.4.3.1-3", + "需求描述": "CAN通讯单元应通过CAN接口向显示屏发送报警信息,包括左侧艏侧推、右侧艏侧推、左轴主推和右轴主推的报警信号。", + "接口名称": "CAN-VIEW-BJ", + "接口类型": "CAN接口", + "来源": "CAN通讯单元", + "目的地": "显示屏" + }, + { + "需求编号": "IR-3.4.3.1-4", + "需求描述": "参数上传模块应通过CAN接口向CAN通讯单元发送艏侧推相关状态信息,包括左右艏侧推的参比电位和报警信号。", + "接口名称": "CSSC-CAN-SCTXX", + "接口类型": "CAN接口", + "来源": "参数上传", + "目的地": "CAN通讯单元" + }, + { + "需求编号": "IR-3.4.3.1-5", + "需求描述": "静电场防护计算模块应通过内部接口向参数上传模块输出艏侧推的静电场补偿电流,包括左侧和右侧艏侧推的补偿电流值。", + "接口名称": "SCTJFH-CSSC-BCDL", + "接口类型": "未知", + "来源": "静电场防护计算", + "目的地": "参数上传" + }, + { + "需求编号": "IR-3.4.3.1-6", + "需求描述": "CAN通讯单元应接收来自电流采样模块的工况控制指令,用于控制左侧和右侧艏侧推电场防护电流输出的启停状态。", + "接口名称": "SCTCAN-CY-KZZL", + "接口类型": "CAN接口", + "来源": "电流采样", + "目的地": "CAN通讯单元" + }, + { + "需求编号": "IR-3.4.3.1-7", + "需求描述": "CAN通讯单元应接收来自电位采集单元的船体电位输入信息,包括左侧和右侧艏侧推对应的船体电位数据。", + "接口名称": "SCTCAN-DC-CTDW", + "接口类型": "CAN接口", + "来源": "电位采集单元", + "目的地": "CAN通讯单元" + }, + { + "需求编号": "IR-3.4.3.1-8", + "需求描述": "故障诊断与报警监控模块应向温度采集及电流采样模块提供报警监控信息,包括左右艏侧推的过压和过流故障信号。", + "接口名称": "SCTTC-GZZD-BJJK", + "接口类型": "未知", + "来源": "故障诊断与报警监控", + "目的地": "温度采集、电流采样" + }, + { + "需求编号": "IR-3.4.3.1-9", + "需求描述": "参数上传模块应通过CAN接口向CAN通讯单元发送主推系统运行状态信息,包括左右主推的轴地电压、轴电流、轴频电场补偿电压与电流、静电场补偿电流及报警信号。", + "接口名称": "CSSC-CAN-ZTXX", + "接口类型": "CAN接口", + "来源": "参数上传", + "目的地": "CAN通讯单元" + }, + { + "需求编号": "IR-3.4.3.1-10", + "需求描述": "静电场防护计算模块应向参数上传模块输出主推系统的静电场补偿电流,包括左轴和右轴主推的静电场补偿电流值。", + "接口名称": "ZTJFH-CSSC-BCDL", + "接口类型": "未知", + "来源": "静电场防护计算", + "目的地": "参数上传" + }, + { + "需求编号": "IR-3.4.3.1-11", + "需求描述": "轴频电场防护计算模块应向运行参数上传模块输出主推系统的轴频电场补偿电流,包括左轴和右轴主推的轴频电场补偿电流值。", + "接口名称": "ZTZPFH-CSSC-BCDL", + "接口类型": "未知", + "来源": "轴频电场防护计算", + "目的地": "运行参数上传" + } + ] + } + } + } + } + }, + "3.7 安全性需求": { + "章节信息": { + "章节编号": "3.7", + "章节标题": "安全性需求", + "章节级别": 2 + }, + "需求列表": [ + { + "需求编号": "OR-3.7-1", + "需求描述": "监测到主推和艏侧推电场防护装置发生故障后,即时声光报警,提醒操作人员,确保设备使用安全。" + } + ] + }, + "3.8 保密性需求": { + "章节信息": { + "章节编号": "3.8", + "章节标题": "保密性需求", + "章节级别": 2 + }, + "需求列表": [ + { + "需求编号": "OR-3.8-1", + "需求描述": "对于核心算法不提供文字解释和描述,只提供算法实现和输入、输出参数。" + }, + { + "需求编号": "OR-3.8-2", + "需求描述": "对于部分核心功能模块不提供网络拷贝等功能。" + }, + { + "需求编号": "OR-3.8-3", + "需求描述": "本软件运行过程中,仅产生装载配置数据文件,该文件不需要作加密处理。" + } + ] + }, + "3.9 CSCI环境需求": { + "章节信息": { + "章节编号": "3.9", + "章节标题": "CSCI环境需求", + "章节级别": 2 + }, + "需求列表": [ + { + "需求编号": "OR-3.9-1", + "需求描述": "综合监控显控软件适用的操作系统是中标麒麟,并使用QtCreator4.0.3开发与调试。" + }, + { + "需求编号": "OR-3.9-2", + "需求描述": "主推和艏侧推电场防护装置控制软件适用的操作系统是uc/osⅢV3.04.04,并使用KeiluVision5V5.35.0.0工具进行开发与编译调试。" + } + ] + }, + "3.10 计算机资源需求": { + "章节信息": { + "章节编号": "3.10", + "章节标题": "计算机资源需求", + "章节级别": 2 + }, + "子章节": { + "3.10.1 计算机硬件需求": { + "章节信息": { + "章节编号": "3.10.1", + "章节标题": "计算机硬件需求", + "章节级别": 3 + }, + "需求列表": [ + { + "需求编号": "OR-3.10.1-1", + "需求描述": "综合监控装置显控软件(JK_DISP)运行于10.4英寸表页显示屏HJ/JYX-AQB0GCM-000A。" + }, + { + "需求编号": "OR-3.10.1-2", + "需求描述": "综合监控装置显控软件(JK_DISP)的CPU内核为IntelN2600。" + }, + { + "需求编号": "OR-3.10.1-3", + "需求描述": "综合监控装置显控软件(JK_DISP)的CPU主频为1.6GHz。" + }, + { + "需求编号": "OR-3.10.1-4", + "需求描述": "综合监控装置显控软件(JK_DISP)的内存RAM不小于1G。" + }, + { + "需求编号": "OR-3.10.1-5", + "需求描述": "综合监控装置显控软件(JK_DISP)的存储器ROM不小于8G。" + }, + { + "需求编号": "OR-3.10.1-6", + "需求描述": "综合监控装置显控软件(JK_DISP)的显示屏分辨率为1024*768。" + }, + { + "需求编号": "OR-3.10.1-7", + "需求描述": "综合监控装置显控软件(JK_DISP)具有NET网口1路。" + }, + { + "需求编号": "OR-3.10.1-8", + "需求描述": "综合监控装置显控软件(JK_DISP)具有CAN通讯口2路。" + }, + { + "需求编号": "OR-3.10.1-9", + "需求描述": "艏侧推电场防护装置控制软件(SCT_CTRL)运行硬件环境为嵌入式单片机GD32F105RGT6。" + }, + { + "需求编号": "OR-3.10.1-10", + "需求描述": "艏侧推电场防护装置控制软件(SCT_CTRL)的CPU内核为ARMCortex-M3。" + }, + { + "需求编号": "OR-3.10.1-11", + "需求描述": "艏侧推电场防护装置控制软件(SCT_CTRL)的CPU主频为108MHz。" + }, + { + "需求编号": "OR-3.10.1-12", + "需求描述": "艏侧推电场防护装置控制软件(SCT_CTRL)的FlashMemory为1024KB。" + }, + { + "需求编号": "OR-3.10.1-13", + "需求描述": "艏侧推电场防护装置控制软件(SCT_CTRL)的SRAM为64KB。" + }, + { + "需求编号": "OR-3.10.1-14", + "需求描述": "艏侧推电场防护装置控制软件(SCT_CTRL)具有CAN通讯口2路。" + } + ] + }, + "3.10.2 计算机硬件资源使用需求": { + "章节信息": { + "章节编号": "3.10.2", + "章节标题": "计算机硬件资源使用需求", + "章节级别": 3 + }, + "需求列表": [ + { + "需求编号": "OR-3.10.2-1", + "需求描述": "CPU使用率应留有50%以上余量。" + } + ] + }, + "3.10.4 计算机通信需求": { + "章节信息": { + "章节编号": "3.10.4", + "章节标题": "计算机通信需求", + "章节级别": 3 + }, + "需求列表": [ + { + "需求编号": "OR-3.10.4-1", + "需求描述": "以太网通信及CAN通信需要通信信道畅通,不丢失数据包。" + } + ] + } + } + }, + "3.13 人员需求": { + "章节信息": { + "章节编号": "3.13", + "章节标题": "人员需求", + "章节级别": 2 + }, + "需求列表": [ + { + "需求编号": "OR-3.13-1", + "需求描述": "对于本软件开发人员,必须精通脚本语言软件界面开发和uc/osⅡ环境下的嵌入式语言开发,具有CAN底层驱动开发经验3年以上,熟练使用C/C++语言。" + }, + { + "需求编号": "OR-3.13-2", + "需求描述": "对于本软件使用人员,在经过培训或按照用户手册和维护手册的说明,能够熟练使用、维护安装本软件。" + } + ] + }, + "3.14 培训需求": { + "章节信息": { + "章节编号": "3.14", + "章节标题": "培训需求", + "章节级别": 2 + }, + "需求列表": [ + { + "需求编号": "OR-3.14-1", + "需求描述": "结合设备对用户进行操作培训。" + } + ] + }, + "3.17 验收、交付和包装需求、": { + "章节信息": { + "章节编号": "3.17", + "章节标题": "验收、交付和包装需求、", + "章节级别": 2 + }, + "需求列表": [ + { + "需求编号": "OR-3.17-1", + "需求描述": "用户手册在软件发布时随同软件发布包一同交付" + }, + { + "需求编号": "OR-3.17-2", + "需求描述": "培训期间提供相应的培训资料" + }, + { + "需求编号": "OR-3.17-3", + "需求描述": "提交文件清单(需求规格说明书、设计说明书、测试报告、使用手册)" + } + ] + }, + "3.18 需求的优先顺序和关键程度": { + "章节信息": { + "章节编号": "3.18", + "章节标题": "需求的优先顺序和关键程度", + "章节级别": 2 + }, + "需求列表": [ + { + "需求编号": "OR-3.18-1", + "需求描述": "主推电场防护装置状态信息" + }, + { + "需求编号": "OR-3.18-2", + "需求描述": "艏侧推电场防护装置状态信息" + }, + { + "需求编号": "OR-3.18-3", + "需求描述": "外加电流阴极保护船体参比电位模拟量" + }, + { + "需求编号": "OR-3.18-4", + "需求描述": "主推启停信号开关量" + }, + { + "需求编号": "OR-3.18-5", + "需求描述": "艏侧推启停信号、船体参比电位" + }, + { + "需求编号": "OR-3.18-6", + "需求描述": "外加电流阴极保护启停信号开关量" + }, + { + "需求编号": "OR-3.18-7", + "需求描述": "系统运行参数" + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..669d9f3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +python-docx==0.8.11 +PyPDF2==3.0.1 +pyyaml==6.0 +requests==2.31.0 +dashscope==1.7.0 +pytest==7.4.3 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..dd95240 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,20 @@ +# src/__init__.py +""" +SRS 需求文档解析工具包 +""" + +__version__ = "1.0.0" +__author__ = "SRS Parser Team" + +from .document_parser import DocumentParser +from .llm_interface import LLMInterface, QwenLLM +from .requirement_extractor import RequirementExtractor +from .json_generator import JSONGenerator + +__all__ = [ + 'DocumentParser', + 'LLMInterface', + 'QwenLLM', + 'RequirementExtractor', + 'JSONGenerator' +] diff --git a/src/document_parser.py b/src/document_parser.py new file mode 100644 index 0000000..b4c6190 --- /dev/null +++ b/src/document_parser.py @@ -0,0 +1,597 @@ +# -*- coding: utf-8 -*- +""" +文档解析模块 - LLM增强版 +支持PDF和Docx格式,针对GJB438B标准SRS文档优化 +""" + +import os +import re +import logging +from abc import ABC, abstractmethod +from typing import List, Dict, Tuple, Optional +from pathlib import Path + +try: + from docx import Document + HAS_DOCX = True +except ImportError: + HAS_DOCX = False + +try: + import PyPDF2 + HAS_PDF = True +except ImportError: + HAS_PDF = False + +logger = logging.getLogger(__name__) + + +class Section: + """表示文档中的一个章节""" + + def __init__(self, level: int, title: str, number: str = None, content: str = "", uid: str = ""): + self.level = level + self.title = title + self.number = number + self.content = content + self.uid = uid + self.parent = None + self.children = [] + self.tables = [] + + def add_child(self, child: 'Section') -> None: + self.children.append(child) + child.parent = self + + def add_content(self, text: str) -> None: + if self.content: + self.content += "\n" + text + else: + self.content = text + + def add_table(self, table_data: List[List[str]]) -> None: + self.tables.append(table_data) + + def generate_auto_number(self, parent_number: str = "", sibling_index: int = 1) -> None: + """ + 自动生成章节编号(当章节没有编号时) + + Args: + parent_number: 父章节编号 + sibling_index: 在同级章节中的序号(从1开始) + """ + if not self.number: + if parent_number: + self.number = f"{parent_number}.{sibling_index}" + else: + self.number = str(sibling_index) + + def __repr__(self) -> str: + return f"Section(level={self.level}, number='{self.number}', title='{self.title}')" + + +class DocumentParser(ABC): + """文档解析器基类""" + + def __init__(self, file_path: str): + self.file_path = file_path + self.sections: List[Section] = [] + self.document_title = "" + self.raw_text = "" + self.llm = None + self._uid_counter = 0 + + def set_llm(self, llm) -> None: + """设置LLM实例""" + self.llm = llm + + @abstractmethod + def parse(self) -> List[Section]: + pass + + def get_document_title(self) -> str: + return self.document_title + + def _next_uid(self) -> str: + self._uid_counter += 1 + return f"sec-{self._uid_counter}" + + def _auto_number_sections(self, sections: List[Section], parent_number: str = "") -> None: + """ + 为没有编号的章节自动生成编号 + + 规则:使用Word样式确定级别,跳过前置章节(目录、概述等), + 从第一个正文章节(如"外部接口")开始编号为1 + + Args: + sections: 章节列表 + parent_number: 父章节编号 + """ + # 仅在顶级章节重编号 + if not parent_number: + # 前置章节关键词(需要跳过的) + skip_keywords = ['目录', '封面', '扉页', '未命名', '年', '月'] + # 正文章节关键词(遇到这些说明正文开始) + content_keywords = ['外部接口', '接口', '软件需求', '需求', '功能', '性能', '设计', '概述', '标识', '引言'] + + start_index = 0 + for idx, section in enumerate(sections): + # 优先检查是否是正文章节 + is_content = any(kw in section.title for kw in content_keywords) + if is_content and section.level == 1: + start_index = idx + break + + # 重新编号所有章节 + counter = 1 + for i, section in enumerate(sections): + if i < start_index: + # 前置章节不编号 + section.number = "" + else: + # 正文章节:顶级章节从1开始编号 + if section.level == 1: + section.number = str(counter) + counter += 1 + + # 递归处理子章节 + if section.children: + self._auto_number_sections(section.children, section.number) + else: + # 子章节编号 + for i, section in enumerate(sections, 1): + if not section.number or self._is_chinese_number(section.number): + section.generate_auto_number(parent_number, i) + if section.children: + self._auto_number_sections(section.children, section.number) + + def _is_chinese_number(self, text: str) -> bool: + """检查是否是中文数字编号""" + chinese_numbers = '一二三四五六七八九十百千万' + return text and all(c in chinese_numbers for c in text) + + +class DocxParser(DocumentParser): + """DOCX格式文档解析器""" + + def __init__(self, file_path: str): + if not HAS_DOCX: + raise ImportError("python-docx库未安装,请运行: pip install python-docx") + super().__init__(file_path) + self.document = None + + def parse(self) -> List[Section]: + try: + self.document = Document(self.file_path) + self.document_title = self.document.core_properties.title or "SRS Document" + + section_stack = {} + + for block in self._iter_block_items(self.document): + from docx.text.paragraph import Paragraph + from docx.table import Table + if isinstance(block, Paragraph): + text = block.text.strip() + if not text: + continue + + heading_info = self._parse_heading(block, text) + if heading_info: + number, title, level = heading_info + section = Section(level=level, title=title, number=number, uid=self._next_uid()) + + if level == 1 or not section_stack: + self.sections.append(section) + section_stack = {1: section} + else: + parent_level = level - 1 + while parent_level >= 1 and parent_level not in section_stack: + parent_level -= 1 + + if parent_level >= 1 and parent_level in section_stack: + section_stack[parent_level].add_child(section) + elif self.sections: + self.sections[-1].add_child(section) + + section_stack[level] = section + for l in list(section_stack.keys()): + if l > level: + del section_stack[l] + else: + # 添加内容到当前章节 + if section_stack: + max_level = max(section_stack.keys()) + section_stack[max_level].add_content(text) + else: + # 没有标题时,创建默认章节 + default_section = Section(level=1, title="未命名章节", number="", uid=self._next_uid()) + default_section.add_content(text) + self.sections.append(default_section) + section_stack = {1: default_section} + elif isinstance(block, Table): + # 表格处理 + table_data = self._extract_table_data(block) + if table_data: + if section_stack: + max_level = max(section_stack.keys()) + section_stack[max_level].add_table(table_data) + else: + default_section = Section(level=1, title="未命名章节", number="", uid=self._next_uid()) + default_section.add_table(table_data) + self.sections.append(default_section) + section_stack = {1: default_section} + + # 为没有编号的章节自动生成编号 + self._auto_number_sections(self.sections) + + logger.info(f"完成Docx解析,提取{len(self.sections)}个顶级章节") + return self.sections + + except Exception as e: + logger.error(f"解析Docx文档失败: {e}") + raise + + def _is_valid_heading(self, text: str) -> bool: + """检查是否是有效的标题""" + if len(text) > 120 or '...' in text: + return False + # 标题应包含中文或字母 + if not re.search(r'[\u4e00-\u9fa5A-Za-z]', text): + return False + # 过滤目录项(标题后跟页码,如"概述 2"或"概述 . . . . 2") + if re.search(r'\s{2,}\d+$', text): # 多个空格后跟数字结尾 + return False + if re.search(r'[\.。\s]+\d+$', text): # 点号或空格后跟数字结尾 + return False + return True + + def _parse_heading(self, paragraph, text: str) -> Optional[Tuple[str, str, int]]: + """解析标题,返回(编号, 标题, 级别)""" + style_name = paragraph.style.name if paragraph.style else "" + is_heading_style = style_name.lower().startswith('heading') if style_name else False + + # 数字编号标题 + match = re.match(r'^(\d+(?:\.\d+)*)\s*[\.、]?\s*(.+)$', text) + if match and self._is_valid_heading(match.group(2)): + number = match.group(1) + title = match.group(2).strip() + level = len(number.split('.')) + return number, title, level + + # 中文编号标题 + match = re.match(r'^([一二三四五六七八九十]+)[、\.]+\s*(.+)$', text) + if match and self._is_valid_heading(match.group(2)): + number = match.group(1) + title = match.group(2).strip() + level = 1 + return number, title, level + + # 样式标题 + if is_heading_style and self._is_valid_heading(text): + level = 1 + level_match = re.search(r'(\d+)', style_name) + if level_match: + level = int(level_match.group(1)) + return "", text, level + + return None + + def _iter_block_items(self, parent): + """按文档顺序迭代段落和表格""" + from docx.text.paragraph import Paragraph + from docx.table import Table + from docx.oxml.text.paragraph import CT_P + from docx.oxml.table import CT_Tbl + + for child in parent.element.body.iterchildren(): + if isinstance(child, CT_P): + yield Paragraph(child, parent) + elif isinstance(child, CT_Tbl): + yield Table(child, parent) + + def _extract_table_data(self, table) -> List[List[str]]: + """提取表格数据""" + table_data = [] + for row in table.rows: + row_data = [] + for cell in row.cells: + text = cell.text.replace('\n', ' ').strip() + text = re.sub(r'\s+', ' ', text) + row_data.append(text) + if any(cell for cell in row_data): + table_data.append(row_data) + return table_data + + +class PDFParser(DocumentParser): + """PDF格式文档解析器 - LLM增强版""" + + # GJB438B标准SRS文档的有效章节标题关键词 + VALID_TITLE_KEYWORDS = [ + '范围', '标识', '概述', '引用', '文档', + '需求', '功能', '接口', '性能', '安全', '保密', + '环境', '资源', '质量', '设计', '约束', + '人员', '培训', '保障', '验收', '交付', '包装', + '优先', '关键', '合格', '追踪', '注释', + 'CSCI', '计算机', '软件', '硬件', '通信', '通讯', + '数据', '适应', '可靠', '内部', '外部', + '描述', '要求', '规定', '说明', '定义', + '电场', '防护', '装置', '控制', '监控', '显控' + ] + + # 明显无效的章节标题模式(噪声) + INVALID_TITLE_PATTERNS = [ + '本文档可作为', '参比电位', '补偿电流', '以太网', + '电源', '软件接', '功能\\', '性能 \\', '输入/输出 \\', + '数据处理要求 \\', '固件 \\', '质量控制要求', + '信安科技', '浙江', '公司' + ] + + def __init__(self, file_path: str): + if not HAS_PDF: + raise ImportError("PyPDF2库未安装,请运行: pip install PyPDF2") + super().__init__(file_path) + self.document_title = "SRS Document" + + def parse(self) -> List[Section]: + """解析PDF文档""" + try: + # 1. 提取所有文本 + self.raw_text = self._extract_all_text() + + # 2. 清洗文本 + cleaned_text = self._clean_text(self.raw_text) + + # 3. 识别章节结构 + self.sections = self._parse_sections(cleaned_text) + + # 4. 使用LLM验证和清理章节(如果可用) + if self.llm: + self.sections = self._llm_validate_sections(self.sections) + + # 5. 为没有编号的章节自动生成编号 + self._auto_number_sections(self.sections) + + logger.info(f"完成PDF解析,提取{len(self.sections)}个顶级章节") + return self.sections + + except Exception as e: + logger.error(f"解析PDF文档失败: {e}") + raise + + def _extract_all_text(self) -> str: + """从PDF提取所有文本""" + all_text = [] + with open(self.file_path, 'rb') as f: + pdf_reader = PyPDF2.PdfReader(f) + for page in pdf_reader.pages: + text = page.extract_text() + if text: + all_text.append(text) + return '\n'.join(all_text) + + def _clean_text(self, text: str) -> str: + """清洗PDF提取的文本""" + lines = text.split('\n') + cleaned_lines = [] + + for line in lines: + line = line.strip() + if not line: + continue + # 跳过页码(通常是1-3位数字单独一行) + if re.match(r'^\d{1,3}$', line): + continue + # 跳过目录行 + if line.count('.') > 10 and '...' in line: + continue + + cleaned_lines.append(line) + + return '\n'.join(cleaned_lines) + + def _parse_sections(self, text: str) -> List[Section]: + """解析章节结构""" + sections = [] + section_stack = {} + lines = text.split('\n') + current_section = None + content_buffer = [] + found_sections = set() + + for line in lines: + line = line.strip() + if not line: + continue + + # 尝试匹配章节标题 + section_info = self._match_section_header(line, found_sections) + + if section_info: + number, title = section_info + level = len(number.split('.')) + + # 保存之前章节的内容 + if current_section and content_buffer: + current_section.add_content('\n'.join(content_buffer)) + content_buffer = [] + + # 创建新章节 + section = Section(level=level, title=title, number=number, uid=self._next_uid()) + found_sections.add(number) + + # 建立层次结构 + if level == 1: + sections.append(section) + section_stack = {1: section} + else: + parent_level = level - 1 + while parent_level >= 1 and parent_level not in section_stack: + parent_level -= 1 + + if parent_level >= 1 and parent_level in section_stack: + section_stack[parent_level].add_child(section) + elif sections: + sections[-1].add_child(section) + else: + sections.append(section) + section_stack = {1: section} + + section_stack[level] = section + for l in list(section_stack.keys()): + if l > level: + del section_stack[l] + + current_section = section + else: + # 收集内容 + if line and not self._is_noise(line): + content_buffer.append(line) + + # 保存最后一个章节的内容 + if current_section and content_buffer: + current_section.add_content('\n'.join(content_buffer)) + + return sections + + def _match_section_header(self, line: str, found_sections: set) -> Optional[Tuple[str, str]]: + """ + 匹配章节标题 + + Returns: + (章节编号, 章节标题) 或 None + """ + # 模式: "3.1功能需求" 或 "3.1 功能需求" + match = re.match(r'^(\d+(?:\.\d+)*)\s*(.+)$', line) + if not match: + return None + + number = match.group(1) + title = match.group(2).strip() + + # 排除目录行 + if '...' in title or title.count('.') > 5: + return None + + # 验证章节编号 + parts = number.split('.') + first_part = int(parts[0]) + + # 放宽一级章节编号范围(非严格GJB结构) + if first_part < 1 or first_part > 30: + return None + + # 检查子部分是否合理 + for part in parts[1:]: + if int(part) > 20: + return None + + # 避免重复 + if number in found_sections: + return None + + # 标题长度检查 + if len(title) > 60 or len(title) < 2: + return None + + # 标题必须包含中文 + if not re.search(r'[\u4e00-\u9fa5]', title): + return None + + # 放宽标题关键词要求(非严格GJB结构) + if not re.search(r'[\u4e00-\u9fa5A-Za-z]', title): + return None + + # 检查是否包含无效模式 + for invalid_pattern in self.INVALID_TITLE_PATTERNS: + if invalid_pattern in title: + return None + + # 标题不能以数字开头 + if title[0].isdigit(): + return None + + # 数字比例检查 + digit_ratio = sum(c.isdigit() for c in title) / max(len(title), 1) + if digit_ratio > 0.3: + return None + + # 检查标题是否包含反斜杠(通常是表格噪声) + if '\\' in title and '需求' not in title: + return None + + return (number, title) + + def _is_noise(self, line: str) -> bool: + """检查是否是噪声行""" + # 纯数字行 + if re.match(r'^[\d\s,.]+$', line): + return True + # 非常短的行 + if len(line) < 3: + return True + # 罗马数字 + if re.match(r'^[ivxIVX]+$', line): + return True + return False + + def _llm_validate_sections(self, sections: List[Section]) -> List[Section]: + """使用LLM验证章节是否有效""" + if not self.llm: + return sections + + validated_sections = [] + + for section in sections: + # 验证顶级章节 + if self._is_valid_section_with_llm(section): + # 递归验证子章节 + section.children = self._validate_children(section.children) + validated_sections.append(section) + + return validated_sections + + def _validate_children(self, children: List[Section]) -> List[Section]: + """递归验证子章节""" + validated = [] + for child in children: + if self._is_valid_section_with_llm(child): + child.children = self._validate_children(child.children) + validated.append(child) + return validated + + def _is_valid_section_with_llm(self, section: Section) -> bool: + """使用LLM判断章节是否有效""" + # 先用规则快速过滤明显无效的章节 + invalid_titles = [ + '本文档可作为', '故障', '实时', '输入/输出', + '固件', '功能\\', '\\4.', '\\3.' + ] + for invalid in invalid_titles: + if invalid in section.title: + logger.debug(f"过滤无效章节: {section.number} {section.title}") + return False + + # 对于需求相关章节(第3章),额外验证 + if section.number and section.number.startswith('3'): + # 检查标题是否看起来像是有效的需求章节标题 + # 有效的标题应该是完整的中文短语 + if '\\' in section.title or '/' in section.title: + if not any(kw in section.title for kw in ['输入', '输出', '接口']): + return False + + return True + + +def create_parser(file_path: str) -> DocumentParser: + """ + 工厂函数:根据文件扩展名创建相应的解析器 + """ + ext = Path(file_path).suffix.lower() + + if ext == '.docx': + return DocxParser(file_path) + elif ext == '.pdf': + return PDFParser(file_path) + else: + raise ValueError(f"不支持的文件格式: {ext}") diff --git a/src/json_generator.py b/src/json_generator.py new file mode 100644 index 0000000..0eca52c --- /dev/null +++ b/src/json_generator.py @@ -0,0 +1,214 @@ +# -*- coding: utf-8 -*- +""" +JSON生成器模块 - LLM增强版 +将提取的需求和章节结构转换为结构化JSON输出 +""" + +import json +import logging +from datetime import datetime +from typing import List, Dict, Any, Optional +from .document_parser import Section +from .requirement_extractor import Requirement + +logger = logging.getLogger(__name__) + + +class JSONGenerator: + """JSON输出生成器""" + + # 需求类型中文映射 + TYPE_CHINESE = { + 'functional': '功能需求', + 'interface': '接口需求', + 'performance': '其他需求', + 'security': '其他需求', + 'reliability': '其他需求', + 'other': '其他需求' + } + + # 非需求章节(不输出到JSON) + NON_REQUIREMENT_SECTIONS = [ + '标识', '系统概述', '文档概述', '引用文档', + '合格性规定', '需求可追踪性', '注释', '附录', + '范围', '概述' + ] + + def __init__(self, config: Dict = None): + self.config = config or {} + + def generate(self, sections: List[Section], requirements: List[Requirement], + document_title: str = "SRS Document") -> Dict[str, Any]: + """ + 生成JSON输出 + + Args: + sections: 章节列表 + requirements: 需求列表 + document_title: 文档标题 + + Returns: + 结构化JSON字典 + """ + # 按章节组织需求 + reqs_by_section = self._group_requirements_by_section(requirements) + + # 统计需求类型 + type_stats = self._calculate_type_statistics(requirements) + + # 构建输出结构 + output = { + "文档元数据": { + "标题": document_title, + "生成时间": datetime.now().isoformat(), + "总需求数": len(requirements), + "需求类型统计": type_stats + }, + "需求内容": self._build_requirement_content(sections, reqs_by_section) + } + + logger.info(f"生成JSON输出,共{len(requirements)}个需求") + return output + + def _group_requirements_by_section(self, requirements: List[Requirement]) -> Dict[str, List[Requirement]]: + """按章节编号分组需求""" + grouped = {} + for req in requirements: + section_key = req.section_uid or req.section_number or 'unknown' + if section_key not in grouped: + grouped[section_key] = [] + grouped[section_key].append(req) + return grouped + + def _calculate_type_statistics(self, requirements: List[Requirement]) -> Dict[str, int]: + """计算需求类型统计""" + stats = {} + for req in requirements: + type_chinese = self.TYPE_CHINESE.get(req.type, '其他需求') + if type_chinese not in stats: + stats[type_chinese] = 0 + stats[type_chinese] += 1 + return stats + + def _should_include_section(self, section: Section) -> bool: + """判断章节是否应该包含在输出中""" + # 排除非需求章节 + for keyword in self.NON_REQUIREMENT_SECTIONS: + if keyword in section.title: + return False + + return True + + def _build_requirement_content(self, sections: List[Section], + reqs_by_section: Dict[str, List[Requirement]]) -> Dict[str, Any]: + """构建需求内容的层次结构""" + content = {} + + for section in sections: + # 只处理需求相关章节 + if not self._should_include_section(section): + # 但仍需检查子章节 + for child in section.children: + child_content = self._build_section_content_recursive(child, reqs_by_section) + if child_content: + key = f"{child.number} {child.title}" if child.number else child.title + content[key] = child_content + continue + + section_content = self._build_section_content_recursive(section, reqs_by_section) + if section_content: + key = f"{section.number} {section.title}" if section.number else section.title + content[key] = section_content + + return content + + def _build_section_content_recursive(self, section: Section, + reqs_by_section: Dict[str, List[Requirement]]) -> Optional[Dict[str, Any]]: + """递归构建章节内容""" + # 检查是否应该包含此章节 + if not self._should_include_section(section): + return None + + # 章节基本信息 + result = { + "章节信息": { + "章节编号": section.number or "", + "章节标题": section.title, + "章节级别": section.level + } + } + + # 检查是否有子章节 + has_valid_children = False + subsections = {} + + for child in section.children: + child_content = self._build_section_content_recursive(child, reqs_by_section) + if child_content: + has_valid_children = True + key = f"{child.number} {child.title}" if child.number else child.title + subsections[key] = child_content + + # 添加当前章节需求 + reqs = reqs_by_section.get(section.uid or section.number or 'unknown', []) + if reqs: + result["需求列表"] = [] + for req in reqs: + # 需求类型放在最前面 + type_chinese = self.TYPE_CHINESE.get(req.type, '功能需求') + req_dict = { + "需求类型": type_chinese, + "需求编号": req.id, + "需求描述": req.description + } + # 接口需求增加额外字段 + if req.type == 'interface': + req_dict["接口名称"] = req.interface_name + req_dict["接口类型"] = req.interface_type + req_dict["来源"] = req.source + req_dict["目的地"] = req.destination + result["需求列表"].append(req_dict) + + # 如果有子章节,添加子章节 + if has_valid_children: + result["子章节"] = subsections + + # 如果章节既没有需求也没有子章节,返回None + if "需求列表" not in result and "子章节" not in result: + return None + + return result + + def save_to_file(self, output: Dict[str, Any], file_path: str) -> None: + """ + 将输出保存到文件 + + Args: + output: 输出字典 + file_path: 输出文件路径 + """ + try: + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(output, f, ensure_ascii=False, indent=2) + logger.info(f"成功保存JSON到: {file_path}") + except Exception as e: + logger.error(f"保存JSON文件失败: {e}") + raise + + def generate_and_save(self, sections: List[Section], requirements: List[Requirement], + document_title: str, file_path: str) -> Dict[str, Any]: + """ + 生成并保存JSON + + Args: + sections: 章节列表 + requirements: 需求列表 + document_title: 文档标题 + file_path: 输出文件路径 + + Returns: + 生成的输出字典 + """ + output = self.generate(sections, requirements, document_title) + self.save_to_file(output, file_path) + return output diff --git a/src/llm_interface.py b/src/llm_interface.py new file mode 100644 index 0000000..b2db801 --- /dev/null +++ b/src/llm_interface.py @@ -0,0 +1,197 @@ +# src/llm_interface.py +""" +LLM接口模块 - 支持多个LLM提供商 +""" + +import logging +import json +from abc import ABC, abstractmethod +from typing import Dict, List, Optional, Any + +from .utils import get_env_or_config + +logger = logging.getLogger(__name__) + + +class LLMInterface(ABC): + """LLM接口基类""" + + def __init__(self, api_key: str = None, model: str = None, **kwargs): + """ + 初始化LLM接口 + + Args: + api_key: API密钥 + model: 模型名称 + **kwargs: 其他参数(如temperature, max_tokens等) + """ + self.api_key = api_key + self.model = model + self.extra_params = kwargs + + @abstractmethod + def call(self, prompt: str) -> str: + """ + 调用LLM API + + Args: + prompt: 提示词 + + Returns: + LLM的响应文本 + """ + pass + + @abstractmethod + def call_json(self, prompt: str) -> Dict[str, Any]: + """ + 调用LLM API并获取JSON格式的响应 + + Args: + prompt: 提示词 + + Returns: + 解析后的JSON字典 + """ + pass + + def validate_config(self) -> bool: + """验证配置是否完整""" + return bool(self.api_key and self.model) + + +class QwenLLM(LLMInterface): + """阿里云千问LLM实现""" + + def __init__(self, api_key: str = None, model: str = "qwen-plus", + api_endpoint: str = None, **kwargs): + """ + 初始化千问LLM + + Args: + api_key: 阿里云API密钥 + model: 模型名称(如qwen-plus, qwen-turbo) + api_endpoint: API端点地址 + **kwargs: 其他参数 + """ + super().__init__(api_key, model, **kwargs) + self.api_endpoint = api_endpoint or "https://dashscope.aliyuncs.com/compatible-mode/v1" + self._check_dashscope_import() + + def _check_dashscope_import(self) -> None: + """检查dashscope库是否已安装""" + try: + import dashscope + self.dashscope = dashscope + except ImportError: + logger.error("dashscope库未安装,请运行: pip install dashscope") + raise + + def call(self, prompt: str) -> str: + """ + 调用千问LLM + + Args: + prompt: 提示词 + + Returns: + LLM的响应文本 + """ + if not self.validate_config(): + raise ValueError("LLM配置不完整(api_key或model未设置)") + + try: + from dashscope import Generation + + # 设置API密钥 + self.dashscope.api_key = self.api_key + + # 构建请求参数 - dashscope 1.7.0 格式 + response = Generation.call( + model=self.model, + messages=[ + {'role': 'user', 'content': prompt} + ], + result_format='message' # 使用message格式 + ) + + # 调试输出 + logger.debug(f"API响应类型: {type(response)}") + logger.debug(f"API响应内容: {response}") + + # 处理响应 + if isinstance(response, dict): + # dict格式响应 + status_code = response.get('status_code', 200) + if status_code == 200: + output = response.get('output', {}) + if 'choices' in output: + return output['choices'][0]['message']['content'] + elif 'text' in output: + return output['text'] + else: + # 尝试直接获取text + return output.get('text', str(output)) + else: + error_msg = response.get('message', response.get('code', 'Unknown error')) + logger.error(f"千问API返回错误: {error_msg}") + raise Exception(f"API调用失败: {error_msg}") + else: + # 对象格式响应 + if hasattr(response, 'status_code') and response.status_code == 200: + output = response.output + if hasattr(output, 'choices'): + return output.choices[0].message.content + elif hasattr(output, 'text'): + return output.text + else: + return str(output) + elif hasattr(response, 'status_code'): + error_msg = getattr(response, 'message', str(response)) + raise Exception(f"API调用失败: {error_msg}") + else: + return str(response) + + except Exception as e: + logger.error(f"调用千问LLM失败: {e}") + raise + + def call_json(self, prompt: str) -> Dict[str, Any]: + """ + 调用千问LLM并获取JSON格式响应 + + Args: + prompt: 提示词 + + Returns: + 解析后的JSON字典 + """ + # 添加JSON格式要求到提示词 + json_prompt = prompt + "\n\n请确保响应是有效的JSON格式。" + + response = self.call(json_prompt) + + try: + # 尝试解析JSON + # 首先尝试直接解析 + return json.loads(response) + except json.JSONDecodeError: + # 尝试提取JSON代码块 + try: + import re + # 查找JSON代码块 + json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL) + if json_match: + return json.loads(json_match.group(1)) + + # 尝试查找任何JSON对象 + json_match = re.search(r'\{.*\}', response, re.DOTALL) + if json_match: + return json.loads(json_match.group(0)) + + except Exception as e: + logger.warning(f"无法从响应中提取JSON: {e}") + + # 如果都失败,返回错误信息 + logger.error(f"无法解析LLM响应为JSON: {response}") + return {"error": "Failed to parse response as JSON", "raw_response": response} diff --git a/src/requirement_extractor.py b/src/requirement_extractor.py new file mode 100644 index 0000000..6a9a1a0 --- /dev/null +++ b/src/requirement_extractor.py @@ -0,0 +1,643 @@ +# -*- coding: utf-8 -*- +""" +需求提取器模块 - LLM增强版 +使用阿里云千问大模型智能提取和分类需求 +""" + +import re +import json +import logging +from typing import List, Dict, Optional, Tuple, Any +from .document_parser import Section + +logger = logging.getLogger(__name__) + + +class Requirement: + """表示一个需求项""" + + def __init__(self, req_id: str, description: str, req_type: str = "functional", + section_number: str = "", section_title: str = "", + interface_name: str = "", interface_type: str = "", + section_uid: str = "", + source: str = "", destination: str = ""): + self.id = req_id + self.description = description + self.type = req_type + self.section_number = section_number + self.section_title = section_title + self.section_uid = section_uid + # 接口需求特有字段 + self.interface_name = interface_name + self.interface_type = interface_type + self.source = source + self.destination = destination + + def to_dict(self) -> Dict: + result = { + "需求编号": self.id, + "需求描述": self.description + } + # 接口需求增加额外字段 + if self.type == 'interface': + result["接口名称"] = self.interface_name + result["接口类型"] = self.interface_type + result["来源"] = self.source + result["目的地"] = self.destination + return result + + def __repr__(self) -> str: + return f"Requirement(id='{self.id}', type='{self.type}')" + + +class RequirementExtractor: + """需求提取器 - LLM增强版""" + + # 需求类型前缀映射 + TYPE_PREFIX = { + 'functional': 'FR', + 'interface': 'IR', + 'performance': 'PR', + 'security': 'SR', + 'reliability': 'RR', + 'other': 'OR' + } + + # 中文类型到英文的映射 + TYPE_MAPPING = { + '功能需求': 'functional', + '接口需求': 'interface', + '其他需求': 'other' + } + + # 非需求章节(应该跳过的) + NON_REQUIREMENT_SECTIONS = [ + '标识', '系统概述', '文档概述', '引用文档', + '合格性规定', '需求可追踪性', '注释', '附录', + '范围', '概述' + ] + + def __init__(self, config: Dict = None, llm=None): + self.config = config or {} + self.llm = llm + self.requirements: List[Requirement] = [] + self._req_counters: Dict[str, Dict[str, int]] = {} # {section_number: {type: count}} + + def extract_from_sections(self, sections: List[Section]) -> List[Requirement]: + """ + 从章节列表中提取需求 + + Args: + sections: 解析后的章节列表 + + Returns: + 需求列表 + """ + self.requirements = [] + self._req_counters = {} + + for section in sections: + self._process_section(section) + + logger.info(f"共提取 {len(self.requirements)} 个需求项") + return self.requirements + + def _process_section(self, section: Section, depth: int = 0) -> None: + """递归处理章节,提取需求""" + # 检查是否应该跳过此章节 + if self._should_skip_section(section): + logger.debug(f"跳过非需求章节: {section.number} {section.title}") + for child in section.children: + self._process_section(child, depth + 1) + return + + # 先提取当前章节需求(包含表格) + reqs = self._extract_requirements_from_section(section) + self.requirements.extend(reqs) + + # 再递归处理子章节 + for child in section.children: + self._process_section(child, depth + 1) + + def _should_skip_section(self, section: Section) -> bool: + """判断是否应该跳过此章节""" + # 检查标题是否包含非需求关键词 + for keyword in self.NON_REQUIREMENT_SECTIONS: + if keyword in section.title: + return True + + # 检查是否是系统描述章节(如3.1.1通常是系统描述) + if self._is_system_description(section): + return True + + return False + + def _is_system_description(self, section: Section) -> bool: + """判断是否是系统描述章节(应该跳过)""" + # 检查标题 + desc_keywords = ['系统描述', '功能描述', '概述', '示意图', '组成'] + for kw in desc_keywords: + if kw in section.title: + return True + + # 使用LLM判断 + if self.llm and section.content: + try: + result = self._llm_check_system_description(section) + return result + except Exception as e: + logger.warning(f"LLM判断失败,使用规则判断: {e}") + + return False + + def _llm_check_system_description(self, section: Section) -> bool: + """使用LLM判断是否是系统描述""" + prompt = f"""请判断以下章节是否是对系统的整体描述(而不是具体的功能需求)。 + +章节编号:{section.number} +章节标题:{section.title} +章节内容(前500字符): +{section.content[:500] if section.content else '无'} + +请只回答"是"或"否": +- "是":这是系统整体描述、功能模块组成介绍、系统架构说明等概述性内容 +- "否":这是具体的功能需求、接口需求、性能要求等可提取的需求内容 + +回答(只需要回答"是"或"否"):""" + + response = self.llm.call(prompt).strip() + return '是' in response + + def _extract_requirements_from_section(self, section: Section) -> List[Requirement]: + """从单个章节提取需求""" + requirements = [] + + # 获取需求类型 + req_type = self._identify_requirement_type(section.title, section.content) + + if self.llm: + # 使用LLM提取需求 + reqs = self._llm_extract_requirements(section, req_type) + requirements.extend(reqs) + else: + # 使用规则提取 + reqs = self._rule_extract_requirements(section, req_type) + requirements.extend(reqs) + + return requirements + + def _llm_extract_requirements(self, section: Section, req_type: str) -> List[Requirement]: + """使用LLM提取需求""" + requirements = [] + + content_text = section.content or "" + table_text = self._format_tables_for_prompt(section.tables) + if len(content_text.strip()) < 8 and not table_text: + return requirements + + # 根据需求类型构建不同的提示词 + if req_type == 'interface': + # 接口需求:允许改写润色,并提取接口详细信息 + prompt = f"""请从以下SRS文档章节中提取具体的接口需求,并对需求描述进行改写润色。同时智能识别每个接口的详细信息。 + +章节编号:{section.number} +章节标题:{section.title} +章节内容: +{content_text} + +章节内表格(若有): +{table_text if table_text else '无'} + +提取要求: +1. 只提取具体的、可验证的接口需求 +2. 不要提取系统描述、背景说明等非需求内容 +3. 去除原文中的换行符、表格格式噪声 +4. 对提取的需求描述进行改写润色,使其更加清晰完整 +5. 每条需求应该是完整的句子,描述清楚接口规范 +6. 如果有多条需求,请分别列出 +7. 对于每条接口需求,请智能识别以下信息: + - interface_name: 接口名称 + - interface_type: 接口类型 (如:CAN接口、以太网接口、串口等) + - source: 来源/发送方(数据或信号从哪里来) + - destination: 目的地/接收方(数据或信号发送到哪里) +8. 如果某个字段无法从文本中识别,请填写"未知" +9. 若原文给出需求编号,请优先使用原文编号(req_id) + +请以JSON格式输出,格式如下: +{{ + "requirements": [ + {{ + "req_id": "需求编号(如有)", + "description": "接口需求描述", + "interface_name": "接口名称", + "interface_type": "接口类型", + "source": "来源", + "destination": "目的地" + }} + ] +}} + +如果该章节没有可提取的需求,返回空数组: +{{"requirements": []}} + +JSON输出:""" + else: + # 功能需求、其他需求:保留原文描述,不改写润色 + prompt = f"""请从以下SRS文档章节中提取具体的软件需求。保持原文描述,不要改写或润色。 + +章节编号:{section.number} +章节标题:{section.title} +章节内容: +{content_text} + +章节内表格(若有): +{table_text if table_text else '无'} + +提取要求: +1. 同时提取正文与表格中的具体、可验证的软件需求 +2. 不要提取系统描述、背景说明等非需求内容 +3. 保持原文描述,不要对需求进行改写、润色或重新组织 +4. 去除原文中的多余换行符和表格格式符号,但保留语句内容 +5. 每条需求应该是完整的句子 +6. 如果有多条需求,请分别列出 +7. 如果一段需求描述内有多条需求,请尽量拆分成独立的需求项 +8. 过滤重复或过于相似的需求,只保留独特的需求 +9. 若原文给出需求编号,请优先使用原文编号(req_id) + +请以JSON格式输出,格式如下: +{{ + "requirements": [ + {{"req_id": "需求编号(如有)", "description": "需求描述1"}}, + {{"req_id": "需求编号(如有)", "description": "需求描述2"}} + ] +}} + +如果该章节没有可提取的需求,返回空数组: +{{"requirements": []}} + +JSON输出:""" + + try: + response = self.llm.call(prompt) + data = self._parse_llm_json_response(response) + + if data and 'requirements' in data: + # 查找父需求编号(第一个合法完整编号的需求) + parent_req_id = "" + complete_id_pattern = r'^[A-Za-z0-9]{2,10}[-_].+$' + for req_data in data['requirements']: + temp_id = self._normalize_req_id(req_data.get('req_id', '') or req_data.get('id', '')) + if not temp_id: + temp_desc = req_data.get('description', '').strip() + temp_id, _ = self._extract_requirement_id_from_text(temp_desc) + # 验证是否为合法的完整编号格式 + if temp_id and re.match(complete_id_pattern, temp_id): + parent_req_id = temp_id.replace('_', '-') + break + + for i, req_data in enumerate(data['requirements'], 1): + desc = req_data.get('description', '').strip() + if desc and len(desc) > 5: + # 清理描述中的多余换行符和表格符号 + desc = self._clean_description(desc) + + # 需求ID优先使用文档给出的编号 + doc_req_id = self._normalize_req_id(req_data.get('req_id', '') or req_data.get('id', '')) + if not doc_req_id: + doc_req_id, desc = self._extract_requirement_id_from_text(desc) + + # 生成最终的需求ID(三级优先级) + req_id = self._generate_requirement_id(req_type, section.number, i, doc_req_id, parent_req_id) + + # 接口需求提取额外字段 + interface_name = "" + interface_type = "" + source = "" + destination = "" + if req_type == 'interface': + interface_name = req_data.get('interface_name', '未知').strip() + interface_type = req_data.get('interface_type', '未知').strip() + source = req_data.get('source', '未知').strip() + destination = req_data.get('destination', '未知').strip() + + req = Requirement( + req_id=req_id, + description=desc, + req_type=req_type, + section_number=section.number, + section_title=section.title, + section_uid=section.uid, + interface_name=interface_name, + interface_type=interface_type, + source=source, + destination=destination + ) + requirements.append(req) + except Exception as e: + logger.warning(f"LLM提取需求失败: {e},使用规则提取") + return self._rule_extract_requirements(section, req_type) + + return requirements + + def _rule_extract_requirements(self, section: Section, req_type: str) -> List[Requirement]: + """使用规则提取需求(备用方法)""" + requirements = [] + content = section.content + + # 正文需求 + descriptions = [] + if content and len(content.strip()) >= 8: + descriptions = self._extract_list_items(content) + + if not descriptions: + # 如果没有列表项,将整个内容作为一个需求 + desc = self._clean_description(content) + if len(desc) > 5: + descriptions = [f"{section.title}:{desc}"] + + # 表格需求 + table_requirements = self._extract_requirements_from_tables_rule(section.tables) + + # 查找父需求编号(第一个合法完整编号) + parent_req_id = "" + complete_id_pattern = r'^[A-Za-z0-9]{2,10}[-_].+$' + for desc in descriptions: + temp_id, _ = self._extract_requirement_id_from_text(desc) + # 验证是否为合法的完整编号格式 + if temp_id and re.match(complete_id_pattern, temp_id): + parent_req_id = temp_id.replace('_', '-') + break + if not parent_req_id: + for temp_id, _ in table_requirements: + # 验证是否为合法的完整编号格式 + if temp_id and re.match(complete_id_pattern, temp_id): + parent_req_id = temp_id.replace('_', '-') + break + + index = 1 + for desc in descriptions: + desc = self._clean_description(desc) + if len(desc) > 5: + doc_req_id, cleaned_desc = self._extract_requirement_id_from_text(desc) + # 生成最终的需求ID(三级优先级) + req_id = self._generate_requirement_id(req_type, section.number, index, doc_req_id, parent_req_id) + req = Requirement( + req_id=req_id, + description=cleaned_desc, + req_type=req_type, + section_number=section.number, + section_title=section.title, + section_uid=section.uid + ) + requirements.append(req) + index += 1 + + for doc_req_id, desc in table_requirements: + # 生成最终的需求ID(三级优先级) + req_id = self._generate_requirement_id(req_type, section.number, index, doc_req_id, parent_req_id) + req = Requirement( + req_id=req_id, + description=desc, + req_type=req_type, + section_number=section.number, + section_title=section.title, + section_uid=section.uid + ) + requirements.append(req) + index += 1 + + return requirements + + def _extract_list_items(self, content: str) -> List[str]: + """提取列表项""" + items = [] + + # 模式1: a) b) c) 或 1) 2) 3) + patterns = [ + r'([a-z][\))])\s*(.+?)(?=[a-z][\))]|$)', + r'(\d+[\))])\s*(.+?)(?=\d+[\))]|$)', + r'([①②③④⑤⑥⑦⑧⑨⑩])\s*(.+?)(?=[①②③④⑤⑥⑦⑧⑨⑩]|$)' + ] + + for pattern in patterns: + matches = re.findall(pattern, content, re.DOTALL) + if matches: + for marker, text in matches: + text = text.strip() + if text and len(text) > 5: + items.append(text) + break + + return items + + def _identify_requirement_type(self, title: str, content: str) -> str: + """ + 通过标题和内容识别需求类型 + + 根据章节标题和内容判断需求类型: + - 标题或内容中包含"接口"相关词汇 -> 接口需求 + - 其他情况 -> 功能需求(默认) + + 注意:不能仅靠标题判断是否为功能需求,若无法识别具体类型,默认为功能需求 + """ + title_lower = title.lower() + content_lower = (content or "").lower()[:500] # 只检查前500字符 + combined_text = title_lower + " " + content_lower + + # 优先识别接口需求,根据具体文件情况修改关键词 + interface_keywords = ['接口', 'interface', 'api', '串口', '通信协议', '数据交换'] + for keyword in interface_keywords: + if keyword in combined_text: + return 'interface' + + # 默认为功能需求(不能仅靠标题判断,无法识别时默认为功能需求) + return 'functional' + + def _generate_requirement_id(self, req_type: str, section_number: str, index: int, + doc_req_id: str = "", parent_req_id: str = "") -> str: + """ + 生成需求ID(三级优先级) + + 优先级规则: + 1. 如果doc_req_id是合法的完整编号(以2-10个字母或数字开头,后跟分隔符),直接使用 + 例如: NY01-01、FR-3.1.2-1、AIRSAT07-GD03-04 + 2. 如果doc_req_id是代号/序号,且有parent_req_id,则组合 + 格式: {parent_req_id}-{doc_req_id},例如: NY01-01-K101 + 3. 否则自动生成 + 格式: {PREFIX}-{section_number}-{index},例如: IR-4.1.1-1(保留章节号中的点号) + + Args: + req_type: 需求类型 + section_number: 章节编号 + index: 序号 + doc_req_id: 文档中提取的编号/代号 + parent_req_id: 父需求编号(用于子需求) + """ + # 优先级1:合法的完整编号(以2-10个字母或数字开头,后跟分隔符) + if doc_req_id: + # 检查是否为合法的完整编号格式:2-10个字母或数字开头 + 分隔符 + 其他字符 + # 例如: NY01-01、FR-3.1.2-1、AIRSAT07-GD03-04 + complete_id_pattern = r'^[A-Za-z0-9]{2,10}[-_].+$' + if re.match(complete_id_pattern, doc_req_id): + return doc_req_id.replace('_', '-') + + # 优先级2:代号/序号 + 父需求编号 + if doc_req_id and parent_req_id: + return f"{parent_req_id}-{doc_req_id}" + + # 优先级3:自动生成(保留章节号中的点号) + prefix = self.TYPE_PREFIX.get(req_type, 'FR') # 默认FR(功能需求) + section_part = section_number if section_number else "NA" + return f"{prefix}-{section_part}-{index}" + + def _normalize_req_id(self, req_id: str) -> str: + """规范化需求编号""" + if not req_id: + return "" + req_id = str(req_id).strip() + return req_id + + def _clean_description(self, text: str) -> str: + """清理需求描述""" + # 替换换行符为空格 + text = re.sub(r'\n+', ' ', text) + # 替换多个空格为单个空格 + text = re.sub(r'\s+', ' ', text) + # 去除表格噪声 + text = re.sub(r'[\|│┃]+', ' ', text) + # 去除首尾空白 + text = text.strip() + # 限制长度 + if len(text) > 1000: + text = text[:1000] + '...' + return text + + def _format_tables_for_prompt(self, tables: List[List[List[str]]]) -> str: + """格式化表格内容用于LLM提示词""" + if not tables: + return "" + lines = [] + for idx, table in enumerate(tables, 1): + lines.append(f"表格{idx}:") + for row in table: + row_text = " | ".join(self._clean_description(cell) for cell in row if cell) + if row_text: + lines.append(row_text) + return "\n".join(lines) + + def _extract_requirement_id_from_text(self, text: str) -> Tuple[Optional[str], str]: + """ + 从文本中提取需求编号 + + 支持的格式: + 1. 完整编号:NY01-01、FR-3.1.2-1 + 2. 代号/序号:K101、D61、a)、1) + """ + if not text: + return None, text + + # 模式1:完整需求编号(如 NY01-01、FR-3.1.2-1) + pattern1 = r'^\s*([A-Za-z]{2,6}[-_]\d+(?:[-.\d]+)*)\s*[::\)\]】]?\s*(.+)$' + match = re.match(pattern1, text) + if match: + return match.group(1).strip(), match.group(2).strip() + + # 模式2:代号(如 K101、D61) + pattern2 = r'^\s*([A-Za-z]\d+)\s*[::\)\]】]?\s*(.+)$' + match = re.match(pattern2, text) + if match: + return match.group(1).strip(), match.group(2).strip() + + # 模式3:序号(如 a)、1)) + pattern3 = r'^\s*([a-z0-9]{1,2}[\))])\s*(.+)$' + match = re.match(pattern3, text) + if match: + code = match.group(1).strip().rstrip('))') + return code, match.group(2).strip() + + return None, text + + def _extract_requirements_from_tables_rule(self, tables: List[List[List[str]]]) -> List[Tuple[Optional[str], str]]: + """从表格中提取需求(规则方式)""" + results = [] + if not tables: + return results + + id_keywords = ['需求编号', '编号', '序号', 'id', 'ID'] + desc_keywords = ['需求', '描述', '内容', '说明', '要求'] + + for table in tables: + if not table: + continue + header = table[0] if table else [] + header_lower = [h.lower() for h in header] + id_idx = None + desc_idx = None + for i, h in enumerate(header_lower): + if any(k.lower() in h for k in id_keywords): + id_idx = i + if any(k.lower() in h for k in desc_keywords): + desc_idx = i + + start_row = 1 if (id_idx is not None or desc_idx is not None) else 0 + for row in table[start_row:]: + if not row: + continue + row = [self._clean_description(cell) for cell in row] + if not any(row): + continue + + req_id = None + desc = "" + if id_idx is not None and id_idx < len(row): + req_id = self._normalize_req_id(row[id_idx]) + if desc_idx is not None and desc_idx < len(row): + desc = row[desc_idx] + if not desc: + # 如果无明确描述列,拼接整行作为描述 + desc = " | ".join([cell for cell in row if cell]) + + # 若描述里包含编号,尝试再次提取 + if not req_id: + req_id, desc = self._extract_requirement_id_from_text(desc) + + if desc and len(desc) > 5: + results.append((req_id, desc)) + + return results + + def _parse_llm_json_response(self, response: str) -> Optional[Dict]: + """解析LLM的JSON响应""" + try: + return json.loads(response) + except json.JSONDecodeError: + # 尝试提取JSON代码块 + try: + json_match = re.search(r'```(?:json)?\s*(.*?)\s*```', response, re.DOTALL) + if json_match: + return json.loads(json_match.group(1)) + + # 尝试查找JSON对象 + json_match = re.search(r'\{.*\}', response, re.DOTALL) + if json_match: + return json.loads(json_match.group(0)) + except Exception: + pass + + logger.warning(f"无法解析LLM响应为JSON: {response[:200]}") + return None + + def get_statistics(self) -> Dict: + """获取需求统计信息""" + stats = { + 'total': len(self.requirements), + 'by_type': {} + } + + for req in self.requirements: + req_type = req.type + if req_type not in stats['by_type']: + stats['by_type'][req_type] = 0 + stats['by_type'][req_type] += 1 + + return stats diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..53e5a65 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,134 @@ +# src/utils.py +""" +工具函数模块 - 提供各种辅助功能 +""" + +import os +import logging +from pathlib import Path +from typing import Dict, Any, List, Optional +import yaml + +logger = logging.getLogger(__name__) + + +def load_config(config_path: str = None) -> Dict[str, Any]: + """ + 加载配置文件 + + Args: + config_path: 配置文件路径,如果为None则使用默认路径 + + Returns: + 配置字典 + """ + if config_path is None: + config_path = os.path.join(os.path.dirname(__file__), '..', 'config.yaml') + + if not os.path.exists(config_path): + logger.warning(f"配置文件不存在: {config_path}") + return {} + + try: + with open(config_path, 'r', encoding='utf-8') as f: + config = yaml.safe_load(f) + logger.info(f"成功加载配置文件: {config_path}") + return config or {} + except Exception as e: + logger.error(f"加载配置文件失败: {e}") + return {} + + +def setup_logging(config: Dict[str, Any]) -> None: + """ + 配置日志系统 + + Args: + config: 配置字典 + """ + logging_config = config.get('logging', {}) + level = logging_config.get('level', 'INFO') + log_format = logging_config.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + log_file = logging_config.get('file', None) + + # 创建logger + logging.basicConfig( + level=getattr(logging, level), + format=log_format, + handlers=[ + logging.StreamHandler(), + logging.FileHandler(log_file) if log_file else logging.NullHandler() + ] + ) + + +def validate_file_path(file_path: str, allowed_extensions: List[str] = None) -> bool: + """ + 验证文件路径的合法性 + + Args: + file_path: 文件路径 + allowed_extensions: 允许的文件扩展名列表(如['.pdf', '.docx']) + + Returns: + 文件是否合法 + """ + if not os.path.exists(file_path): + logger.error(f"文件不存在: {file_path}") + return False + + if not os.path.isfile(file_path): + logger.error(f"路径不是文件: {file_path}") + return False + + if allowed_extensions: + ext = Path(file_path).suffix.lower() + if ext not in allowed_extensions: + logger.error(f"不支持的文件格式: {ext}") + return False + + return True + + +def ensure_directory_exists(directory: str) -> bool: + """ + 确保目录存在,如果不存在则创建 + + Args: + directory: 目录路径 + + Returns: + 目录是否存在或创建成功 + """ + try: + Path(directory).mkdir(parents=True, exist_ok=True) + return True + except Exception as e: + logger.error(f"创建目录失败: {e}") + return False + + +def get_env_or_config(env_var: str, config_dict: Dict[str, Any], + default: Any = None) -> Any: + """ + 优先从环境变量读取,其次从配置字典读取 + + Args: + env_var: 环境变量名 + config_dict: 配置字典 + default: 默认值 + + Returns: + 获取到的值 + """ + # 尝试从环境变量读取 + env_value = os.environ.get(env_var) + if env_value: + return env_value + + # 尝试从配置字典读取 + config_value = config_dict.get(env_var) + if config_value and not config_value.startswith('${'): + return config_value + + return default