Files
linux_format_docs_check/app/review_filler.py

568 lines
21 KiB
Python
Raw Normal View History

from __future__ import annotations
import json
import re
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Literal
from docx import Document
from docx.oxml.ns import qn
from docx.table import Table as DocxTable
# Outcome recorded for a review criterion. The same three strings are the
# option labels searched for in a review table's header rows.
ReviewResult = Literal["通过", "未通过", "不适用"]
# Classification attached to each piece of evidence taken from the analysis
# report; it is derived from the report section the evidence was found in.
EvidencePolarity = Literal["positive", "negative", "manual", "suggestion"]
@dataclass(frozen=True)
class ReviewCriterion:
    """A single numbered row of a review table (immutable)."""

    # Heading text associated with the table that owns this row.
    table_heading: str
    # Position of the owning table as assigned while extracting tables.
    table_index: int
    # Index of this row inside the table's row list.
    row_index: int
    # Text of the sequence-number cell; the parser keeps all-digit values only.
    sequence: str
    # Text of the category cell.
    category: str
    # Text of the review-content cell.
    content: str
@dataclass(frozen=True)
class ReviewTable:
    """Layout and criteria of one parsed review table (immutable)."""

    # Heading text associated with the table.
    heading: str
    # Position of the table as assigned while extracting tables.
    table_index: int
    # Index of the row holding the three result-option labels; criteria rows
    # are read from the rows after it.
    header_row_index: int
    # Column positions of the descriptive cells.
    sequence_col: int
    category_col: int
    content_col: int
    # Column positions of the three mutually exclusive result cells.
    pass_col: int
    fail_col: int
    na_col: int
    # Criteria rows found below the header.
    criteria: list[ReviewCriterion]
@dataclass(frozen=True)
class AnalysisEvidence:
    """One item extracted from a recognised section of the analysis report."""

    # Canonical name of the report section the item came from.
    section: str
    # Short label derived from the start of the item text.
    topic: str
    # Item text with markdown emphasis/code markers removed.
    text: str
    # Polarity assigned to the section the item came from.
    polarity: EvidencePolarity
@dataclass(frozen=True)
class ParsedAnalysis:
    """Contents of an analysis markdown report after parsing."""

    # Path of the markdown file, as a string.
    path: str
    # Value of the "源文件" front-matter entry ("" when absent).
    source_filename: str
    # Non-empty items of the "命中技能" front-matter entry.
    matched_skills: list[str]
    # Complete, unmodified markdown text.
    full_text: str
    # Evidence items extracted from the model-analysis part of the report.
    evidences: list[AnalysisEvidence]
@dataclass(frozen=True)
class ReviewDecision:
    """Outcome chosen for one criterion together with its justification."""

    # Criterion the decision applies to.
    criterion: ReviewCriterion
    # Selected result option.
    result: ReviewResult
    # Fixed confidence value assigned by the rule that produced the decision.
    confidence: float
    # Human-readable explanation of the decision.
    reason: str
    # Evidence items that were matched to the criterion.
    evidence: list[AnalysisEvidence]
@dataclass(frozen=True)
class ReviewFillResult:
    """Summary of one end-to-end fill run."""

    # Path of the analysis markdown that was read.
    analysis_markdown: str
    # Path of the review document used as the template.
    review_docx: str
    # Path of the filled document that was written.
    output_docx: str
    # Headings of the filled tables joined with "; ".
    target_heading: str
    # Every decision that was applied.
    decisions: list[ReviewDecision]
# Maps each recognised report section (canonical title) to the polarity given
# to every evidence item found in that section. Sections whose normalised
# title is not a key here are ignored during evidence extraction.
SECTION_POLARITY: dict[str, EvidencePolarity] = {
    "符合项": "positive",
    "不符合项": "negative",
    "缺失章节或缺失证据": "negative",
    "整改建议": "suggestion",
    "需人工复核事项": "manual",
}
# (substring, canonical title) pairs used to normalise section headings.
# Matching is by substring and the first hit wins, so order matters:
# "不符合项" must come before "符合项" because the former contains the latter.
SECTION_ALIASES = [
    ("不符合项", "不符合项"),
    ("符合项", "符合项"),
    ("缺失章节或缺失证据", "缺失章节或缺失证据"),
    ("缺失章节", "缺失章节或缺失证据"),
    ("需人工复核事项", "需人工复核事项"),
    ("整改建议", "整改建议"),
]
# Keyword groups used to expand a criterion's search terms: when the
# criterion content contains the group name or any keyword of the group,
# the whole group is added to the criterion's keyword set.
KEYWORD_GROUPS = {
    "标识": {"完整标识", "标识号", "缩略名", "版本号", "发布号"},
    "系统概述": {"系统概述", "用途", "一般特性", "安全性", "可靠性", "实时性", "技术风险"},
    "文档概述": {"文档概述", "保密性", "安全保密"},
    "引用文档": {"引用文档", "引用文件", "编号", "标题", "修订版", "日期"},
    "状态方式": {"状态", "方式", "正常模式", "减载模式", "降级", "紧急"},
    "能力需求": {"能力需求", "CSCI能力", "软件任务"},
    "外部接口": {"外部接口", "接口图", "1553B", "CAN", "RS422"},
    "内部接口": {"内部接口", "内部接口需求"},
    "内部数据": {"内部数据", "数据结构", "全局变量", "数据字典"},
    "适应性": {"适应性", "运行环境", "适配"},
    "保密性": {"保密性", "保密"},
    "安全性": {"安全性", "安全"},
    "环境适应性": {"环境适应性", "环境"},
    "计算机资源": {"计算机资源", "处理时间", "内存", "存储", "CPU", "资源预算"},
    "人员训练": {"人员", "训练"},
    "优先级": {"优先级", "关键性", "必须", "应当", "可选"},
    "合格性规定": {"合格性规定", "检验方法", "测试", "演示", "推断"},
    "需求可追踪性": {"需求可追踪性", "可追踪性", "追踪矩阵", "双向追踪", "追溯"},
    "可验证": {"可验证", "可测试", "验证", "判据", "粒度"},
    "准确清晰": {"准确", "清晰", "歧义", "明确", "二义性", "术语"},
    # NOTE(review): the first entry below is an empty string. Because "" is a
    # substring of every string, a plain `keyword in content` test makes this
    # group match every criterion. Presumably a keyword was lost here — confirm
    # the intended value.
    "图表": {"", "图形", "流程图", "时序图", "图文"},
    "一致性": {"一致", "冲突", "矛盾"},
}
def parse_analysis_markdown(path: Path | str) -> ParsedAnalysis:
    """Read an analysis markdown report and extract its structured content.

    Args:
        path: Location of the UTF-8 encoded markdown report.

    Returns:
        A ParsedAnalysis holding the front-matter values, the full text and
        the evidence items found after the "## 模型分析原文" heading (or in
        the whole text when that heading is absent).
    """
    report_path = Path(path)
    full_text = report_path.read_text(encoding="utf-8")

    skills_line = _extract_front_matter_value(full_text, "命中技能")
    stripped_skills = (piece.strip() for piece in skills_line.split(","))
    # NOTE(review): the reviewed source additionally compared each item with an
    # empty-string literal, which is redundant with the truthiness test kept
    # here; presumably a placeholder value was lost — confirm.
    skills = [piece for piece in stripped_skills if piece]

    # Everything after the marker heading is treated as the model's analysis.
    _, _, after_marker = full_text.partition("## 模型分析原文")
    analysis_body = after_marker if "## 模型分析原文" in full_text else full_text

    return ParsedAnalysis(
        path=str(report_path),
        source_filename=_extract_front_matter_value(full_text, "源文件"),
        matched_skills=skills,
        full_text=full_text,
        evidences=_extract_evidences(analysis_body),
    )
def extract_review_tables(docx_path: Path | str) -> list[ReviewTable]:
    """Collect every parsable review table from a Word document.

    The document body is walked in order so each table can be paired with the
    most recent non-empty paragraph before it, which is used as its heading.

    Args:
        docx_path: Location of the .docx file.

    Returns:
        The tables that could be parsed as review tables, in document order.
        ``table_index`` counts every body-level table, parsable or not.
    """
    document = Document(docx_path)
    remaining_tables = iter(document.tables)
    paragraph_tag = qn("w:p")
    table_tag = qn("w:tbl")

    found: list[ReviewTable] = []
    latest_heading = ""
    position = 0
    for node in document._element.body.iterchildren():
        if node.tag == paragraph_tag:
            paragraph = _paragraph_text(node)
            if paragraph:
                latest_heading = paragraph
        elif node.tag == table_tag:
            # Body-level table elements are consumed in step with
            # document.tables so the wrapper object matches the XML node.
            review_table = _parse_review_table(next(remaining_tables), latest_heading, position)
            position += 1
            if review_table is not None:
                found.append(review_table)
    return found
def select_review_table(analysis: ParsedAnalysis, tables: list[ReviewTable]) -> ReviewTable:
    """Pick the review table that best fits the analysed document.

    Tables whose heading contains "A.1" are never candidates. Each remaining
    table is scored by the indicative terms found in the analysis; ties go to
    the table with the lowest index.

    Raises:
        ValueError: If no candidate exists or no candidate scores above zero.
    """
    candidates = [item for item in tables if "A.1" not in item.heading]
    if not candidates:
        raise ValueError("No review tables found after skipping A.1")

    haystack = f"{analysis.source_filename}\n{' '.join(analysis.matched_skills)}\n{analysis.full_text}"
    terms_by_marker = (
        ("A.2", ["需求规格说明", "SRS", "需求章节", "需求可追踪性", "合格性规定"]),
        ("A.3", ["设计说明", "详细设计", "软件单元", "CSCI级设计决策"]),
        ("A.4", ["用户手册", "安装", "操作", "用户功能", "快速参考"]),
    )
    requirement_skill = any("requirement" in skill for skill in analysis.matched_skills)

    ranked: list[tuple[int, ReviewTable]] = []
    for candidate in candidates:
        total = 0
        # A heading may contain more than one marker, so every marker is checked.
        for marker, terms in terms_by_marker:
            if marker in candidate.heading:
                total += _term_score(haystack, terms)
                if marker == "A.2" and requirement_skill:
                    total += 6
        ranked.append((total, candidate))

    # min() returns the first minimal entry, matching a stable sort's head.
    top_score, top_table = min(ranked, key=lambda entry: (-entry[0], entry[1].table_index))
    if top_score <= 0:
        raise ValueError("Unable to select a review table from the analysis report")
    return top_table
def build_review_decisions(analysis: ParsedAnalysis, table: ReviewTable) -> list[ReviewDecision]:
    """Produce one decision per criterion of ``table``, in table order.

    Each criterion is matched against the report's evidence and the matched
    evidence is turned into a decision.
    """
    return [
        _decide_review_result(criterion, _match_evidence(criterion, analysis.evidences))
        for criterion in table.criteria
    ]
def build_review_decisions_for_tables(
    analysis: ParsedAnalysis,
    tables: list[ReviewTable],
) -> list[ReviewDecision]:
    """Concatenate the decisions of several tables, keeping table order."""
    return [
        decision
        for table in tables
        for decision in build_review_decisions(analysis, table)
    ]
def apply_review_decisions(
    review_docx_path: Path | str,
    decisions: list[ReviewDecision],
    output_docx_path: Path | str,
    mark: str = "√",
) -> Path:
    """Write review decisions into a copy of the review document.

    For every criterion row that has a decision, the cell of the selected
    result column receives ``mark`` and the other two result cells are cleared.

    Args:
        review_docx_path: Review document used as the template.
        decisions: Decisions to apply; grouped by their criterion's table index.
        output_docx_path: Destination file; parent directories are created.
        mark: Text written into the selected result cell.
            NOTE(review): the reviewed source showed an empty-string default,
            which writes nothing into any cell; "√" is assumed to be the
            intended glyph — confirm against the review template.

    Returns:
        The path the document was saved to.

    Raises:
        ValueError: If ``decisions`` is empty or a referenced table cannot be
            parsed as a review table.
        KeyError: If a decision refers to a table index the document lacks.
    """
    if not decisions:
        raise ValueError("No review decisions to apply")
    document = Document(review_docx_path)
    tables_by_index = dict(enumerate(document.tables))

    grouped: dict[int, list[ReviewDecision]] = {}
    for decision in decisions:
        grouped.setdefault(decision.criterion.table_index, []).append(decision)

    for table_index, table_decisions in grouped.items():
        table = tables_by_index[table_index]
        review_table = _parse_review_table(table, table_decisions[0].criterion.table_heading, table_index)
        if review_table is None:
            raise ValueError(f"Unable to parse review table {table_index}")

        # Prefer an exact (row, sequence) match so rows that repeat a sequence
        # number each keep their own decision; the sequence-only lookup is
        # retained as a fallback for decisions built from a different layout.
        by_row = {
            (item.criterion.row_index, item.criterion.sequence): item
            for item in table_decisions
        }
        by_sequence = {item.criterion.sequence: item for item in table_decisions}
        result_cols = {
            "通过": review_table.pass_col,
            "未通过": review_table.fail_col,
            "不适用": review_table.na_col,
        }
        for criterion in review_table.criteria:
            decision = by_row.get((criterion.row_index, criterion.sequence))
            if decision is None:
                decision = by_sequence.get(criterion.sequence)
            if decision is None:
                continue
            cells = table.rows[criterion.row_index].cells
            selected_col = result_cols[decision.result]
            for col in result_cols.values():
                cells[col].text = mark if col == selected_col else ""

    output_path = Path(output_docx_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    document.save(output_path)
    return output_path
def validate_review_results(docx_path: Path | str, target_heading: str, mark: str = "√") -> list[str]:
    """Check that every criterion row of one table carries exactly one mark.

    Args:
        docx_path: Filled review document to inspect.
        target_heading: Text that the target table's heading must contain.
        mark: Cell text that counts as a selected result.
            NOTE(review): the reviewed source showed an empty-string default,
            under which blank cells are counted as marks; "√" is assumed to be
            the intended glyph — confirm against the review template.

    Returns:
        A list of error messages; empty when every row is valid. A single
        message is returned when the target table cannot be found.
    """
    target = _find_table_by_heading(extract_review_tables(docx_path), target_heading)
    if target is None:
        return [f"未找到目标审查表:{target_heading}"]

    table = Document(docx_path).tables[target.table_index]
    result_cols = (target.pass_col, target.fail_col, target.na_col)
    errors: list[str] = []
    for criterion in target.criteria:
        cells = table.rows[criterion.row_index].cells
        marks = [cells[col].text.strip() for col in result_cols]
        if marks.count(mark) != 1:
            errors.append(f"{target.heading} 序号 {criterion.sequence} 审查结果不是有且仅有一个勾:{marks}")
    return errors
def validate_review_tables(docx_path: Path | str, target_headings: list[str], mark: str = "√") -> list[str]:
    """Validate several review tables and return all error messages.

    Args:
        docx_path: Filled review document to inspect.
        target_headings: Heading texts identifying the tables to check.
        mark: Cell text that counts as a selected result; forwarded unchanged.
            NOTE(review): the reviewed source showed an empty-string default;
            "√" is assumed to be the intended glyph — confirm.

    Returns:
        Error messages of all tables, in the order of ``target_headings``.
    """
    return [
        error
        for heading in target_headings
        for error in validate_review_results(docx_path, heading, mark=mark)
    ]
def fill_review_docx_from_analysis(
    analysis_markdown_path: Path | str,
    review_docx_path: Path | str,
    output_docx_path: Path | str,
    target_heading: str | None = None,
    mark: str = "√",
) -> ReviewFillResult:
    """Fill a review document from an analysis report and validate the result.

    Args:
        analysis_markdown_path: Analysis report in markdown form.
        review_docx_path: Review document used as the template.
        output_docx_path: Destination of the filled document.
        target_heading: When given, only the first table whose heading contains
            this text is filled; otherwise every table except those whose
            heading contains "A.1".
        mark: Text written into, and later expected in, the selected result
            cell. It is passed explicitly to both the writing and the
            validation step so the two can never disagree.
            NOTE(review): "√" is an assumed glyph — confirm against the
            review template.

    Returns:
        A ReviewFillResult describing the inputs, the output and all decisions.

    Raises:
        ValueError: If no target table is found, no decision can be applied,
            or the written document fails validation.
    """
    analysis = parse_analysis_markdown(analysis_markdown_path)
    tables = extract_review_tables(review_docx_path)
    target_tables = _target_review_tables(tables, target_heading)
    if not target_tables:
        raise ValueError(f"Unable to find target review table: {target_heading}")

    decisions = build_review_decisions_for_tables(analysis, target_tables)
    output_path = apply_review_decisions(review_docx_path, decisions, output_docx_path, mark=mark)

    headings = [table.heading for table in target_tables]
    errors = validate_review_tables(output_path, headings, mark=mark)
    if errors:
        raise ValueError("; ".join(errors))

    return ReviewFillResult(
        analysis_markdown=str(analysis_markdown_path),
        review_docx=str(review_docx_path),
        output_docx=str(output_path),
        target_heading="; ".join(headings),
        decisions=decisions,
    )
def write_decisions_json(result: ReviewFillResult, output_path: Path | str) -> Path:
    """Serialise a fill result to an indented UTF-8 JSON file.

    Parent directories are created when missing. Non-ASCII characters are
    written verbatim rather than escaped.

    Returns:
        The path that was written.
    """
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(_jsonable(result), ensure_ascii=False, indent=2)
    destination.write_text(payload, encoding="utf-8")
    return destination
def _parse_review_table(table: DocxTable, heading: str, table_index: int) -> ReviewTable | None:
    """Interpret a Word table as a review table.

    Args:
        table: Table to inspect.
        heading: Heading text to record on the result and its criteria.
        table_index: Table position to record on the result and its criteria.

    Returns:
        A ReviewTable, or None when no review header is found or no row below
        the header has an all-digit sequence cell.
    """
    header = _find_review_header(table)
    if header is None:
        return None
    (
        header_row_index,
        sequence_col,
        category_col,
        content_col,
        pass_col,
        fail_col,
        na_col,
    ) = header
    widest_col = max(sequence_col, category_col, content_col, pass_col, fail_col, na_col)

    rows = table.rows
    criteria: list[ReviewCriterion] = []
    for row_index in range(header_row_index + 1, len(rows)):
        cells = rows[row_index].cells
        # Rows too short to hold every required column cannot be criteria.
        if widest_col >= len(cells):
            continue
        sequence = _cell_text(cells[sequence_col])
        if sequence.isdigit():
            criteria.append(
                ReviewCriterion(
                    table_heading=heading,
                    table_index=table_index,
                    row_index=row_index,
                    sequence=sequence,
                    category=_cell_text(cells[category_col]),
                    content=_cell_text(cells[content_col]),
                )
            )

    if not criteria:
        return None
    return ReviewTable(
        heading=heading,
        table_index=table_index,
        header_row_index=header_row_index,
        sequence_col=sequence_col,
        category_col=category_col,
        content_col=content_col,
        pass_col=pass_col,
        fail_col=fail_col,
        na_col=na_col,
        criteria=criteria,
    )
def _find_review_header(table: DocxTable) -> tuple[int, int, int, int, int, int, int] | None:
    """Locate the header of a review table.

    A header row is a row containing both "序号" and "审查内容". The three
    result labels are then searched in that row and up to two rows below it.

    Returns:
        ``(option_row_index, sequence_col, category_col, content_col,
        pass_col, fail_col, na_col)`` or None when no header is found. The
        category column falls back to 1 when "审查项" is not in the header row.
    """
    rows = table.rows
    row_count = len(rows)
    required_options = {"通过", "未通过", "不适用"}
    for row_index, row in enumerate(rows):
        titles = [_cell_text(cell) for cell in row.cells]
        if not ("序号" in titles and "审查内容" in titles):
            continue
        last_option_row = min(row_index + 3, row_count)
        for option_row_index in range(row_index, last_option_row):
            options = [_cell_text(cell) for cell in rows[option_row_index].cells]
            if not required_options.issubset(options):
                continue
            category_col = titles.index("审查项") if "审查项" in titles else 1
            return (
                option_row_index,
                titles.index("序号"),
                category_col,
                titles.index("审查内容"),
                options.index("通过"),
                options.index("未通过"),
                options.index("不适用"),
            )
    return None
def _extract_evidences(model_text: str) -> list[AnalysisEvidence]:
    """Split the model analysis into evidence items grouped by section.

    Lines are collected under the most recent "###" heading whose normalised
    title is a recognised section; lines under any other heading are dropped.
    Each collected block is then split into items.

    Returns:
        Evidence items ordered by first appearance of their section.
    """
    heading_pattern = re.compile(r"^###\s*(?:\d+[.、]\s*)?(.+?)\s*$")
    section_lines: dict[str, list[str]] = {}
    active_section = ""
    for raw_line in model_text.splitlines():
        heading = heading_pattern.match(raw_line.strip())
        if heading is None:
            if active_section:
                section_lines[active_section].append(raw_line)
            continue
        title = _normalize_section_title(heading.group(1))
        if title in SECTION_POLARITY:
            active_section = title
            # A section that appears twice keeps accumulating into one block.
            section_lines.setdefault(active_section, [])
        else:
            active_section = ""

    return [
        AnalysisEvidence(
            section=section,
            topic=_extract_topic(item),
            text=item,
            polarity=SECTION_POLARITY[section],
        )
        for section, lines in section_lines.items()
        for item in _paragraph_items(lines)
    ]
def _paragraph_items(lines: list[str]) -> list[str]:
    """Group raw lines into cleaned list items.

    A bullet ("-", "*") or numbered ("1.", "1、") line starts a new item;
    any other non-blank line is appended to the current item. Blank lines and
    "---" rules are skipped. Items that are empty after cleaning are dropped.
    """
    bullet_pattern = re.compile(r"^\s*(?:[-*]|\d+[.、])\s+(.+)")
    finished: list[str] = []
    pending: list[str] = []
    for raw_line in lines:
        stripped = raw_line.strip()
        if stripped in ("", "---"):
            continue
        bullet = bullet_pattern.match(stripped)
        if bullet is None:
            # Continuation text (or leading text before any bullet).
            pending.append(stripped)
            continue
        if pending:
            finished.append(_clean_markdown(" ".join(pending)))
        pending = [bullet.group(1)]
    if pending:
        finished.append(_clean_markdown(" ".join(pending)))
    return list(filter(None, finished))
def _match_evidence(criterion: ReviewCriterion, evidences: list[AnalysisEvidence]) -> list[AnalysisEvidence]:
    """Return up to five evidence items most relevant to ``criterion``.

    An item scores one point per criterion keyword found in its normalised
    topic and text, plus category-specific bonus points. Items scoring below 2
    are discarded; the rest are ordered by score, then by polarity weight.
    """
    needles = [_normalize_text(keyword) for keyword in _criterion_keywords(criterion)]
    accuracy_terms = ["歧义", "明确", "可验证", "可测试", "粒度", "清晰"]
    consistency_terms = ["一致", "冲突", "矛盾"]
    category = criterion.category

    candidates: list[tuple[int, AnalysisEvidence]] = []
    for evidence in evidences:
        haystack = _normalize_text(f"{evidence.topic} {evidence.text}")
        points = sum(1 for needle in needles if needle in haystack)
        if category == "准确性" and evidence.polarity in {"manual", "negative"}:
            points += sum(1 for term in accuracy_terms if term in haystack)
        if category == "一致性":
            points += sum(1 for term in consistency_terms if term in haystack)
        if points >= 2:
            candidates.append((points, evidence))

    ordered = sorted(
        candidates,
        key=lambda entry: (-entry[0], -_polarity_weight(entry[1].polarity)),
    )
    return [evidence for _, evidence in ordered[:5]]
def _decide_review_result(criterion: ReviewCriterion, evidences: list[AnalysisEvidence]) -> ReviewDecision:
    """Turn matched evidence into a decision for one criterion.

    Rules are evaluated in order and the first that applies wins:
    not-applicable criteria, any negative evidence, manual-review evidence on
    an accuracy criterion, a consistency criterion without positive evidence,
    positive evidence, remaining manual-review evidence, and finally a
    default failure when nothing supports a pass.
    """
    buckets: dict[str, list[AnalysisEvidence]] = {"negative": [], "manual": [], "positive": []}
    for item in evidences:
        if item.polarity in buckets:
            buckets[item.polarity].append(item)
    negative, manual, positive = buckets["negative"], buckets["manual"], buckets["positive"]

    def verdict(result: ReviewResult, confidence: float, reason: str) -> ReviewDecision:
        # Every decision carries the full matched evidence list.
        return ReviewDecision(criterion, result, confidence, reason, evidences)

    if _is_not_applicable(criterion):
        return verdict("不适用", 0.7, "该审查内容不适用于当前自动选择的审查单。")
    if negative:
        return verdict("未通过", 0.88, _reason_from_evidence("分析报告存在不符合或缺失证据", negative))

    category = criterion.category
    if category == "准确性" and manual:
        return verdict("未通过", 0.74, _reason_from_evidence("分析报告提示需要人工复核,不能作为通过证据", manual))
    if category == "一致性" and not positive:
        return verdict("未通过", 0.62, "当前分析报告未提供足够的一致性通过证据。")
    if positive:
        return verdict("通过", 0.78, _reason_from_evidence("分析报告提供了符合证据", positive))
    if manual:
        return verdict("未通过", 0.65, _reason_from_evidence("分析报告提示需要人工复核", manual))
    return verdict("未通过", 0.5, "分析报告未提供充分通过证据。")
def _criterion_keywords(criterion: ReviewCriterion) -> set[str]:
    """Build the set of search keywords for a criterion.

    The set contains the alphanumeric tokens of the criterion content, every
    keyword of each keyword group triggered by the content, and fixed terms
    for the "准确性" and "一致性" categories.

    Returns:
        Keywords of at least two characters.
    """
    content = criterion.content
    has_alnum = re.compile(r"[A-Za-z0-9]")
    keywords = {token for token in _tokenize(content) if has_alnum.search(token)}

    for group_name, group_keywords in KEYWORD_GROUPS.items():
        # The "标识" group is only triggered by its specific keywords, never
        # by its (very common) name alone.
        name_match = group_name != "标识" and group_name in content
        # An empty string is a substring of every string, so blank entries
        # must not count as a hit; otherwise the group matches every criterion.
        keyword_match = any(keyword and keyword in content for keyword in group_keywords)
        if keyword_match or name_match:
            keywords.update(group_keywords)

    if criterion.category == "准确性":
        keywords.update({"准确", "清晰", "明确", "歧义", "可验证", "可测试", "粒度"})
    elif criterion.category == "一致性":
        keywords.update({"一致", "冲突", "矛盾"})
    return {keyword for keyword in keywords if len(keyword) >= 2}
def _tokenize(text: str) -> list[str]:
ascii_tokens = re.findall(r"[A-Za-z][A-Za-z0-9_-]{1,}", text)
chinese_tokens = re.findall(r"[\u4e00-\u9fff]{2,}", text)
return ascii_tokens + chinese_tokens
def _extract_front_matter_value(text: str, label: str) -> str:
match = re.search(rf"^- {re.escape(label)}(.+)$", text, flags=re.MULTILINE)
return match.group(1).strip() if match else ""
def _normalize_section_title(title: str) -> str:
    """Map a heading title to its canonical section name.

    Surrounding spaces, "#" and ":" are removed first. The first alias that
    occurs as a substring decides the result; a title matching no alias is
    returned in its trimmed form.
    """
    trimmed = title.strip(" #:")
    return next(
        (canonical for alias, canonical in SECTION_ALIASES if alias in trimmed),
        trimmed,
    )
def _extract_topic(text: str) -> str:
cleaned = text.strip()
cleaned = cleaned.lstrip("-*0123456789.、 ")
if "" in cleaned:
return cleaned.split("", 1)[0].strip()
if ":" in cleaned:
return cleaned.split(":", 1)[0].strip()
return cleaned[:30]
def _clean_markdown(text: str) -> str:
cleaned = re.sub(r"`([^`]+)`", r"\1", text)
cleaned = re.sub(r"\*\*([^*]+)\*\*", r"\1", cleaned)
cleaned = re.sub(r"\s+", " ", cleaned)
return cleaned.strip()
def _normalize_text(text: str) -> str:
return re.sub(r"[\s`*_#:,。,;()、\-]+", "", text).lower()
def _paragraph_text(element) -> str:
    """Concatenate the text of every ``w:t`` descendant and trim the result.

    Nodes without text contribute an empty string.
    """
    pieces = [node.text or "" for node in element.iter(qn("w:t"))]
    return "".join(pieces).strip()
def _cell_text(cell) -> str:
return cell.text.strip().replace("\n", "")
def _term_score(text: str, terms: list[str]) -> int:
return sum(4 for term in terms if term in text)
def _polarity_weight(polarity: EvidencePolarity) -> int:
return {"negative": 4, "manual": 3, "positive": 2, "suggestion": 1}[polarity]
def _is_not_applicable(criterion: ReviewCriterion) -> bool:
content = criterion.content
if "用户" in criterion.table_heading and "CSCI" in content:
return True
return False
def _reason_from_evidence(prefix: str, evidences: list[AnalysisEvidence]) -> str:
if not evidences:
return prefix
text = evidences[0].text
return f"{prefix}{text[:160]}"
def _find_table_by_heading(tables: list[ReviewTable], heading: str | None) -> ReviewTable | None:
if not heading:
return None
for table in tables:
if heading in table.heading:
return table
return None
def _target_review_tables(tables: list[ReviewTable], target_heading: str | None) -> list[ReviewTable]:
    """Choose the tables to fill.

    Without a target heading, every table whose heading lacks "A.1" is
    returned. With one, the result holds the first matching table, or is
    empty when nothing matches.
    """
    if not target_heading:
        return [candidate for candidate in tables if "A.1" not in candidate.heading]
    found = _find_table_by_heading(tables, target_heading)
    return [] if found is None else [found]
def _jsonable(value):
if hasattr(value, "__dataclass_fields__"):
return {key: _jsonable(item) for key, item in asdict(value).items()}
if isinstance(value, list):
return [_jsonable(item) for item in value]
if isinstance(value, dict):
return {key: _jsonable(item) for key, item in value.items()}
return value