# Source file: Extract_reqs/src/document_parser.py (837 lines, 32 KiB, Python).
# NOTE: the hosting page warned that this file contains ambiguous Unicode
# characters, i.e. characters that might be confused with other characters.
# Review string and regex literals for characters lost or altered in extraction.
# -*- coding: utf-8 -*-
"""
Document parsing module - LLM-enhanced edition.

Supports PDF and Docx formats; optimised for SRS documents that follow the GJB438B standard.
"""
import re
import logging
import importlib
# `import importlib` alone does not guarantee that the `importlib.util`
# submodule is loaded, so it is imported explicitly for find_spec() below.
import importlib.util
from abc import ABC, abstractmethod
from typing import List, Dict, Tuple, Optional, Any
from pathlib import Path

# python-docx is optional; DocxParser refuses to construct when it is missing.
try:
    from docx import Document
    HAS_DOCX = True
except ImportError:
    HAS_DOCX = False

# PyPDF2 is optional; PDFParser refuses to construct when it is missing.
try:
    import PyPDF2
    HAS_PDF = True
except ImportError:
    HAS_PDF = False

# pdfplumber is only probed here; it is imported lazily when tables are extracted.
HAS_PDF_TABLE = importlib.util.find_spec("pdfplumber") is not None

logger = logging.getLogger(__name__)
class Section:
    """One section of a parsed document: a heading plus its body.

    Attributes:
        level: heading depth supplied by the caller.
        title: heading text.
        number: section number string; may be None or "" when absent.
        content: accumulated body text, chunks joined with newlines.
        uid: identifier supplied by the caller.
        parent: the Section this one was attached to via add_child, else None.
        children: sub-sections in insertion order.
        tables: attached tables, each a list of rows of cell strings.
        blocks: text and table entries in the order they were added.
    """

    def __init__(self, level: int, title: str, number: Optional[str] = None,
                 content: str = "", uid: str = ""):
        self.level = level
        self.title = title
        self.number = number
        # Text passed here seeds `content` only; it is not recorded in `blocks`.
        self.content = content
        self.uid = uid
        self.parent: Optional["Section"] = None
        self.children: List["Section"] = []
        self.tables: List[List[List[str]]] = []
        self.blocks: List[Dict[str, Any]] = []

    def add_child(self, child: 'Section') -> None:
        """Append *child* to this section and point its parent back here."""
        self.children.append(child)
        child.parent = self

    def add_content(self, text: str) -> None:
        """Append a stripped text chunk to the body; empty or None input is ignored."""
        chunk = (text or "").strip()
        if not chunk:
            return
        self.content = f"{self.content}\n{chunk}" if self.content else chunk
        self.blocks.append({"type": "text", "text": chunk})

    def add_table(self, table_data: List[List[str]]) -> None:
        """Attach a table and record it in `blocks`; empty tables are ignored."""
        if not table_data:
            return
        self.tables.append(table_data)
        self.blocks.append(
            {"type": "table", "table_index": len(self.tables) - 1, "table": table_data}
        )

    def generate_auto_number(self, parent_number: str = "", sibling_index: int = 1) -> None:
        """Assign a number only when the section does not already have one.

        Args:
            parent_number: number of the parent section; empty for top level.
            sibling_index: ordinal to use for this section.
        """
        if self.number:
            return
        if parent_number:
            self.number = f"{parent_number}.{sibling_index}"
        else:
            self.number = str(sibling_index)

    def __repr__(self) -> str:
        return f"Section(level={self.level}, number='{self.number}', title='{self.title}')"
class DocumentParser(ABC):
    """Abstract base class for document parsers.

    Subclasses implement parse(); this class holds the shared state
    (file path, parsed sections, title, raw text, optional LLM handle)
    and the helpers for uid generation and automatic numbering.
    """

    def __init__(self, file_path: str):
        self.file_path = file_path
        self.sections: List["Section"] = []
        self.document_title = ""
        self.raw_text = ""
        self.llm = None
        self._uid_counter = 0

    def set_llm(self, llm) -> None:
        """Store the LLM instance for later use by the parser."""
        self.llm = llm

    @abstractmethod
    def parse(self) -> List["Section"]:
        """Parse the document and return its top-level sections."""

    def get_document_title(self) -> str:
        """Return the document title recorded by the parser."""
        return self.document_title

    def _next_uid(self) -> str:
        """Return the next section identifier: "sec-1", "sec-2", ..."""
        self._uid_counter += 1
        return f"sec-{self._uid_counter}"

    def _auto_number_sections(self, sections: List["Section"], parent_number: str = "") -> None:
        """Fill in numbers for sections that lack one, recursing into children.

        Existing numbers are kept. A section counts as unnumbered when its
        number is blank or consists only of Chinese numerals; the ordinal
        advances only for such sections.

        Args:
            sections: sibling sections to process, in document order.
            parent_number: number of their parent; empty for top level.
        """
        if not sections:
            return
        sibling_index = 0
        for section in sections:
            has_number = (
                bool((section.number or "").strip())
                and not self._is_chinese_number(section.number)
            )
            if not has_number:
                sibling_index += 1
                # generate_auto_number is a no-op when a (Chinese) number is present.
                section.generate_auto_number(parent_number, sibling_index)
            if section.children:
                self._auto_number_sections(section.children, section.number)

    def _is_chinese_number(self, text: str) -> bool:
        """Return True when *text* is non-empty and made only of Chinese numerals."""
        chinese_numbers = '一二三四五六七八九十百千万'
        # bool() keeps the declared return type: the bare `text and ...` form
        # would hand back "" or None for empty input.
        return bool(text) and all(c in chinese_numbers for c in text)
class DocxParser(DocumentParser):
    """Parser for DOCX documents (requires python-docx)."""

    def __init__(self, file_path: str):
        """Raise ImportError when python-docx is not installed."""
        if not HAS_DOCX:
            raise ImportError("python-docx库未安装请运行: pip install python-docx")
        super().__init__(file_path)
        self.document = None

    def parse(self) -> List[Section]:
        """Walk the document body in order and build the section tree.

        Headings open new sections; other paragraphs and tables are added to
        the deepest open section. Sections without a number are numbered
        automatically at the end.

        Returns:
            The list of top-level sections (also stored on self.sections).

        Raises:
            Exception: any failure is logged and re-raised unchanged.
        """
        try:
            # Imported once per parse rather than once per body element.
            from docx.text.paragraph import Paragraph
            from docx.table import Table

            self.document = Document(self.file_path)
            self.document_title = self.document.core_properties.title or "SRS Document"
            # Maps heading level -> most recent open section at that level.
            section_stack: Dict[int, Section] = {}
            for block in self._iter_block_items(self.document):
                if isinstance(block, Paragraph):
                    text = block.text.strip()
                    if not text:
                        continue
                    heading_info = self._parse_heading(block, text)
                    if heading_info:
                        number, title, level = heading_info
                        section = Section(level=level, title=title, number=number,
                                          uid=self._next_uid())
                        self._place_heading(section, section_stack)
                    else:
                        self._open_section(section_stack).add_content(text)
                elif isinstance(block, Table):
                    table_data = self._extract_table_data(block)
                    if table_data:
                        self._open_section(section_stack).add_table(table_data)
            # Number the sections that carried no number in the document.
            self._auto_number_sections(self.sections)
            logger.info("完成Docx解析提取%d个顶级章节", len(self.sections))
            return self.sections
        except Exception as e:
            logger.error("解析Docx文档失败: %s", e)
            raise

    def _place_heading(self, section: Section, section_stack: Dict[int, Section]) -> None:
        """Insert a new heading section into the tree and update the open-section stack."""
        level = section.level
        if level == 1 or not section_stack:
            # A level-1 heading, or the first heading seen, starts a new top-level section.
            self.sections.append(section)
            section_stack.clear()
            section_stack[1] = section
            return
        # Attach to the nearest open ancestor level.
        parent_level = level - 1
        while parent_level >= 1 and parent_level not in section_stack:
            parent_level -= 1
        if parent_level >= 1:
            section_stack[parent_level].add_child(section)
        elif self.sections:
            self.sections[-1].add_child(section)
        section_stack[level] = section
        # Deeper levels are closed by this heading.
        for deeper in [lvl for lvl in section_stack if lvl > level]:
            del section_stack[deeper]

    def _open_section(self, section_stack: Dict[int, Section]) -> Section:
        """Return the deepest open section, creating a default one when none is open."""
        if section_stack:
            return section_stack[max(section_stack)]
        # Content that appears before any heading goes into a placeholder section.
        default_section = Section(level=1, title="未命名章节", number="", uid=self._next_uid())
        self.sections.append(default_section)
        section_stack[1] = default_section
        return default_section

    def _is_valid_heading(self, text: str) -> bool:
        """Return True when *text* passes the heading heuristics."""
        if len(text) > 120 or '...' in text:
            return False
        # A heading must contain a CJK character or a Latin letter.
        if not re.search(r'[\u4e00-\u9fa5A-Za-z]', text):
            return False
        # Reject table-of-contents entries: text ending in a page number
        # after spaces or dots, e.g. "概述  2" or "概述 . . . . 2".
        if re.search(r'\s{2,}\d+$', text):
            return False
        if re.search(r'[\.。\s]+\d+$', text):
            return False
        return True

    def _parse_heading(self, paragraph, text: str) -> Optional[Tuple[str, str, int]]:
        """Classify a paragraph as a heading.

        Returns:
            (number, title, level) for a heading, or None otherwise. The number
            is "" for headings recognised by style only.
        """
        style_name = paragraph.style.name if paragraph.style else ""
        is_heading_style = style_name.lower().startswith('heading') if style_name else False

        # Arabic-numbered heading: level is the count of dotted parts.
        match = re.match(r'^(\d+(?:\.\d+)*)\s*[\.、]?\s*(.+)$', text)
        if match and self._is_valid_heading(match.group(2)):
            number = match.group(1)
            return number, match.group(2).strip(), len(number.split('.'))

        # Chinese-numeral heading: always treated as level 1.
        match = re.match(r'^([一二三四五六七八九十]+)[、\.]+\s*(.+)$', text)
        if match and self._is_valid_heading(match.group(2)):
            return match.group(1), match.group(2).strip(), 1

        # Heading recognised by paragraph style; the first number in the
        # style name gives the level, defaulting to 1.
        if is_heading_style and self._is_valid_heading(text):
            level_match = re.search(r'(\d+)', style_name)
            level = int(level_match.group(1)) if level_match else 1
            return "", text, level
        return None

    def _iter_block_items(self, parent):
        """Yield the paragraphs and tables of the document body in document order."""
        from docx.text.paragraph import Paragraph
        from docx.table import Table
        from docx.oxml.text.paragraph import CT_P
        from docx.oxml.table import CT_Tbl

        for child in parent.element.body.iterchildren():
            if isinstance(child, CT_P):
                yield Paragraph(child, parent)
            elif isinstance(child, CT_Tbl):
                yield Table(child, parent)

    def _extract_table_data(self, table) -> List[List[str]]:
        """Return the table as rows of whitespace-normalised cell text.

        Rows whose cells are all empty are dropped.
        """
        table_data = []
        for row in table.rows:
            row_data = [
                re.sub(r'\s+', ' ', cell.text.replace('\n', ' ').strip())
                for cell in row.cells
            ]
            if any(row_data):
                table_data.append(row_data)
        return table_data
class PDFParser(DocumentParser):
    """PDF document parser (LLM-enhanced edition).

    Text is extracted with PyPDF2, section headings are recognised from their
    numbering plus keyword heuristics, and tables are extracted with
    pdfplumber when it is installed.
    """

    # Keywords of valid section titles in GJB438B-style SRS documents.
    VALID_TITLE_KEYWORDS = [
        '范围', '标识', '概述', '引用', '文档',
        '需求', '功能', '接口', '性能', '安全', '保密',
        '环境', '资源', '质量', '设计', '约束',
        '人员', '培训', '保障', '验收', '交付', '包装',
        '优先', '关键', '合格', '追踪', '注释',
        'CSCI', '计算机', '软件', '硬件', '通信', '通讯',
        '数据', '适应', '可靠', '内部', '外部',
        '描述', '要求', '规定', '说明', '定义'
    ]
    # A top-level title must contain one of these.
    TOP_LEVEL_TITLE_KEYWORDS = [
        '范围', '标识', '概述', '引用', '文档', '需求', '接口', '性能',
        '安全', '保密', '环境', '资源', '质量', '设计', '约束', '验收',
        '交付', '包装', '注释'
    ]
    # Substrings that mark a candidate title as noise.
    INVALID_TITLE_PATTERNS = [
        '本文档可作为', '参比电位', '补偿电流', '以太网',
        '电源', '软件接', '功能\\', '性能 \\', '输入/输出 \\',
        '数据处理要求 \\', '固件 \\', '质量控制要求',
        '信安科技', '浙江', '公司'
    ]
    # Words suggesting that a candidate title is really a body sentence.
    # NOTE(review): the source as received had four additional entries that
    # were empty strings (apparently characters lost in extraction). An empty
    # string is a substring of every title, so they rejected every heading;
    # they are omitted here. Restore the intended tokens if they are known.
    STATEMENT_HINTS = ("能够", "可以", "进行", "通过", "同时", "如果")

    # Dash variants that are normalised to "-" inside table references.
    _DASH_CHARS = "-\u2010\u2011\u2012\u2013\u2014\u2015\u2212\ufe63\uff0d"
    _DASH_TO_HYPHEN = str.maketrans(_DASH_CHARS, "-" * len(_DASH_CHARS))
    # NOTE(review): the literal before the digits was missing in the source as
    # received; "表" is restored from the docstring example "表3-5". Confirm.
    _TABLE_REF_RE = re.compile(r"表(\d+(?:-\d+){1,3})")

    def __init__(self, file_path: str):
        """Raise ImportError when PyPDF2 is not installed."""
        if not HAS_PDF:
            raise ImportError("PyPDF2库未安装请运行: pip install PyPDF2")
        super().__init__(file_path)
        self.document_title = "SRS Document"
        # One entry per PDF page ("" for pages without text), so indices line
        # up with pdfplumber page indices.
        self._page_texts: List[str] = []

    def parse(self) -> List[Section]:
        """Parse the PDF and return its top-level sections.

        Raises:
            Exception: any failure is logged and re-raised unchanged.
        """
        try:
            # 1. Extract all text.
            self.raw_text = self._extract_all_text()
            # 2. Clean it.
            cleaned_text = self._clean_text(self.raw_text)
            # 3. Recognise the section structure.
            self.sections = self._parse_sections(cleaned_text)
            # 4. Validate the sections when an LLM handle has been set.
            if self.llm:
                self.sections = self._llm_validate_sections(self.sections)
            # When no section was recognised, create a fallback section so that
            # tables extracted below still have somewhere to go.
            if not self.sections:
                fallback = Section(level=1, title="未命名章节", number="1", uid=self._next_uid())
                if cleaned_text:
                    fallback.add_content(cleaned_text)
                self.sections = [fallback]
            # 5. Extract tables and attach them to sections (if pdfplumber is available).
            pdf_tables = self._extract_pdf_tables()
            if pdf_tables:
                self._attach_pdf_tables_to_sections(pdf_tables)
            # 6. Number the sections that have no number.
            self._auto_number_sections(self.sections)
            logger.info("完成PDF解析提取%d个顶级章节", len(self.sections))
            return self.sections
        except Exception as e:
            logger.error("解析PDF文档失败: %s", e)
            raise

    def _extract_all_text(self) -> str:
        """Extract the text of every page and return the non-empty pages joined by newlines.

        Also stores the per-page text on self._page_texts. Pages without text
        are kept there as "" so that list indices equal page indices.
        """
        page_texts: List[str] = []
        with open(self.file_path, 'rb') as f:
            pdf_reader = PyPDF2.PdfReader(f)
            for page in pdf_reader.pages:
                page_texts.append(page.extract_text() or "")
        self._page_texts = page_texts
        return '\n'.join(text for text in page_texts if text)

    def _extract_pdf_tables(self) -> List[Dict[str, Any]]:
        """Extract the tables of the PDF with pdfplumber.

        Returns:
            One dict per non-empty table with the keys "page_idx",
            "table_idx", "page_text", "data", "section_hint" and "table_ref".
            Returns [] when pdfplumber is missing or extraction fails.
        """
        if not HAS_PDF_TABLE:
            logger.warning("未安装pdfplumber跳过PDF表格提取。可执行: pip install pdfplumber")
            return []
        tables: List[Dict[str, Any]] = []
        try:
            pdfplumber = importlib.import_module("pdfplumber")
            with pdfplumber.open(self.file_path) as pdf:
                for page_idx, page in enumerate(pdf.pages):
                    page_text = ""
                    if page_idx < len(self._page_texts):
                        page_text = self._page_texts[page_idx]
                    table_objs = page.find_tables() or []
                    if table_objs:
                        extracted_tables = [
                            (idx, t.extract(), t.bbox) for idx, t in enumerate(table_objs)
                        ]
                    else:
                        # No table objects found: fall back to extract_tables(),
                        # which yields no bounding boxes.
                        raw_tables = page.extract_tables() or []
                        extracted_tables = [(idx, t, None) for idx, t in enumerate(raw_tables)]
                    for table_idx, table, bbox in extracted_tables:
                        cleaned_table = self._clean_table_rows(table)
                        if not cleaned_table:
                            continue
                        section_hint = self._section_hint_above(page, bbox)
                        table_ref = self._extract_table_reference(cleaned_table)
                        tables.append(
                            {
                                "page_idx": page_idx,
                                "table_idx": table_idx,
                                "page_text": page_text,
                                "data": cleaned_table,
                                "section_hint": section_hint,
                                "table_ref": table_ref,
                            }
                        )
        except Exception as e:
            # Best effort: table extraction must not break the text-only flow.
            logger.warning("PDF表格提取失败继续纯文本流程: %s", e)
            return []
        logger.info("PDF表格提取完成%d个表格", len(tables))
        return tables

    @staticmethod
    def _clean_table_rows(table) -> List[List[str]]:
        """Normalise cell whitespace and drop rows whose cells are all empty."""
        cleaned_table: List[List[str]] = []
        for row in table or []:
            cells = [re.sub(r'\s+', ' ', str(cell or '')).strip() for cell in row]
            # Keep the row as soon as one cell is non-empty.
            if any(cells):
                cleaned_table.append(cells)
        return cleaned_table

    def _section_hint_above(self, page, bbox) -> str:
        """Return the last section number found in the page text above *bbox*, or ""."""
        if not bbox:
            return ""
        try:
            top = float(bbox[1])
            text_above = page.crop((0, 0, page.width, top)).extract_text() or ""
            return self._find_last_section_number(text_above)
        except Exception as exc:
            # The hint is optional; other attachment strategies still apply.
            logger.debug("section hint lookup failed: %s", exc)
            return ""

    def _find_table_refs(self, text: str) -> List[str]:
        """Return all table references in *text*, e.g. "3-5" for "表3-5".

        Whitespace is removed and dash variants are normalised to "-" first,
        so references taken from tables and from section text compare equal.
        """
        normalized = re.sub(r"\s+", "", text or "").translate(self._DASH_TO_HYPHEN)
        return [m.group(1) for m in self._TABLE_REF_RE.finditer(normalized)]

    def _extract_table_reference(self, table: List[List[str]]) -> str:
        """Return the first table reference found in the first two rows, or ""."""
        if not table:
            return ""
        merged = " ".join(" ".join(str(c or "") for c in row) for row in table[:2])
        refs = self._find_table_refs(merged)
        return refs[0] if refs else ""

    def _build_table_reference_index(self, sections: List[Section]) -> Dict[str, List[Section]]:
        """Map each table reference to the sections whose content mentions it."""
        index: Dict[str, List[Section]] = {}
        for section in sections:
            for ref in self._find_table_refs(section.content or ""):
                index.setdefault(ref, []).append(section)
        return index

    def _find_last_section_number(self, text: str) -> str:
        """Return the number of the last line in *text* that looks like a section header."""
        if not text:
            return ""
        found = ""
        for line in text.split("\n"):
            line = line.strip()
            if not line:
                continue
            # A fresh set disables the duplicate-number check for this scan.
            section_info = self._match_section_header(line, set())
            if section_info:
                found = section_info[0]
        return found

    def _attach_pdf_tables_to_sections(self, tables: List[Dict[str, Any]]) -> None:
        """Attach each extracted table to the best matching section.

        Strategies, in order: table reference mentioned in section content,
        section number seen above the table, title match in the page text,
        and finally the section that received the previous table. Tables
        that match nothing are skipped with a warning.
        """
        flat_sections = self._flatten_sections(self.sections)
        if not flat_sections:
            return
        section_by_number = {
            (s.number or "").strip(): s
            for s in flat_sections
            if (s.number or "").strip()
        }
        table_ref_index = self._build_table_reference_index(flat_sections)
        last_section: Optional[Section] = None
        for table in tables:
            target = None
            table_ref = (table.get("table_ref") or "").strip()
            if table_ref and table_ref in table_ref_index:
                candidates = table_ref_index[table_ref]
                # When several sections mention the reference, prefer the
                # deepest one so a parent summary section does not win.
                target = max(candidates, key=lambda s: (s.level, len(s.content or "")))
            section_hint = (table.get("section_hint") or "").strip()
            if not target and section_hint and section_hint in section_by_number:
                target = section_by_number[section_hint]
            if not target:
                target = self._match_table_section(table.get("page_text", ""), flat_sections)
            # Last resort: reuse the previous target rather than the first
            # section, to avoid attaching tables across chapters.
            if not target:
                target = last_section
            if not target:
                logger.warning(
                    "未定位到表格归属章节,跳过: page=%s table=%s",
                    table.get("page_idx", -1),
                    table.get("table_idx", -1),
                )
                continue
            target.add_table(table["data"])
            last_section = target

    def _flatten_sections(self, sections: List[Section]) -> List[Section]:
        """Return the section tree as a flat list in document (pre-order) order."""
        result: List[Section] = []
        for section in sections:
            result.append(section)
            if section.children:
                result.extend(self._flatten_sections(section.children))
        return result

    def _match_table_section(self, page_text: str, sections: List[Section]) -> Optional[Section]:
        """Return the section whose title (optionally with its number) appears in the page text.

        Matching ignores whitespace and case. The longest match wins, with
        the deeper section breaking ties. Returns None when nothing matches.
        """
        normalized_page = re.sub(r"\s+", "", (page_text or "")).lower()
        if not normalized_page:
            return None
        matched: Optional[Section] = None
        matched_score = (-1, -1)
        for section in sections:
            title = (section.title or "").strip()
            if not title:
                continue
            number = (section.number or "").strip()
            candidates = [title]
            if number:
                candidates.append(f"{number}{title}")
                candidates.append(f"{number} {title}")
            for candidate in candidates:
                normalized_candidate = re.sub(r"\s+", "", candidate).lower()
                if normalized_candidate and normalized_candidate in normalized_page:
                    score = (len(normalized_candidate), section.level)
                    if score > matched_score:
                        matched = section
                        matched_score = score
        return matched

    def _clean_text(self, text: str) -> str:
        """Strip lines and drop blanks, bare page numbers and table-of-contents lines."""
        cleaned_lines = []
        for line in text.split('\n'):
            line = line.strip()
            if not line:
                continue
            # A line of only 1-3 digits is treated as a page number.
            if re.match(r'^\d{1,3}$', line):
                continue
            # A line with dot leaders is treated as a table-of-contents entry.
            if line.count('.') > 10 and '...' in line:
                continue
            cleaned_lines.append(line)
        return '\n'.join(cleaned_lines)

    def _parse_sections(self, text: str) -> List[Section]:
        """Build the section tree from cleaned text.

        Lines recognised as headers open sections; other non-noise lines are
        collected as the content of the current section.

        Returns:
            The list of top-level sections.
        """
        sections: List[Section] = []
        # Maps numbering depth -> most recent open section at that depth.
        section_stack: Dict[int, Section] = {}
        current_section: Optional[Section] = None
        content_buffer: List[str] = []
        found_sections = set()
        last_top_level_number = 0
        for line in text.split('\n'):
            line = line.strip()
            if not line:
                continue
            section_info = self._match_section_header(line, found_sections)
            if not section_info:
                # Ordinary body line.
                if not self._is_noise(line):
                    content_buffer.append(line)
                continue
            number, title = section_info
            level = len(number.split('.'))
            top_level_number = int(number.split('.')[0])
            # A top-level number that jumps ahead by more than one, or goes
            # backwards, is usually body text misread as a header (a table
            # caption or an enumerated item); keep it as content.
            if level == 1 and last_top_level_number and (
                top_level_number > last_top_level_number + 1
                or top_level_number < last_top_level_number
            ):
                if not self._is_noise(line):
                    content_buffer.append(line)
                continue
            # Headers nested deeper than six levels are dropped.
            if level > 6:
                continue
            # Flush the content collected for the previous section.
            if current_section and content_buffer:
                current_section.add_content('\n'.join(content_buffer))
                content_buffer = []
            section = Section(level=level, title=title, number=number, uid=self._next_uid())
            found_sections.add(number)
            if level == 1:
                sections.append(section)
                section_stack = {1: section}
                last_top_level_number = top_level_number
            else:
                # Attach to the nearest open ancestor.
                parent_level = level - 1
                while parent_level >= 1 and parent_level not in section_stack:
                    parent_level -= 1
                if parent_level >= 1:
                    parent = section_stack[parent_level]
                    parent.add_child(section)
                    # On a level jump (e.g. 1 -> 3) fall back to parent + 1.
                    # This must use the parent actually found: checking the
                    # stack after the section is registered can never fire.
                    if parent_level < level - 1:
                        section.level = parent.level + 1
                elif sections:
                    sections[-1].add_child(section)
                else:
                    # The first header is not top level: promote it.
                    sections.append(section)
                    section_stack = {1: section}
            # The stack stays keyed by numbering depth so that later siblings
            # find the same parent.
            section_stack[level] = section
            for deeper in [lvl for lvl in section_stack if lvl > level]:
                del section_stack[deeper]
            current_section = section
        # Flush the content of the last section.
        if current_section and content_buffer:
            current_section.add_content('\n'.join(content_buffer))
        return sections

    def _match_section_header(self, line: str, found_sections: set) -> Optional[Tuple[str, str]]:
        """Decide whether *line* is a section header such as "3.1 功能需求".

        Args:
            line: a stripped text line.
            found_sections: section numbers already accepted; repeats are rejected.

        Returns:
            (section number, section title), or None when the line is rejected.
        """
        match = re.match(r'^(\d+(?:\.\d+)*)[\s、.)]*(.+)$', line)
        if not match:
            return None
        number = match.group(1)
        title = match.group(2).strip()
        parts = number.split('.')
        level = len(parts)
        # Table-of-contents lines.
        if '...' in title or title.count('.') > 5:
            return None
        # Plausible numbering only: chapter 1-30, sub-parts at most 20.
        first_part = int(parts[0])
        if first_part < 1 or first_part > 30:
            return None
        if any(int(part) > 20 for part in parts[1:]):
            return None
        # No duplicates.
        if number in found_sections:
            return None
        if len(title) > 60 or len(title) < 2:
            return None
        # Phrases that read like body sentences.
        if self._looks_like_statement(title):
            return None
        if len(title) > 24 and re.search(r'[。;;]', title):
            return None
        # Many commas in a longish title usually mean a body fragment. The
        # source as received counted the empty string, which made this reject
        # every title longer than 20 characters.
        comma_count = title.count(',') + title.count(',')
        if comma_count >= 2 and len(title) > 20:
            return None
        # The title needs at least one CJK character or Latin letter.
        if not re.search(r'[\u4e00-\u9fa5A-Za-z]', title):
            return None
        if any(pattern in title for pattern in self.INVALID_TITLE_PATTERNS):
            return None
        # A title must not start with a digit or be digit-heavy.
        if title[0].isdigit():
            return None
        digit_ratio = sum(c.isdigit() for c in title) / max(len(title), 1)
        if digit_ratio > 0.3:
            return None
        # Backslashes usually come from table noise.
        if '\\' in title and '需求' not in title:
            return None
        # Keyword gates reduce the chance of body text passing as a title.
        if not any(k in title for k in self.VALID_TITLE_KEYWORDS):
            return None
        if level == 1 and not any(k in title for k in self.TOP_LEVEL_TITLE_KEYWORDS):
            return None
        return (number, title)

    def _looks_like_statement(self, title: str) -> bool:
        """Return True when *title* reads like a body sentence rather than a heading."""
        if not title:
            return False
        # Skip empty hints: "" is contained in every string.
        if any(hint and hint in title for hint in self.STATEMENT_HINTS):
            return True
        if len(title) > 24 and re.search(r'[,。;;:]', title):
            return True
        return False

    def _is_noise(self, line: str) -> bool:
        """Return True for lines that carry no content."""
        # Digits, whitespace, commas and dots only.
        if re.match(r'^[\d\s,.]+$', line):
            return True
        # Very short lines.
        if len(line) < 3:
            return True
        # Roman numerals.
        if re.match(r'^[ivxIVX]+$', line):
            return True
        return False

    def _llm_validate_sections(self, sections: List[Section]) -> List[Section]:
        """Filter the section tree through _is_valid_section_with_llm.

        Returns *sections* unchanged when no LLM handle is set.
        """
        if not self.llm:
            return sections
        return self._validate_children(sections)

    def _validate_children(self, children: List[Section]) -> List[Section]:
        """Recursively keep only the sections that pass validation.

        A rejected section is dropped together with its whole subtree.
        """
        validated = []
        for child in children:
            if self._is_valid_section_with_llm(child):
                child.children = self._validate_children(child.children)
                validated.append(child)
        return validated

    def _is_valid_section_with_llm(self, section: Section) -> bool:
        """Return False for sections whose title looks like extraction noise.

        Despite the name, this method applies rules only and does not call
        the LLM.
        """
        invalid_titles = [
            '本文档可作为', '故障', '实时', '输入/输出',
            '固件', '功能\\', '\\4.', '\\3.'
        ]
        for invalid in invalid_titles:
            if invalid in section.title:
                logger.debug("过滤无效章节: %s %s", section.number, section.title)
                return False
        # Extra check for chapter 3 (requirements). Compare the chapter part
        # exactly so that chapters 30 and up are not included.
        if section.number and section.number.split('.')[0] == '3':
            if '\\' in section.title or '/' in section.title:
                if not any(kw in section.title for kw in ['输入', '输出', '接口']):
                    return False
        return True
def create_parser(file_path: str) -> "DocumentParser":
    """Create the parser that matches the file's extension.

    Args:
        file_path: path of the document; only its suffix is inspected,
            case-insensitively.

    Returns:
        A DocxParser for ".docx" files or a PDFParser for ".pdf" files.

    Raises:
        ValueError: for any other extension.
        ImportError: from the parser constructor when its library is missing.
    """
    ext = Path(file_path).suffix.lower()
    if ext == '.docx':
        return DocxParser(file_path)
    if ext == '.pdf':
        return PDFParser(file_path)
    raise ValueError(f"不支持的文件格式: {ext}")