Compare commits
7 Commits
fea4f2b512
...
feature/ge
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
92a9077f3c | ||
|
|
77b2d6a27d | ||
|
|
7222475b27 | ||
|
|
f9598333e4 | ||
|
|
3a1fb5d840 | ||
|
|
5850276db0 | ||
|
|
f60afe046a |
25
handoff-2026-05-26-10-56-09.md
Normal file
25
handoff-2026-05-26-10-56-09.md
Normal file
@@ -0,0 +1,25 @@
|
||||
# Handoff - 2026-05-26
|
||||
|
||||
## Completed Tasks
|
||||
- 将技能合集列表从固定常量改为动态扫描 `skills/` 下包含 `index.md` 的目录,解决后台新增 `skills/interesting_physics_skills` 后前端“技能合集”下拉框不显示的问题。
|
||||
- 新增 `GET /skill-collections` 接口,用于前端实时刷新可用技能合集列表。
|
||||
- 新增 `POST /skill-collections/upload` 接口,严格限制上传 `.zip` 技能合集压缩包,并按 `skills/<压缩包文件名去掉.zip>/` 的规范解压安装。
|
||||
- 为 zip 安装流程增加校验:必须是有效 zip、根目录必须包含 `index.md`,并拒绝路径穿越、空包以及不含有效 `SKILL.md` 的合集。
|
||||
- 在前端新增简洁的“上传技能合集”区域,上传成功后显示提醒,并立即刷新“技能合集”下拉框且选中新上传的合集。
|
||||
- 修正首页顶部技能合集数量展示,改为显示当前动态发现的合集数量。
|
||||
- 补充测试覆盖动态发现后台新增目录、上传 zip 后解压并进入列表、拒绝非 zip、拒绝非法路径 zip。
|
||||
- 执行验证:`python -m pytest -q` 通过,结果为 `17 passed in 6.91s`;`git diff --check` 通过。
|
||||
- 启动本地 FastAPI 服务用于页面验证,当前地址为 `http://127.0.0.1:8002`。
|
||||
|
||||
## Blockers
|
||||
- 当前 zip 格式按现有 `skills/GJB438B-2009_prd_skills.zip` 规范处理,即 `index.md` 必须位于压缩包根目录;如果后续需要支持“压缩包内再包一层目录”的格式,需要补充规范转换逻辑。
|
||||
- 上传同名合集时当前实现会用新解压内容替换 `skills/<合集名>/`,需要在后续产品设计中确认是否增加覆盖确认、版本备份或回滚能力。
|
||||
- 本地测试中 `fastapi.testclient.TestClient` 在当前环境会挂起,因此测试改为直接调用异步路由函数和安装函数;后续如升级依赖或调整测试环境,可再恢复端到端 HTTP 客户端测试。
|
||||
- 默认端口 `8000` 和 `8001` 启动失败,最终使用 `8002` 启动服务。
|
||||
|
||||
## Next Steps
|
||||
- 明天使用真实技能合集 zip 在浏览器中做一次完整手工验证:上传、成功提示、下拉框刷新、选择新合集并执行 DOCX 分析。
|
||||
- 为上传同名合集补充更明确的管理策略,例如覆盖确认、保留上一版本备份或禁止覆盖。
|
||||
- 评估是否支持多种 zip 打包结构,并在文档中明确技能合集 zip 的标准目录格式。
|
||||
- 考虑增加前端上传状态样式区分,例如成功、失败、校验错误使用不同颜色,但保持当前页面简洁风格。
|
||||
- 如后续要正式部署,补充接口级日志,记录上传合集名称、技能数量、校验失败原因和安装时间。
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -19,3 +19,6 @@ deploy.md
|
||||
handoff-2026-05-19.md
|
||||
|
||||
prompts_folder/
|
||||
|
||||
handoff-2026-05-26-10-56-09.md
|
||||
.vscode/launch.json
|
||||
|
||||
11
app/main.py
11
app/main.py
@@ -24,6 +24,7 @@ from app.analyzer import (
|
||||
from app.config import load_api_config
|
||||
from app.docx_parser import parse_docx
|
||||
from app.report_generator import generate_docx_report, generate_markdown_report
|
||||
from app.review_filler import fill_review_docx_from_analysis
|
||||
from app.skill_loader import load_skill_catalog
|
||||
|
||||
|
||||
@@ -33,6 +34,7 @@ OUTPUT_DIR = ROOT_DIR / "outputs"
|
||||
SKILL_ROOT = ROOT_DIR / "skills"
|
||||
DEFAULT_SKILL_COLLECTION = "GJB438C-2021_prd_skills"
|
||||
CONFIG_PATH = ROOT_DIR / "configs" / "api_config.yaml"
|
||||
REVIEW_DOCX_TEMPLATE = ROOT_DIR / "test" / "附录A文档审查.docx"
|
||||
MAX_UPLOAD_BYTES = 30 * 1024 * 1024
|
||||
MAX_SKILL_ARCHIVE_BYTES = 50 * 1024 * 1024
|
||||
ProgressCallback = Callable[[int, str], None]
|
||||
@@ -263,14 +265,21 @@ def analyze_saved_docx(
|
||||
|
||||
progress(85, "正在生成 Markdown 分析文档")
|
||||
markdown_path = generate_markdown_report(report, OUTPUT_DIR)
|
||||
progress(92, "正在生成 DOCX 文档审查单")
|
||||
review_docx_path = markdown_path.with_name(f"{markdown_path.stem}_review.docx")
|
||||
fill_review_docx_from_analysis(markdown_path, REVIEW_DOCX_TEMPLATE, review_docx_path)
|
||||
progress(100, "分析完成")
|
||||
|
||||
return {
|
||||
"source_filename": parsed.filename,
|
||||
"summary": report.summary,
|
||||
"matched_skills": report.matched_skills,
|
||||
"downloads": {"markdown": f"/download/{markdown_path.name}"},
|
||||
"downloads": {
|
||||
"markdown": f"/download/{markdown_path.name}",
|
||||
"review_docx": f"/download/{review_docx_path.name}",
|
||||
},
|
||||
"markdown_filename": markdown_path.name,
|
||||
"review_docx_filename": review_docx_path.name,
|
||||
}
|
||||
|
||||
|
||||
|
||||
567
app/review_filler.py
Normal file
567
app/review_filler.py
Normal file
@@ -0,0 +1,567 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
from docx import Document
|
||||
from docx.oxml.ns import qn
|
||||
from docx.table import Table as DocxTable
|
||||
|
||||
|
||||
# The three mutually exclusive marks a review row can receive; they map to the
# pass / fail / not-applicable columns when decisions are written to the DOCX.
ReviewResult = Literal["通过", "未通过", "不适用"]
# Polarity tag attached to every evidence item extracted from the analysis report.
EvidencePolarity = Literal["positive", "negative", "manual", "suggestion"]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ReviewCriterion:
    """One numbered row of a review table, as captured by ``_parse_review_table``."""

    table_heading: str  # heading text supplied when the owning table was parsed
    table_index: int  # index of the owning table, as supplied to the parser
    row_index: int  # index of this row inside ``table.rows``
    sequence: str  # text of the sequence-number cell (digits only)
    category: str  # text of the category cell
    content: str  # text of the review-content cell
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ReviewTable:
    """Column layout and criteria rows of one review table found in the DOCX."""

    heading: str  # heading text supplied when the table was parsed
    table_index: int  # index of the table, as supplied to the parser
    header_row_index: int  # row holding the 通过/未通过/不适用 labels; criteria rows follow it
    sequence_col: int  # column of the "序号" header cell
    category_col: int  # column of the "审查项" header cell (1 when that header is absent)
    content_col: int  # column of the "审查内容" header cell
    pass_col: int  # column of the "通过" option
    fail_col: int  # column of the "未通过" option
    na_col: int  # column of the "不适用" option
    criteria: list[ReviewCriterion]  # numbered rows below the header
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class AnalysisEvidence:
    """One bullet/paragraph extracted from a recognised section of the analysis report."""

    section: str  # canonical section title (a key of SECTION_POLARITY)
    topic: str  # short topic derived from the item text by ``_extract_topic``
    text: str  # item text after Markdown clean-up
    polarity: EvidencePolarity  # polarity assigned to the section the item came from
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ParsedAnalysis:
    """Result of ``parse_analysis_markdown``: report metadata plus extracted evidence."""

    path: str  # path of the Markdown report, as a string
    source_filename: str  # value of the "源文件" front-matter line ("" when missing)
    matched_skills: list[str]  # entries of the "命中技能" line, with "无" and blanks dropped
    full_text: str  # complete Markdown text of the report
    evidences: list[AnalysisEvidence]  # items found after the "## 模型分析原文" marker
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ReviewDecision:
    """Verdict produced for a single review criterion."""

    criterion: ReviewCriterion  # the row being judged
    result: ReviewResult  # which of the three option columns gets the mark
    confidence: float  # fixed per-rule score set by ``_decide_review_result``
    reason: str  # human-readable justification
    evidence: list[AnalysisEvidence]  # evidence items that were matched to the criterion
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ReviewFillResult:
    """Summary returned by ``fill_review_docx_from_analysis``."""

    analysis_markdown: str  # path of the analysis report that drove the decisions
    review_docx: str  # path of the review DOCX used as input
    output_docx: str  # path of the filled-in DOCX that was written
    target_heading: str  # headings of all filled tables joined with "; "
    decisions: list[ReviewDecision]  # every decision that was applied
|
||||
|
||||
|
||||
# Canonical report section title -> polarity given to every evidence item in that
# section. Headings that do not normalise to one of these keys are ignored by
# ``_extract_evidences``.
SECTION_POLARITY: dict[str, EvidencePolarity] = {
    "符合项": "positive",
    "不符合项": "negative",
    "缺失章节或缺失证据": "negative",
    "整改建议": "suggestion",
    "需人工复核事项": "manual",
}
|
||||
|
||||
# (substring, canonical title) pairs scanned in order by ``_normalize_section_title``;
# the first substring contained in a heading wins. "不符合项" must therefore precede
# "符合项", because the latter is a substring of the former.
SECTION_ALIASES = [
    ("不符合项", "不符合项"),
    ("符合项", "符合项"),
    ("缺失章节或缺失证据", "缺失章节或缺失证据"),
    ("缺失章节", "缺失章节或缺失证据"),
    ("需人工复核事项", "需人工复核事项"),
    ("整改建议", "整改建议"),
]
|
||||
|
||||
# Topic name -> related keywords. ``_criterion_keywords`` adds a whole group to a
# criterion's keyword set when the criterion text contains any keyword of the group
# (or the group name itself, except for "标识").
KEYWORD_GROUPS = {
    "标识": {"完整标识", "标识号", "缩略名", "版本号", "发布号"},
    "系统概述": {"系统概述", "用途", "一般特性", "安全性", "可靠性", "实时性", "技术风险"},
    "文档概述": {"文档概述", "保密性", "安全保密"},
    "引用文档": {"引用文档", "引用文件", "编号", "标题", "修订版", "日期"},
    "状态方式": {"状态", "方式", "正常模式", "减载模式", "降级", "紧急"},
    "能力需求": {"能力需求", "CSCI能力", "软件任务"},
    "外部接口": {"外部接口", "接口图", "1553B", "CAN", "RS422"},
    "内部接口": {"内部接口", "内部接口需求"},
    "内部数据": {"内部数据", "数据结构", "全局变量", "数据字典"},
    "适应性": {"适应性", "运行环境", "适配"},
    "保密性": {"保密性", "保密"},
    "安全性": {"安全性", "安全"},
    "环境适应性": {"环境适应性", "环境"},
    "计算机资源": {"计算机资源", "处理时间", "内存", "存储", "CPU", "资源预算"},
    "人员训练": {"人员", "训练"},
    "优先级": {"优先级", "关键性", "必须", "应当", "可选"},
    "合格性规定": {"合格性规定", "检验方法", "测试", "演示", "推断"},
    "需求可追踪性": {"需求可追踪性", "可追踪性", "追踪矩阵", "双向追踪", "追溯"},
    "可验证": {"可验证", "可测试", "验证", "判据", "粒度"},
    "准确清晰": {"准确", "清晰", "歧义", "明确", "二义性", "术语"},
    "图表": {"图", "图形", "流程图", "时序图", "图文"},
    "一致性": {"一致", "冲突", "矛盾"},
}
|
||||
|
||||
|
||||
def parse_analysis_markdown(path: Path | str) -> ParsedAnalysis:
    """Read an analysis Markdown report and extract what the review filler needs.

    Args:
        path: Location of the UTF-8 encoded Markdown report.

    Returns:
        A ``ParsedAnalysis`` holding the source file name and matched skills taken
        from the ``- 源文件`` / ``- 命中技能`` lines, the full report text, and the
        evidence items found after the ``## 模型分析原文`` marker (the whole text is
        scanned when the marker is absent).
    """
    report_path = Path(path)
    content = report_path.read_text(encoding="utf-8")

    source_name = _extract_front_matter_value(content, "源文件")
    skills_field = _extract_front_matter_value(content, "命中技能")
    skill_names: list[str] = []
    for raw_name in skills_field.split(","):
        name = raw_name.strip()
        # "无" is the report's placeholder for "no skill matched".
        if name and name != "无":
            skill_names.append(name)

    _, marker, tail = content.partition("## 模型分析原文")
    model_section = tail if marker else content

    return ParsedAnalysis(
        path=str(report_path),
        source_filename=source_name,
        matched_skills=skill_names,
        full_text=content,
        evidences=_extract_evidences(model_section),
    )
|
||||
|
||||
|
||||
def extract_review_tables(docx_path: Path | str) -> list[ReviewTable]:
    """Collect every review table in a DOCX together with the text preceding it.

    The document body is walked in order. The most recent non-empty paragraph is
    remembered and used as the heading of the next table.

    Args:
        docx_path: Review DOCX to inspect.

    Returns:
        The tables that ``_parse_review_table`` recognises, in document order.
        ``table_index`` counts every body-level table, recognised or not.
    """
    document = Document(docx_path)
    remaining_tables = iter(document.tables)
    paragraph_tag = qn("w:p")
    table_tag = qn("w:tbl")

    found: list[ReviewTable] = []
    last_text = ""
    position = 0
    for node in document._element.body.iterchildren():
        if node.tag == paragraph_tag:
            paragraph = _paragraph_text(node)
            if paragraph:
                last_text = paragraph
        elif node.tag == table_tag:
            review_table = _parse_review_table(next(remaining_tables), last_text, position)
            if review_table is not None:
                found.append(review_table)
            position += 1
    return found
|
||||
|
||||
|
||||
def select_review_table(analysis: ParsedAnalysis, tables: list[ReviewTable]) -> ReviewTable:
    """Pick the single review table that best fits the analysed document.

    Tables whose heading contains "A.1" are never candidates. Each remaining table
    is scored by how many type-specific terms occur in the report text.

    Args:
        analysis: Parsed analysis report.
        tables: Review tables extracted from the DOCX.

    Returns:
        The highest-scoring table; ties go to the lower ``table_index``.

    Raises:
        ValueError: If no candidate remains, or if no candidate scores above zero.
    """
    candidates = [table for table in tables if "A.1" not in table.heading]
    if not candidates:
        raise ValueError("No review tables found after skipping A.1")

    # Source file name, matched skills and the full report are searched together.
    text = f"{analysis.source_filename}\n{' '.join(analysis.matched_skills)}\n{analysis.full_text}"
    scores: list[tuple[int, ReviewTable]] = []
    for table in candidates:
        heading = table.heading
        score = 0
        if "A.2" in heading:
            score += _term_score(text, ["需求规格说明", "SRS", "需求章节", "需求可追踪性", "合格性规定"])
            # NOTE(review): indentation was lost in this view; the bonus is assumed
            # to belong to the A.2 branch - confirm against the real file.
            score += 6 if any("requirement" in skill for skill in analysis.matched_skills) else 0
        if "A.3" in heading:
            score += _term_score(text, ["设计说明", "详细设计", "软件单元", "CSCI级设计决策"])
        if "A.4" in heading:
            score += _term_score(text, ["用户手册", "安装", "操作", "用户功能", "快速参考"])
        scores.append((score, table))

    # Highest score first; ties broken by document order.
    scores.sort(key=lambda item: (-item[0], item[1].table_index))
    best_score, best_table = scores[0]
    if best_score <= 0:
        raise ValueError("Unable to select a review table from the analysis report")
    return best_table
|
||||
|
||||
|
||||
def build_review_decisions(analysis: ParsedAnalysis, table: ReviewTable) -> list[ReviewDecision]:
    """Produce one decision per criterion of *table*.

    Each criterion is matched against the report's evidence, and the matched
    evidence is turned into a verdict by ``_decide_review_result``.
    """
    return [
        _decide_review_result(criterion, _match_evidence(criterion, analysis.evidences))
        for criterion in table.criteria
    ]
|
||||
|
||||
|
||||
def build_review_decisions_for_tables(
    analysis: ParsedAnalysis,
    tables: list[ReviewTable],
) -> list[ReviewDecision]:
    """Concatenate the decisions of several review tables, keeping table order."""
    return [
        decision
        for table in tables
        for decision in build_review_decisions(analysis, table)
    ]
|
||||
|
||||
|
||||
def apply_review_decisions(
    review_docx_path: Path | str,
    decisions: list[ReviewDecision],
    output_docx_path: Path | str,
    mark: str = "✔",
) -> Path:
    """Write the decided marks into a copy of the review DOCX.

    For every criterion that has a decision, exactly one of the three option
    cells receives *mark* and the other two are cleared.

    Args:
        review_docx_path: DOCX to read.
        decisions: Decisions to apply; grouped by their criterion's table index.
        output_docx_path: Where the modified document is saved (parent
            directories are created).
        mark: Text placed in the selected option cell.

    Returns:
        The output path as a ``Path``.

    Raises:
        ValueError: If *decisions* is empty or a referenced table can no longer
            be parsed as a review table.
    """
    if not decisions:
        raise ValueError("No review decisions to apply")

    document = Document(review_docx_path)
    docx_tables = dict(enumerate(document.tables))

    decisions_per_table: dict[int, list[ReviewDecision]] = {}
    for item in decisions:
        decisions_per_table.setdefault(item.criterion.table_index, []).append(item)

    for index, items in decisions_per_table.items():
        docx_table = docx_tables[index]
        # Re-parse the table from this document instance to get its column layout.
        layout = _parse_review_table(docx_table, items[0].criterion.table_heading, index)
        if layout is None:
            raise ValueError(f"Unable to parse review table {index}")

        # When two decisions share a sequence number the later one wins.
        decision_for = {item.criterion.sequence: item for item in items}
        column_of = {
            "通过": layout.pass_col,
            "未通过": layout.fail_col,
            "不适用": layout.na_col,
        }
        option_columns = (layout.pass_col, layout.fail_col, layout.na_col)

        for criterion in layout.criteria:
            chosen_decision = decision_for.get(criterion.sequence)
            if chosen_decision is None:
                continue
            row = docx_table.rows[criterion.row_index]
            chosen_col = column_of[chosen_decision.result]
            for col in option_columns:
                row.cells[col].text = mark if col == chosen_col else ""

    destination = Path(output_docx_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    document.save(destination)
    return destination
|
||||
|
||||
|
||||
def validate_review_results(docx_path: Path | str, target_heading: str, mark: str = "✔") -> list[str]:
    """Check that every criterion row of one review table carries exactly one mark.

    Args:
        docx_path: Filled-in DOCX to inspect.
        target_heading: Text that must be contained in the table's heading.
        mark: The mark text to count in the three option cells.

    Returns:
        A list of error messages; empty when the table is valid. A single
        message is returned when no table matches *target_heading*.
    """
    target = _find_table_by_heading(extract_review_tables(docx_path), target_heading)
    if target is None:
        return [f"未找到目标审查表:{target_heading}"]

    docx_table = Document(docx_path).tables[target.table_index]
    option_columns = (target.pass_col, target.fail_col, target.na_col)

    problems: list[str] = []
    for criterion in target.criteria:
        row = docx_table.rows[criterion.row_index]
        marks = [row.cells[col].text.strip() for col in option_columns]
        if marks.count(mark) != 1:
            problems.append(f"{target.heading} 序号 {criterion.sequence} 审查结果不是有且仅有一个勾:{marks}")
    return problems
|
||||
|
||||
|
||||
def validate_review_tables(docx_path: Path | str, target_headings: list[str], mark: str = "✔") -> list[str]:
    """Run ``validate_review_results`` for each heading and merge the error lists."""
    return [
        message
        for heading in target_headings
        for message in validate_review_results(docx_path, heading, mark=mark)
    ]
|
||||
|
||||
|
||||
def fill_review_docx_from_analysis(
    analysis_markdown_path: Path | str,
    review_docx_path: Path | str,
    output_docx_path: Path | str,
    target_heading: str | None = None,
) -> ReviewFillResult:
    """Fill a review DOCX from an analysis report and verify the outcome.

    Steps: parse the report, extract the review tables, choose the target tables
    (all tables except "A.1" unless *target_heading* is given), decide every
    criterion, write the marks, then re-read the output and validate it.

    Args:
        analysis_markdown_path: Analysis report in Markdown.
        review_docx_path: Review DOCX template to fill.
        output_docx_path: Destination of the filled DOCX.
        target_heading: Optional heading text selecting a single table.

    Returns:
        A ``ReviewFillResult`` describing the inputs, output and decisions.

    Raises:
        ValueError: If no target table is found, or if validation of the written
            document reports errors (messages joined with "; ").
    """
    parsed_report = parse_analysis_markdown(analysis_markdown_path)
    selected = _target_review_tables(extract_review_tables(review_docx_path), target_heading)
    if not selected:
        raise ValueError(f"Unable to find target review table: {target_heading}")

    decisions = build_review_decisions_for_tables(parsed_report, selected)
    written_path = apply_review_decisions(review_docx_path, decisions, output_docx_path)

    headings = [table.heading for table in selected]
    problems = validate_review_tables(written_path, headings)
    if problems:
        raise ValueError("; ".join(problems))

    return ReviewFillResult(
        analysis_markdown=str(analysis_markdown_path),
        review_docx=str(review_docx_path),
        output_docx=str(written_path),
        target_heading="; ".join(headings),
        decisions=decisions,
    )
|
||||
|
||||
|
||||
def write_decisions_json(result: ReviewFillResult, output_path: Path | str) -> Path:
    """Dump *result* as pretty-printed UTF-8 JSON and return the written path.

    Parent directories are created when missing; non-ASCII text is kept as is.
    """
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(_jsonable(result), ensure_ascii=False, indent=2)
    destination.write_text(payload, encoding="utf-8")
    return destination
|
||||
|
||||
|
||||
def _parse_review_table(table: DocxTable, heading: str, table_index: int) -> ReviewTable | None:
    """Turn a DOCX table into a ``ReviewTable`` when it looks like a review table.

    Args:
        table: python-docx table to inspect.
        heading: Heading text to record on the result and its criteria.
        table_index: Index to record on the result and its criteria.

    Returns:
        The parsed table, or ``None`` when no review header is found or no row
        below the header has an all-digit sequence cell.
    """
    header = _find_review_header(table)
    if header is None:
        return None

    header_row_index, *columns = header
    sequence_col, category_col, content_col, pass_col, fail_col, na_col = columns
    highest_col = max(columns)

    criteria: list[ReviewCriterion] = []
    for row_index in range(header_row_index + 1, len(table.rows)):
        cells = table.rows[row_index].cells
        # Rows too short to hold every needed column cannot be criteria rows.
        if len(cells) <= highest_col:
            continue
        sequence = _cell_text(cells[sequence_col])
        if sequence.isdigit():
            criteria.append(
                ReviewCriterion(
                    table_heading=heading,
                    table_index=table_index,
                    row_index=row_index,
                    sequence=sequence,
                    category=_cell_text(cells[category_col]),
                    content=_cell_text(cells[content_col]),
                )
            )

    if not criteria:
        return None
    return ReviewTable(
        heading=heading,
        table_index=table_index,
        header_row_index=header_row_index,
        sequence_col=sequence_col,
        category_col=category_col,
        content_col=content_col,
        pass_col=pass_col,
        fail_col=fail_col,
        na_col=na_col,
        criteria=criteria,
    )
|
||||
|
||||
|
||||
def _find_review_header(table: DocxTable) -> tuple[int, int, int, int, int, int, int] | None:
    """Locate the header of a review table.

    A header row is one containing both "序号" and "审查内容". The option labels
    ("通过", "未通过", "不适用") may sit on that row or on one of the next two rows.

    Returns:
        ``(option_row_index, sequence_col, category_col, content_col, pass_col,
        fail_col, na_col)``, or ``None`` when the table has no such header.
        ``category_col`` falls back to 1 when there is no "审查项" cell.
    """
    option_labels = {"通过", "未通过", "不适用"}
    row_total = len(table.rows)

    for row_index, row in enumerate(table.rows):
        texts = [_cell_text(cell) for cell in row.cells]
        if not ("序号" in texts and "审查内容" in texts):
            continue
        last_candidate = min(row_index + 3, row_total)
        for option_row_index in range(row_index, last_candidate):
            option_texts = [_cell_text(cell) for cell in table.rows[option_row_index].cells]
            if not option_labels.issubset(option_texts):
                continue
            category_col = texts.index("审查项") if "审查项" in texts else 1
            return (
                option_row_index,
                texts.index("序号"),
                category_col,
                texts.index("审查内容"),
                option_texts.index("通过"),
                option_texts.index("未通过"),
                option_texts.index("不适用"),
            )
    return None
|
||||
|
||||
|
||||
def _extract_evidences(model_text: str) -> list[AnalysisEvidence]:
    """Split the model's analysis text into evidence items grouped by section.

    Lines under a ``###`` heading whose normalised title is a key of
    ``SECTION_POLARITY`` are collected. Lines under any other ``###`` heading are
    dropped. Each collected section is then cut into items by
    ``_paragraph_items``.
    """
    heading_pattern = re.compile(r"^###\s*(?:\d+[.、]\s*)?(.+?)\s*$")
    section_lines: dict[str, list[str]] = {}
    active_section = ""

    for raw_line in model_text.splitlines():
        heading = heading_pattern.match(raw_line.strip())
        if heading is None:
            if active_section:
                section_lines[active_section].append(raw_line)
            continue
        title = _normalize_section_title(heading.group(1))
        active_section = title if title in SECTION_POLARITY else ""
        if active_section:
            section_lines.setdefault(active_section, [])

    return [
        AnalysisEvidence(
            section=section,
            topic=_extract_topic(item),
            text=item,
            polarity=SECTION_POLARITY[section],
        )
        for section, lines in section_lines.items()
        for item in _paragraph_items(lines)
    ]
|
||||
|
||||
|
||||
def _paragraph_items(lines: list[str]) -> list[str]:
    """Group raw section lines into cleaned items.

    A bullet (``-``, ``*``, ``1.`` or ``1、``) starts a new item. Any other
    non-blank line continues the current item, or starts one when none is open.
    Blank lines and ``---`` rules are skipped. Items that are empty after
    Markdown clean-up are dropped.
    """
    bullet_pattern = re.compile(r"^\s*(?:[-*]|\d+[.、])\s+(.+)")
    paragraphs: list[list[str]] = []

    for raw_line in lines:
        stripped = raw_line.strip()
        if stripped in {"", "---"}:
            continue
        bullet = bullet_pattern.match(stripped)
        if bullet is not None:
            paragraphs.append([bullet.group(1)])
        elif paragraphs:
            paragraphs[-1].append(stripped)
        else:
            paragraphs.append([stripped])

    cleaned_items = (_clean_markdown(" ".join(parts)) for parts in paragraphs)
    return [item for item in cleaned_items if item]
|
||||
|
||||
|
||||
def _match_evidence(criterion: ReviewCriterion, evidences: list[AnalysisEvidence]) -> list[AnalysisEvidence]:
    """Return up to five evidence items that best match *criterion*.

    An item scores one point per criterion keyword found in its normalised
    topic+text. "准确性" criteria give extra points to manual/negative items that
    mention clarity terms, and "一致性" criteria to items that mention consistency
    terms. Items scoring below 2 are discarded. The rest are ordered by score,
    then by polarity weight.
    """
    needles = [_normalize_text(keyword) for keyword in _criterion_keywords(criterion)]
    accuracy_terms = ["歧义", "明确", "可验证", "可测试", "粒度", "清晰"]
    consistency_terms = ["一致", "冲突", "矛盾"]

    ranked: list[tuple[int, AnalysisEvidence]] = []
    for evidence in evidences:
        haystack = _normalize_text(f"{evidence.topic} {evidence.text}")
        hits = sum(needle in haystack for needle in needles)
        if criterion.category == "准确性" and evidence.polarity in {"manual", "negative"}:
            hits += sum(term in haystack for term in accuracy_terms)
        if criterion.category == "一致性":
            hits += sum(term in haystack for term in consistency_terms)
        if hits >= 2:
            ranked.append((hits, evidence))

    ranked.sort(key=lambda pair: (-pair[0], -_polarity_weight(pair[1].polarity)))
    return [evidence for _, evidence in ranked[:5]]
|
||||
|
||||
|
||||
def _decide_review_result(criterion: ReviewCriterion, evidences: list[AnalysisEvidence]) -> ReviewDecision:
    """Derive the verdict for one criterion from its matched evidence.

    Rules are tried in order: not applicable; any negative evidence fails;
    "准确性" with manual-review evidence fails; "一致性" without positive evidence
    fails; positive evidence passes; manual-review evidence fails; otherwise the
    criterion fails for lack of evidence.
    """

    def verdict(result: ReviewResult, confidence: float, reason: str) -> ReviewDecision:
        return ReviewDecision(criterion, result, confidence, reason, evidences)

    buckets: dict[str, list[AnalysisEvidence]] = {"negative": [], "manual": [], "positive": []}
    for item in evidences:
        if item.polarity in buckets:
            buckets[item.polarity].append(item)
    negative, manual, positive = buckets["negative"], buckets["manual"], buckets["positive"]
    category = criterion.category

    if _is_not_applicable(criterion):
        return verdict("不适用", 0.7, "该审查内容不适用于当前自动选择的审查单。")
    if negative:
        return verdict("未通过", 0.88, _reason_from_evidence("分析报告存在不符合或缺失证据", negative))
    if category == "准确性" and manual:
        return verdict("未通过", 0.74, _reason_from_evidence("分析报告提示需要人工复核,不能作为通过证据", manual))
    if category == "一致性" and not positive:
        return verdict("未通过", 0.62, "当前分析报告未提供足够的一致性通过证据。")
    if positive:
        return verdict("通过", 0.78, _reason_from_evidence("分析报告提供了符合证据", positive))
    if manual:
        return verdict("未通过", 0.65, _reason_from_evidence("分析报告提示需要人工复核", manual))
    return verdict("未通过", 0.5, "分析报告未提供充分通过证据。")
|
||||
|
||||
|
||||
def _criterion_keywords(criterion: ReviewCriterion) -> set[str]:
    """Build the keyword set used to match evidence against *criterion*.

    It combines the alphanumeric tokens of the criterion text, every
    ``KEYWORD_GROUPS`` group triggered by that text, and a fixed set of terms for
    the "准确性" / "一致性" categories. Keywords shorter than two characters are
    removed.
    """
    content = criterion.content
    collected: set[str] = set()
    for token in _tokenize(content):
        if re.search(r"[A-Za-z0-9]", token):
            collected.add(token)

    for group_name, members in KEYWORD_GROUPS.items():
        # The bare group name "标识" alone does not trigger its group.
        named = group_name != "标识" and group_name in content
        if named or any(member in content for member in members):
            collected |= members

    category_terms = {
        "准确性": {"准确", "清晰", "明确", "歧义", "可验证", "可测试", "粒度"},
        "一致性": {"一致", "冲突", "矛盾"},
    }
    collected |= category_terms.get(criterion.category, set())

    return {keyword for keyword in collected if len(keyword) >= 2}
|
||||
|
||||
|
||||
def _tokenize(text: str) -> list[str]:
|
||||
ascii_tokens = re.findall(r"[A-Za-z][A-Za-z0-9_-]{1,}", text)
|
||||
chinese_tokens = re.findall(r"[\u4e00-\u9fff]{2,}", text)
|
||||
return ascii_tokens + chinese_tokens
|
||||
|
||||
|
||||
def _extract_front_matter_value(text: str, label: str) -> str:
|
||||
match = re.search(rf"^- {re.escape(label)}:(.+)$", text, flags=re.MULTILINE)
|
||||
return match.group(1).strip() if match else ""
|
||||
|
||||
|
||||
def _normalize_section_title(title: str) -> str:
    """Map a heading title onto its canonical section name.

    Surrounding spaces, ``#`` and colons are stripped. The first alias in
    ``SECTION_ALIASES`` contained in the result decides the canonical name, and
    the stripped title is returned unchanged when no alias matches.
    """
    stripped = title.strip(" #::")
    return next(
        (canonical for alias, canonical in SECTION_ALIASES if alias in stripped),
        stripped,
    )
|
||||
|
||||
|
||||
def _extract_topic(text: str) -> str:
|
||||
cleaned = text.strip()
|
||||
cleaned = cleaned.lstrip("-*0123456789.、 ")
|
||||
if ":" in cleaned:
|
||||
return cleaned.split(":", 1)[0].strip()
|
||||
if ":" in cleaned:
|
||||
return cleaned.split(":", 1)[0].strip()
|
||||
return cleaned[:30]
|
||||
|
||||
|
||||
def _clean_markdown(text: str) -> str:
|
||||
cleaned = re.sub(r"`([^`]+)`", r"\1", text)
|
||||
cleaned = re.sub(r"\*\*([^*]+)\*\*", r"\1", cleaned)
|
||||
cleaned = re.sub(r"\s+", " ", cleaned)
|
||||
return cleaned.strip()
|
||||
|
||||
|
||||
def _normalize_text(text: str) -> str:
|
||||
return re.sub(r"[\s`*_#::,。,;;()()、\-]+", "", text).lower()
|
||||
|
||||
|
||||
def _paragraph_text(element) -> str:
    """Concatenate the text of every ``w:t`` descendant of *element*, stripped."""
    pieces = [node.text or "" for node in element.iter(qn("w:t"))]
    return "".join(pieces).strip()
|
||||
|
||||
|
||||
def _cell_text(cell) -> str:
|
||||
return cell.text.strip().replace("\n", "")
|
||||
|
||||
|
||||
def _term_score(text: str, terms: list[str]) -> int:
|
||||
return sum(4 for term in terms if term in text)
|
||||
|
||||
|
||||
def _polarity_weight(polarity: EvidencePolarity) -> int:
|
||||
return {"negative": 4, "manual": 3, "positive": 2, "suggestion": 1}[polarity]
|
||||
|
||||
|
||||
def _is_not_applicable(criterion: ReviewCriterion) -> bool:
|
||||
content = criterion.content
|
||||
if "用户" in criterion.table_heading and "CSCI" in content:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _reason_from_evidence(prefix: str, evidences: list[AnalysisEvidence]) -> str:
|
||||
if not evidences:
|
||||
return prefix
|
||||
text = evidences[0].text
|
||||
return f"{prefix}:{text[:160]}"
|
||||
|
||||
|
||||
def _find_table_by_heading(tables: list[ReviewTable], heading: str | None) -> ReviewTable | None:
|
||||
if not heading:
|
||||
return None
|
||||
for table in tables:
|
||||
if heading in table.heading:
|
||||
return table
|
||||
return None
|
||||
|
||||
|
||||
def _target_review_tables(tables: list[ReviewTable], target_heading: str | None) -> list[ReviewTable]:
|
||||
if target_heading:
|
||||
table = _find_table_by_heading(tables, target_heading)
|
||||
return [table] if table is not None else []
|
||||
return [table for table in tables if "A.1" not in table.heading]
|
||||
|
||||
|
||||
def _jsonable(value):
|
||||
if hasattr(value, "__dataclass_fields__"):
|
||||
return {key: _jsonable(item) for key, item in asdict(value).items()}
|
||||
if isinstance(value, list):
|
||||
return [_jsonable(item) for item in value]
|
||||
if isinstance(value, dict):
|
||||
return {key: _jsonable(item) for key, item in value.items()}
|
||||
return value
|
||||
@@ -5,6 +5,7 @@ const skillUploadStatus = document.querySelector("#skill-upload-status");
|
||||
const result = document.querySelector("#result");
|
||||
const summary = document.querySelector("#summary");
|
||||
const skills = document.querySelector("#skills");
|
||||
const reviewDocxLink = document.querySelector("#download-review-docx");
|
||||
const mdLink = document.querySelector("#download-md");
|
||||
const progressBar = document.querySelector("#analysis-progress");
|
||||
const statusText = document.querySelector("#analysis-status");
|
||||
@@ -120,6 +121,7 @@ form.addEventListener("submit", async (event) => {
|
||||
item.textContent = name;
|
||||
skills.appendChild(item);
|
||||
});
|
||||
reviewDocxLink.href = task.downloads.review_docx;
|
||||
mdLink.href = task.downloads.markdown;
|
||||
} catch (error) {
|
||||
summary.textContent = error.message;
|
||||
|
||||
@@ -97,7 +97,7 @@
|
||||
<p id="summary"></p>
|
||||
<div id="skills" class="skills"></div>
|
||||
<div class="downloads">
|
||||
<!-- <a id="download-docx" href="#">下载 DOCX 报告</a> -->
|
||||
<a id="download-review-docx" href="#">下载 DOCX 审查单</a>
|
||||
<a id="download-md" href="#">下载 Markdown 报告</a>
|
||||
</div>
|
||||
</section>
|
||||
|
||||
26
handoff-2026-05-26-16-44-34-implement-0004.md
Normal file
26
handoff-2026-05-26-16-44-34-implement-0004.md
Normal file
@@ -0,0 +1,26 @@
|
||||
# Handoff - 2026-05-26
|
||||
|
||||
## Completed Tasks
|
||||
- 完成 DOCX Open XML 底层解析与修改脚本 `scripts/docx_full_parser.py`,支持读取 DOCX 包内 XML 部件、元素、XPath、关系、图片资源,并支持文本替换和审查结果列写回。
|
||||
- 根据 `test/question_table_example.png` 的表格结构,实现审查单中“通过/未通过/不适用”三选一互斥勾选逻辑,并生成过 `test/附录A文档审查.modify.docx` 进行验证。
|
||||
- 编写 `test/PLAN.md`,设计基于分析 Markdown 自动填写 `附录A文档审查.docx` 的独立功能方案,明确跳过 A.1、区分完整性/准确性/一致性、解析证据与写回 DOCX 的流程。
|
||||
- 新增独立模块 `app/review_filler.py`,实现分析 Markdown 解析、审查表提取、审查项判定、DOCX 写回和结果校验。
|
||||
- 新增命令行入口 `scripts/fill_review_docx.py`,可独立执行审查单自动填写,不接入现有 FastAPI 主流程。
|
||||
- 新增 `tests/test_review_filler.py`,覆盖 Markdown 证据解析、A.1 跳过、审查表抽取、决策生成和 DOCX 三选一互斥写回。
|
||||
- 修正初版只填写 A.2 的遗漏问题,改为默认跳过 A.1,并填写 A.2、A.3、A.4 所有审查表;只有显式传 `--target-heading` 时才单表填写。
|
||||
- 重新生成 `test/中央处理机正常模式软件任务书V1_00_094006f6_附录A文档审查.docx` 和对应 JSON 决策明细,确认 A.2/A.3/A.4 共 70 个序号行均完成互斥勾选。
|
||||
- 完成验证:`pytest` 通过,结果为 `24 passed`;`git diff --check` 通过。
|
||||
|
||||
## Blockers
|
||||
- 当前分析 Markdown 仍是自然语言报告,不是逐审查项结构化结果;自动判定依赖关键词和证据极性规则,准确性有限。
|
||||
- 当前规则采用保守策略,缺少明确通过证据时多判为“未通过”,可能需要人工复核以避免过度严格。
|
||||
- A.3、A.4 使用同一份需求规格说明分析报告进行填写,证据并不完全匹配设计文档和用户手册审查内容;后续最好分别使用对应文档的分析报告。
|
||||
- “不适用”判定规则目前较弱,大多数无证据项会落到“未通过”,需要补充更明确的文档类型与审查项适用性规则。
|
||||
- `app/review_filler.py` 尚未接入现有 Web 系统,当前只作为独立模块和命令行工具使用。
|
||||
|
||||
## Next Steps
|
||||
- 人工抽查生成的 `test/中央处理机正常模式软件任务书V1_00_094006f6_附录A文档审查.docx`,重点复核 A.3、A.4 的审查结果是否应由对应文档分析报告重新驱动。
|
||||
- 优化分析报告生成格式,让模型直接输出结构化 JSON 或表格,包含审查单标题、序号、结果、原因和证据,降低后处理规则复杂度。
|
||||
- 为“不适用”补充清晰判定策略,例如按文档类型、审查表类型、审查内容关键词建立适用性映射。
|
||||
- 增加更多真实样本文档的集成测试,覆盖需求规格说明、设计文档、用户手册三类审查单分别填写的场景。
|
||||
- 用户确认独立模块输出无问题后,再规划接入 FastAPI:新增审查单填写接口、下载勾选后 DOCX、下载 JSON 决策明细。
|
||||
23
handoff-2026-05-26-17-14-49-integrate-0005.md
Normal file
23
handoff-2026-05-26-17-14-49-integrate-0005.md
Normal file
@@ -0,0 +1,23 @@
|
||||
# Handoff - 2026-05-26
|
||||
|
||||
## Completed Tasks
|
||||
- 昨天完成了独立模块 `app/review_filler.py` 向 FastAPI 主流程的集成:在 Markdown 分析报告生成后,自动调用审查单填充逻辑,生成已勾选的 DOCX 文档审查单。
|
||||
- 新增审查单模板路径 `REVIEW_DOCX_TEMPLATE`,当前沿用 `test/附录A文档审查.docx`,生成结果写入现有 `outputs/` 目录,并通过 `/download/{filename}` 下载。
|
||||
- 扩展分析任务返回值,在原有 `markdown` 下载项之外新增 `review_docx` 下载项,同时保留 `markdown_filename` 并新增 `review_docx_filename`。
|
||||
- 更新系统 UI,在分析结果区域新增“下载 DOCX 审查单”按钮,并在前端轮询任务完成后绑定 `task.downloads.review_docx`。
|
||||
- 补充 Web 集成测试,验证页面包含新下载入口、分析流程生成 DOCX 审查单,并校验 A.2、A.3、A.4 审查表每个序号行均满足三选一互斥勾选。
|
||||
- 完成验证:`pytest tests/test_web.py tests/test_review_filler.py` 通过;`pytest` 全量测试通过,结果为 `24 passed`;`git diff --check` 通过。
|
||||
- 启动本地服务并用真实 `/analyze` 上传流程做了运行验证,确认任务完成后返回 Markdown 和 DOCX 审查单两个下载项。
|
||||
|
||||
## Blockers
|
||||
- 当前审查单模板仍位于 `test/附录A文档审查.docx`,可运行但不够产品化;后续建议迁移到专门的模板或资源目录。
|
||||
- `app/review_filler.py` 的判定仍依赖 Markdown 自然语言报告和关键词规则,准确性受模型输出格式影响,自动勾选结果仍需要人工复核。
|
||||
- 本地启发式分析模式下没有结构化“符合项/不符合项”证据段,审查单可生成并通过互斥校验,但判定质量偏保守。
|
||||
- 默认会填写 A.2、A.3、A.4 全部审查单;如果上传文档只对应单一文档类型,后续可能需要在 Web 流程中提供目标审查表选择。
|
||||
|
||||
## Next Steps
|
||||
- 明天计划将审查单模板从 `test/` 迁移到正式资源目录,例如 `resources/templates/` 或 `app/templates/docx/`,并更新常量和测试。
|
||||
- 优化模型分析输出格式,增加结构化审查证据或审查项结果,降低 `review_filler` 对自然语言关键词匹配的依赖。
|
||||
- 在 UI 中评估是否增加“目标审查表”选择项,支持只生成 A.2、A.3 或 A.4 的审查单填写结果。
|
||||
- 增加端到端测试,覆盖 `/analyze` 提交、任务轮询、Markdown 下载和 DOCX 审查单下载的完整 HTTP 流程。
|
||||
- 继续抽查真实样本文档生成的审查单,重点确认“未通过”和“不适用”判定是否符合人工审查预期。
|
||||
643
scripts/docx_full_parser.py
Normal file
643
scripts/docx_full_parser.py
Normal file
@@ -0,0 +1,643 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import fnmatch
|
||||
import json
|
||||
import shutil
|
||||
import zipfile
|
||||
from collections.abc import Callable, Iterable
|
||||
from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import Any
|
||||
|
||||
from lxml import etree
|
||||
|
||||
|
||||
# Namespace prefix -> namespace URI for the XML vocabularies referenced by this module.
DOCX_NAMESPACES = {
    "a": "http://schemas.openxmlformats.org/drawingml/2006/main",
    "cp": "http://schemas.openxmlformats.org/package/2006/metadata/core-properties",
    "dc": "http://purl.org/dc/elements/1.1/",
    "dcterms": "http://purl.org/dc/terms/",
    "ep": "http://schemas.openxmlformats.org/officeDocument/2006/extended-properties",
    "m": "http://schemas.openxmlformats.org/officeDocument/2006/math",
    "mc": "http://schemas.openxmlformats.org/markup-compatibility/2006",
    "o": "urn:schemas-microsoft-com:office:office",
    "pkg": "http://schemas.microsoft.com/office/2006/xmlPackage",
    "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
    "rel": "http://schemas.openxmlformats.org/package/2006/relationships",
    "v": "urn:schemas-microsoft-com:vml",
    "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
    "wp": "http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing",
    "wps": "http://schemas.microsoft.com/office/word/2010/wordprocessingShape",
}
|
||||
|
||||
# Fully qualified ``{namespace}local`` tags of w:t, w:instrText, a:t and m:t.
# ``_element_text`` returns ``element.text`` directly for elements with these tags.
TEXT_TAGS = {
    f"{{{DOCX_NAMESPACES['w']}}}t",
    f"{{{DOCX_NAMESPACES['w']}}}instrText",
    f"{{{DOCX_NAMESPACES['a']}}}t",
    f"{{{DOCX_NAMESPACES['m']}}}t",
}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class DocxRelationship:
    """Immutable record for one relationship entry.

    NOTE(review): field meanings below are presumed from their names; the code
    that populates this class is outside this view - confirm.
    """

    relationship_id: str  # presumably the relationship's Id attribute
    relationship_type: str  # presumably the relationship's Type attribute
    target: str  # presumably the relationship's Target
    mode: str | None  # presumably the optional TargetMode
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class DocxPart:
    """Immutable description of one part (file) inside the DOCX package.

    NOTE(review): populated outside this view; field notes are presumed - confirm.
    """

    name: str  # part name inside the package
    content_type: str | None  # content type, or None when unknown
    size: int  # presumably the size in bytes
    is_xml: bool  # whether the part is treated as XML
    relationships: list[DocxRelationship]  # relationships attached to this part
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class DocxElement:
    """Immutable description of one XML element found in a package part.

    NOTE(review): populated outside this view; field notes are presumed - confirm.
    """

    element_id: str  # identifier assigned to the element
    part_name: str  # name of the part containing the element
    xpath: str  # XPath locating the element within its part
    tag: str  # element tag
    kind: str  # classification of the element
    text: str  # text associated with the element
    attributes: dict[str, str]  # element attributes
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class DocxImage:
    """An image part together with the relationships that point at it."""

    # ZIP member name of the image.
    part_name: str
    # Size of the stored bytes.
    size: int
    # Content type of the part, or None when it could not be resolved.
    content_type: str | None
    # "<source part>#<relationship id>" entries referencing this image.
    referenced_by: list[str]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class DocxExtraction:
    """Complete result of DocxPackage.extract()."""

    # Path of the inspected DOCX file, as a string.
    source: str
    # Every part of the package.
    parts: list[DocxPart]
    # Elements of the parts selected by the extraction patterns.
    elements: list[DocxElement]
    # Image parts found in the package.
    images: list[DocxImage]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ReviewRowUpdate:
    """Record of one review-table row rewritten by fill_review_results()."""

    # 1-based position of the table among the detected review tables.
    table_index: int
    # Text of the last non-empty body paragraph seen before the table.
    heading: str
    # Text of the row's sequence cell.
    sequence: str
    # Text of the row's review-content cell.
    review_content: str
    # Result written to the row: "通过", "未通过" or "不适用".
    result: str
|
||||
|
||||
|
||||
def _local_name(qname: str) -> str:
|
||||
if qname.startswith("{"):
|
||||
return qname.rsplit("}", 1)[1]
|
||||
return qname
|
||||
|
||||
|
||||
def _part_uri(part_name: str) -> str:
|
||||
return "/" + part_name.lstrip("/")
|
||||
|
||||
|
||||
def _relationship_part_name(source_part: str) -> str:
|
||||
if source_part == "_rels/.rels":
|
||||
return source_part
|
||||
source = Path(source_part)
|
||||
return str(source.parent / "_rels" / f"{source.name}.rels")
|
||||
|
||||
|
||||
def _resolve_relationship_target(source_part: str, target: str) -> str:
|
||||
if target.startswith("/") or "://" in target:
|
||||
return target.lstrip("/")
|
||||
if source_part == "_rels/.rels":
|
||||
return target
|
||||
base = Path(source_part).parent
|
||||
return str((base / target).as_posix())
|
||||
|
||||
|
||||
def _content_type_for(part_name: str, defaults: dict[str, str], overrides: dict[str, str]) -> str | None:
|
||||
overridden = overrides.get(_part_uri(part_name))
|
||||
if overridden:
|
||||
return overridden
|
||||
suffix = Path(part_name).suffix.lstrip(".")
|
||||
return defaults.get(suffix)
|
||||
|
||||
|
||||
def _element_text(element: etree._Element) -> str:
    """Return the text reported for *element*.

    Text-bearing tags (see TEXT_TAGS) yield their own text verbatim. Container
    tags such as paragraphs, cells and tables yield the stripped concatenation
    of all descendant text. Any other element yields its own stripped text.
    """
    if element.tag in TEXT_TAGS:
        return element.text or ""

    container_names = {"p", "tc", "tbl", "comment", "footnote", "endnote", "sdt"}
    if _local_name(element.tag) in container_names:
        return "".join(element.itertext()).strip()

    own_text = element.text
    return own_text.strip() if own_text else ""
|
||||
|
||||
|
||||
# Local tag names whose reported kind differs from the tag itself. Every tag
# that is not listed here is reported under its own local name.
_ELEMENT_KIND_ALIASES = {
    "p": "paragraph",
    "r": "run",
    "t": "text",
    "br": "break",
    "tbl": "table",
    "tr": "table_row",
    "tc": "table_cell",
    "pict": "picture",
    "sectPr": "section_properties",
    "num": "numbering",
    "abstractNum": "abstract_numbering",
}


def _element_kind(element: etree._Element) -> str:
    """Map an element's local tag name to a readable kind label."""
    local = _local_name(element.tag)
    return _ELEMENT_KIND_ALIASES.get(local, local)
|
||||
|
||||
|
||||
def _simplify_attributes(element: etree._Element) -> dict[str, str]:
    """Return the element's attributes keyed by local (namespace-free) name.

    When two attributes share a local name, the one that appears later wins.
    """
    return {_local_name(name): value for name, value in element.attrib.items()}
|
||||
|
||||
|
||||
def _w_tag(local_name: str) -> str:
    """Build the Clark-notation tag for a WordprocessingML (``w:``) element."""
    namespace = DOCX_NAMESPACES["w"]
    return "{" + namespace + "}" + local_name
|
||||
|
||||
|
||||
def _xml_text(element: etree._Element) -> str:
|
||||
return "".join(text for text in element.itertext()).strip()
|
||||
|
||||
|
||||
def _table_rows(table: etree._Element) -> list[list[etree._Element]]:
    """Return the direct ``w:tc`` cells of every direct ``w:tr`` row of *table*."""
    row_tag = _w_tag("tr")
    cell_tag = _w_tag("tc")
    rows: list[list[etree._Element]] = []
    for row in table.findall(row_tag):
        rows.append(row.findall(cell_tag))
    return rows
|
||||
|
||||
|
||||
def _set_word_cell_text(cell: etree._Element, text: str) -> None:
    """Replace the content of a table cell with a single run of *text*.

    Every child of *cell* except its first ``w:tcPr`` (cell properties) is
    removed, then one ``w:p``/``w:r``/``w:t`` chain holding *text* is appended.

    Args:
        cell: The ``w:tc`` element to rewrite in place.
        text: New cell text; may be empty.
    """
    tc_pr = cell.find(_w_tag("tcPr"))
    # Iterate over a copy: children are removed while looping.
    for child in list(cell):
        if child is not tc_pr:
            cell.remove(child)

    paragraph = etree.SubElement(cell, _w_tag("p"))
    run = etree.SubElement(paragraph, _w_tag("r"))
    text_node = etree.SubElement(run, _w_tag("t"))
    if text != text.strip():
        # Without xml:space="preserve" leading/trailing whitespace inside w:t
        # is not significant and is dropped when the document is opened.
        text_node.set("{http://www.w3.org/XML/1998/namespace}space", "preserve")
    text_node.text = text
|
||||
|
||||
|
||||
def _find_review_header(rows: list[list[etree._Element]]) -> tuple[int, int, int, int, int, int] | None:
    """Locate the header of a review table.

    A header row is a row containing both a "序号" and a "审查内容" cell. The
    option labels "通过", "未通过" and "不适用" must all appear in that row or
    in one of the two rows that follow it.

    Returns:
        ``(option_row_index, sequence_col, content_col, pass_col, fail_col,
        na_col)`` for the first match, or None when no row qualifies. The
        sequence and content columns are taken from the header row, the option
        columns from the row holding the option labels.
    """
    option_labels = ("通过", "未通过", "不适用")
    for row_index, cells in enumerate(rows):
        texts = [_xml_text(cell) for cell in cells]
        if not ("序号" in texts and "审查内容" in texts):
            continue

        window_end = min(row_index + 3, len(rows))
        for option_row_index in range(row_index, window_end):
            option_texts = [_xml_text(cell) for cell in rows[option_row_index]]
            if not all(label in option_texts for label in option_labels):
                continue
            pass_col, fail_col, na_col = (option_texts.index(label) for label in option_labels)
            return (
                option_row_index,
                texts.index("序号"),
                texts.index("审查内容"),
                pass_col,
                fail_col,
                na_col,
            )
    return None
|
||||
|
||||
|
||||
def _is_sequence(value: str) -> bool:
|
||||
return value.strip().isdigit()
|
||||
|
||||
|
||||
class DocxPackage:
    """Read, inspect, and update a DOCX file without Office/COM automation.

    A DOCX file is a ZIP package containing XML parts, relationship files, and
    binary assets. This class exposes every XML element by part name and XPath,
    while keeping non-XML parts byte-for-byte unless the caller replaces them.
    """

    # Clark-notation name of the ``xml:space`` attribute.
    _XML_SPACE = "{http://www.w3.org/XML/1998/namespace}space"

    def __init__(self, path: Path | str) -> None:
        """Load the package at *path*.

        Raises:
            FileNotFoundError: *path* does not exist.
            ValueError: *path* does not have a ``.docx`` suffix.
        """
        self.path = Path(path)
        if not self.path.exists():
            raise FileNotFoundError(self.path)
        if self.path.suffix.lower() != ".docx":
            raise ValueError(f"Expected a .docx file: {self.path}")

        self._raw_parts: dict[str, bytes] = {}
        self._xml_trees: dict[str, etree._ElementTree] = {}
        self._content_type_defaults: dict[str, str] = {}
        self._content_type_overrides: dict[str, str] = {}
        self._relationships: dict[str, list[DocxRelationship]] = {}

        self._load_package()

    @staticmethod
    def _metadata_parser() -> etree.XMLParser:
        """Return a parser for package metadata that never resolves entities."""
        return etree.XMLParser(resolve_entities=False)

    def _load_package(self) -> None:
        """Read every non-directory ZIP member into memory, then index metadata."""
        with zipfile.ZipFile(self.path) as archive:
            for info in archive.infolist():
                if info.is_dir():
                    continue
                self._raw_parts[info.filename] = archive.read(info.filename)

        self._load_content_types()
        self._load_relationships()

    def _load_content_types(self) -> None:
        """Index the Default/Override entries of ``[Content_Types].xml``."""
        data = self._raw_parts.get("[Content_Types].xml")
        if not data:
            return
        root = etree.fromstring(data, parser=self._metadata_parser())
        for child in root:
            # Comments and processing instructions have a non-string tag.
            if not isinstance(child.tag, str):
                continue
            content_type = child.attrib.get("ContentType")
            if content_type is None:
                continue  # incomplete entry: nothing to register
            local = _local_name(child.tag)
            if local == "Default":
                extension = child.attrib.get("Extension")
                if extension is not None:
                    self._content_type_defaults[extension] = content_type
            elif local == "Override":
                target_part = child.attrib.get("PartName")
                if target_part is not None:
                    self._content_type_overrides[target_part] = content_type

    def _load_relationships(self) -> None:
        """Index the relationships of every ``.rels`` part by their source part."""
        for part_name, data in self._raw_parts.items():
            if not part_name.endswith(".rels"):
                continue
            root = etree.fromstring(data, parser=self._metadata_parser())
            source_part = self._source_part_for_relationships(part_name)
            relationships: list[DocxRelationship] = []
            for child in root:
                if not isinstance(child.tag, str) or _local_name(child.tag) != "Relationship":
                    continue
                relationships.append(
                    DocxRelationship(
                        relationship_id=child.attrib.get("Id", ""),
                        relationship_type=child.attrib.get("Type", ""),
                        target=child.attrib.get("Target", ""),
                        mode=child.attrib.get("TargetMode"),
                    )
                )
            self._relationships[source_part] = relationships

    @staticmethod
    def _source_part_for_relationships(relationship_part: str) -> str:
        """Map ``dir/_rels/name.rels`` back to ``dir/name``.

        ``_rels/.rels`` and names without a ``/_rels/`` segment are returned
        unchanged.
        """
        if relationship_part == "_rels/.rels":
            return relationship_part
        marker = "/_rels/"
        if marker not in relationship_part:
            return relationship_part
        directory, filename = relationship_part.split(marker, 1)
        return f"{directory}/{filename.removesuffix('.rels')}"

    def _parse_xml_part(self, part_name: str) -> etree._ElementTree | None:
        """Return the cached tree of a part, parsing it on first access.

        Returns None when the part does not look like XML or fails to parse.

        Raises:
            KeyError: *part_name* is not part of the package.
        """
        if part_name in self._xml_trees:
            return self._xml_trees[part_name]
        data = self._raw_parts[part_name]
        if not self._looks_like_xml(part_name, data):
            return None
        parser = etree.XMLParser(resolve_entities=False, remove_blank_text=False, huge_tree=True)
        try:
            tree = etree.ElementTree(etree.fromstring(data, parser=parser))
        except etree.XMLSyntaxError:
            return None
        self._xml_trees[part_name] = tree
        return tree

    def _looks_like_xml(self, part_name: str, data: bytes) -> bool:
        """Guess from name, content type and first byte whether a part is XML."""
        content_type = self.content_type(part_name) or ""
        if part_name.endswith((".xml", ".rels")):
            return True
        return "xml" in content_type or data.lstrip().startswith(b"<")

    def content_type(self, part_name: str) -> str | None:
        """Return the content type registered for *part_name*, if any."""
        return _content_type_for(part_name, self._content_type_defaults, self._content_type_overrides)

    def parts(self) -> list[DocxPart]:
        """Describe every part of the package, sorted by part name."""
        result: list[DocxPart] = []
        for part_name in sorted(self._raw_parts):
            tree = self._parse_xml_part(part_name)
            result.append(
                DocxPart(
                    name=part_name,
                    content_type=self.content_type(part_name),
                    size=len(self._raw_parts[part_name]),
                    is_xml=tree is not None,
                    relationships=self._relationships.get(part_name, []),
                )
            )
        return result

    def iter_elements(self, part_patterns: Iterable[str] | None = None) -> Iterable[DocxElement]:
        """Yield every XML element of the parts matching *part_patterns*.

        Args:
            part_patterns: fnmatch-style patterns for part names; defaults to
                ``*.xml`` and ``*.rels``.
        """
        patterns = tuple(part_patterns or ("*.xml", "*.rels"))
        for part_name in sorted(self._raw_parts):
            if not any(fnmatch.fnmatch(part_name, pattern) for pattern in patterns):
                continue
            tree = self._parse_xml_part(part_name)
            if tree is None:
                continue
            for element in tree.iter():
                # Comments and processing instructions are not elements and
                # carry a non-string tag that cannot be reduced to a name.
                if not isinstance(element.tag, str):
                    continue
                xpath = tree.getpath(element)
                yield DocxElement(
                    element_id=f"{part_name}::{xpath}",
                    part_name=part_name,
                    xpath=xpath,
                    tag=_local_name(element.tag),
                    kind=_element_kind(element),
                    text=_element_text(element),
                    attributes=_simplify_attributes(element),
                )

    def text_blocks(self) -> list[DocxElement]:
        """Return the elements of ``word/*.xml`` parts that carry text."""
        return [element for element in self.iter_elements(("word/*.xml",)) if element.text]

    def images(self) -> list[DocxImage]:
        """List image parts together with the internal relationships targeting them."""
        references: dict[str, list[str]] = {}
        for source_part, relationships in self._relationships.items():
            for relationship in relationships:
                if relationship.mode == "External":
                    continue
                target = _resolve_relationship_target(source_part, relationship.target)
                references.setdefault(target, []).append(f"{source_part}#{relationship.relationship_id}")

        images: list[DocxImage] = []
        for part_name in sorted(self._raw_parts):
            content_type = self.content_type(part_name) or ""
            if content_type.startswith("image/") or part_name.startswith("word/media/"):
                images.append(
                    DocxImage(
                        part_name=part_name,
                        size=len(self._raw_parts[part_name]),
                        content_type=content_type or None,
                        referenced_by=references.get(part_name, []),
                    )
                )
        return images

    def extract(self, part_patterns: Iterable[str] | None = None) -> DocxExtraction:
        """Collect parts, elements (filtered by *part_patterns*) and images."""
        return DocxExtraction(
            source=str(self.path),
            parts=self.parts(),
            elements=list(self.iter_elements(part_patterns)),
            images=self.images(),
        )

    def xpath(self, part_name: str, expression: str) -> list[etree._Element]:
        """Evaluate an XPath *expression* against a part.

        The prefixes of DOCX_NAMESPACES are always available. Prefixes
        declared on the part's root element are available as well, so that
        paths produced by ``iter_elements`` resolve even when they use
        prefixes outside DOCX_NAMESPACES.

        Raises:
            ValueError: the part cannot be parsed as XML.
        """
        tree = self._parse_xml_part(part_name)
        if tree is None:
            raise ValueError(f"Part is not parseable XML: {part_name}")
        # XPath has no notion of a default namespace, hence the prefix filter.
        namespaces = {prefix: uri for prefix, uri in tree.getroot().nsmap.items() if prefix}
        namespaces.update(DOCX_NAMESPACES)
        return tree.xpath(expression, namespaces=namespaces)

    def replace_text(self, old: str, new: str, part_patterns: Iterable[str] | None = None) -> int:
        """Replace *old* with *new* in the text and tail of XML elements.

        Paragraphs in which *old* only appears across several text nodes are
        handled first: their text is merged into the first text node and the
        remaining nodes are emptied.

        Args:
            old: Text to search for; must not be empty.
            new: Replacement text.
            part_patterns: fnmatch-style patterns for part names; defaults to
                ``word/*.xml`` and ``docProps/*.xml``.

        Returns:
            Number of replaced occurrences.

        Raises:
            ValueError: *old* is empty.
        """
        if not old:
            raise ValueError("old text must not be empty")

        patterns = tuple(part_patterns or ("word/*.xml", "docProps/*.xml"))
        replacements = 0
        for part_name in sorted(self._raw_parts):
            if not any(fnmatch.fnmatch(part_name, pattern) for pattern in patterns):
                continue
            tree = self._parse_xml_part(part_name)
            if tree is None:
                continue
            rewritten_nodes: set[etree._Element] = set()
            for container in tree.xpath(".//w:p | .//a:p", namespaces=DOCX_NAMESPACES):
                text_nodes = [node for node in container.iter() if node.tag in TEXT_TAGS and node.text]
                if len(text_nodes) < 2:
                    continue
                joined = "".join(node.text or "" for node in text_nodes)
                # Only merge when the match exists solely across node borders;
                # matches inside a single node are handled by the loop below.
                if old not in joined or any(old in (node.text or "") for node in text_nodes):
                    continue
                replacements += joined.count(old)
                merged = joined.replace(old, new)
                text_nodes[0].text = merged
                if merged != merged.strip():
                    # Edge whitespace may come from a later node that carried
                    # xml:space="preserve"; keep it significant after the merge.
                    text_nodes[0].set(self._XML_SPACE, "preserve")
                for node in text_nodes[1:]:
                    node.text = ""
                rewritten_nodes.update(text_nodes)

            for element in tree.iter():
                if element in rewritten_nodes:
                    continue
                if element.text and old in element.text:
                    replacements += element.text.count(old)
                    element.text = element.text.replace(old, new)
                if element.tail and old in element.tail:
                    replacements += element.tail.count(old)
                    element.tail = element.tail.replace(old, new)
        return replacements

    def set_element_text(self, element_id: str, text: str) -> None:
        """Set the text of the element identified by ``<part name>::<xpath>``.

        Raises:
            ValueError: the id is malformed, the part is not XML, or the path
                does not select exactly one element.
        """
        try:
            part_name, xpath = element_id.split("::", 1)
        except ValueError as exc:
            raise ValueError(f"Invalid element id: {element_id}") from exc
        matches = self.xpath(part_name, xpath)
        if len(matches) != 1:
            raise ValueError(f"Expected exactly one element for {element_id}, found {len(matches)}")
        matches[0].text = text

    def update_xml(self, part_name: str, xpath: str, updater: Callable[[etree._Element], None]) -> int:
        """Call *updater* on every element selected by *xpath*; return the match count."""
        matches = self.xpath(part_name, xpath)
        for element in matches:
            updater(element)
        return len(matches)

    def replace_part(self, part_name: str, data: bytes) -> None:
        """Replace the raw bytes of an existing part and drop its cached tree.

        Raises:
            KeyError: *part_name* is not part of the package.
        """
        if part_name not in self._raw_parts:
            raise KeyError(part_name)
        self._raw_parts[part_name] = data
        self._xml_trees.pop(part_name, None)

    def fill_review_results(
        self,
        heading_contains: str | None = None,
        result: str = "通过",
        mark: str = "✔",
    ) -> list[ReviewRowUpdate]:
        """Mark one result column in every numbered row of the review tables.

        Only tables that are direct children of the document body and have a
        review header are considered. For each row whose sequence cell is
        numeric, the selected result cell receives *mark* and the other two
        result cells are cleared.

        Args:
            heading_contains: When given, only tables whose preceding
                non-empty paragraph contains this text are updated.
            result: One of "通过", "未通过", "不适用".
            mark: Text written into the selected result cell.

        Returns:
            One ReviewRowUpdate per rewritten row, in document order.

        Raises:
            ValueError: *result* is invalid or ``word/document.xml`` cannot be
                parsed.
        """
        if result not in {"通过", "未通过", "不适用"}:
            raise ValueError("result must be one of: 通过, 未通过, 不适用")

        tree = self._parse_xml_part("word/document.xml")
        if tree is None:
            raise ValueError("word/document.xml is not parseable XML")

        body = tree.getroot().find(_w_tag("body"))
        if body is None:
            return []

        updates: list[ReviewRowUpdate] = []
        current_heading = ""
        review_table_index = 0
        for child in body:
            if child.tag == _w_tag("p"):
                paragraph_text = _xml_text(child)
                if paragraph_text:
                    current_heading = paragraph_text
                continue
            if child.tag != _w_tag("tbl"):
                continue

            rows = _table_rows(child)
            header = _find_review_header(rows)
            if header is None:
                continue

            # Counted before the heading filter so indexes stay stable.
            review_table_index += 1
            if heading_contains and heading_contains not in current_heading:
                continue

            header_row_index, sequence_col, content_col, pass_col, fail_col, na_col = header
            option_columns = {
                "通过": pass_col,
                "未通过": fail_col,
                "不适用": na_col,
            }
            selected_col = option_columns[result]

            for cells in rows[header_row_index + 1 :]:
                # Skip rows too short to hold every required column.
                if max(sequence_col, content_col, pass_col, fail_col, na_col) >= len(cells):
                    continue
                sequence = _xml_text(cells[sequence_col])
                if not _is_sequence(sequence):
                    continue
                review_content = _xml_text(cells[content_col])
                for option_col in (pass_col, fail_col, na_col):
                    _set_word_cell_text(cells[option_col], mark if option_col == selected_col else "")
                updates.append(
                    ReviewRowUpdate(
                        table_index=review_table_index,
                        heading=current_heading,
                        sequence=sequence,
                        review_content=review_content,
                        result=result,
                    )
                )

        return updates

    def save(self, output_path: Path | str) -> Path:
        """Write the package to *output_path* and return that path.

        Parts that were parsed are serialised from their tree; all other parts
        are written from their raw bytes. The archive is built in a temporary
        file next to the destination and moved into place afterwards.
        """
        destination = Path(output_path)
        destination.parent.mkdir(parents=True, exist_ok=True)

        with NamedTemporaryFile(delete=False, suffix=".docx", dir=destination.parent) as tmp:
            temp_path = Path(tmp.name)

        try:
            with zipfile.ZipFile(temp_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
                for part_name in sorted(self._raw_parts):
                    if part_name in self._xml_trees:
                        data = etree.tostring(
                            self._xml_trees[part_name],
                            encoding="UTF-8",
                            xml_declaration=True,
                            standalone=None,
                        )
                    else:
                        data = self._raw_parts[part_name]
                    archive.writestr(part_name, data)
            shutil.move(str(temp_path), destination)
        finally:
            # Only still present when writing or moving failed.
            if temp_path.exists():
                temp_path.unlink()
        return destination
|
||||
|
||||
|
||||
def _to_jsonable(value: Any) -> Any:
|
||||
if hasattr(value, "__dataclass_fields__"):
|
||||
return asdict(value)
|
||||
if isinstance(value, list):
|
||||
return [_to_jsonable(item) for item in value]
|
||||
if isinstance(value, dict):
|
||||
return {key: _to_jsonable(item) for key, item in value.items()}
|
||||
return value
|
||||
|
||||
|
||||
def _command_extract(args: argparse.Namespace) -> None:
    """Handle ``extract``: print the full extraction of a DOCX file as JSON."""
    extraction = DocxPackage(args.docx).extract(args.part)
    payload = _to_jsonable(extraction)
    print(json.dumps(payload, ensure_ascii=False, indent=2))
|
||||
|
||||
|
||||
def _command_text(args: argparse.Namespace) -> None:
    """Handle ``text``: print one tab-separated line per text-bearing element."""
    for block in DocxPackage(args.docx).text_blocks():
        fields = (block.element_id, block.kind, block.text)
        print(f"{fields[0]}\t{fields[1]}\t{fields[2]}")
|
||||
|
||||
|
||||
def _command_replace(args: argparse.Namespace) -> None:
    """Handle ``replace``: replace text, save the result and report the count."""
    package = DocxPackage(args.docx)
    replaced = package.replace_text(args.old, args.new, args.part)
    package.save(args.output)
    for line in (f"replacements={replaced}", f"output={args.output}"):
        print(line)
|
||||
|
||||
|
||||
def _command_fill_review_results(args: argparse.Namespace) -> None:
    """Handle ``fill-review-results``: fill the tables, save, list updated rows."""
    package = DocxPackage(args.docx)
    updated_rows = package.fill_review_results(
        heading_contains=args.heading,
        result=args.result,
        mark=args.mark,
    )
    package.save(args.output)

    print(f"updated_rows={len(updated_rows)}")
    for row in updated_rows:
        print(f"{row.heading}\t{row.sequence}\t{row.result}\t{row.review_content}")
    print(f"output={args.output}")
|
||||
|
||||
|
||||
def build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with its four sub-commands.

    Each sub-command stores its handler in ``func``.
    """
    root = argparse.ArgumentParser(description="Extract and modify DOCX Open XML package content.")
    commands = root.add_subparsers(dest="command", required=True)
    part_help = "Glob pattern such as word/*.xml; can be repeated."

    # extract
    extract_cmd = commands.add_parser("extract", help="Print all package parts and XML elements as JSON.")
    extract_cmd.add_argument("docx", type=Path)
    extract_cmd.add_argument("--part", action="append", help=part_help)
    extract_cmd.set_defaults(func=_command_extract)

    # text
    text_cmd = commands.add_parser("text", help="Print text-bearing DOCX elements.")
    text_cmd.add_argument("docx", type=Path)
    text_cmd.set_defaults(func=_command_text)

    # replace
    replace_cmd = commands.add_parser("replace", help="Replace text in XML parts and save a new DOCX.")
    replace_cmd.add_argument("docx", type=Path)
    replace_cmd.add_argument("old")
    replace_cmd.add_argument("new")
    replace_cmd.add_argument("-o", "--output", type=Path, required=True)
    replace_cmd.add_argument("--part", action="append", help=part_help)
    replace_cmd.set_defaults(func=_command_replace)

    # fill-review-results
    fill_cmd = commands.add_parser(
        "fill-review-results",
        help="Fill mutually exclusive review-result columns in tables with 序号/审查内容/通过/未通过/不适用 headers.",
    )
    fill_cmd.add_argument("docx", type=Path)
    fill_cmd.add_argument("-o", "--output", type=Path, required=True)
    fill_cmd.add_argument("--heading", help="Only update review tables after a heading containing this text.")
    fill_cmd.add_argument("--result", choices=("通过", "未通过", "不适用"), default="通过")
    fill_cmd.add_argument("--mark", default="✔")
    fill_cmd.set_defaults(func=_command_fill_review_results)

    return root
|
||||
|
||||
|
||||
def main() -> None:
    """Parse the command line and run the handler of the chosen sub-command."""
    namespace = build_arg_parser().parse_args()
    handler = namespace.func
    handler(namespace)


if __name__ == "__main__":
    main()
|
||||
|
||||
|
||||
"""
|
||||
Command-line usage:

- python scripts/docx_full_parser.py text test/附录A文档审查.docx
- python scripts/docx_full_parser.py extract test/附录A文档审查.docx
- python scripts/docx_full_parser.py replace input.docx 原文 新文 -o output.docx
- python scripts/docx_full_parser.py fill-review-results input.docx -o output.docx --heading A.2 --result 通过
|
||||
|
||||
"""
|
||||
55
scripts/fill_review_docx.py
Normal file
55
scripts/fill_review_docx.py
Normal file
@@ -0,0 +1,55 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
ROOT_DIR = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(ROOT_DIR))
|
||||
|
||||
from app.review_filler import fill_review_docx_from_analysis, write_decisions_json
|
||||
|
||||
|
||||
def build_arg_parser() -> argparse.ArgumentParser:
    """Create the parser for the review-filling command line.

    Three path options are mandatory (analysis Markdown, review DOCX, output
    DOCX); the JSON output path and the heading filter are optional.
    """
    parser = argparse.ArgumentParser(description="Fill Appendix A DOCX review results from an analysis Markdown file.")

    required_paths = (
        ("--analysis-md", "Path to the analysis Markdown file."),
        ("--review-docx", "Path to the Appendix A review DOCX file."),
        ("--output-docx", "Path for the filled review DOCX file."),
    )
    for flag, help_text in required_paths:
        parser.add_argument(flag, type=Path, required=True, help=help_text)

    parser.add_argument("--output-json", type=Path, help="Optional path for review decision details.")
    parser.add_argument("--target-heading", help="Optional review table heading filter, such as A.2.")
    return parser
|
||||
|
||||
|
||||
def main() -> None:
    """Fill the review DOCX from the analysis file and print a summary.

    The summary lists the target heading, the number of decisions, the count
    per result value and the output paths. Decision details are written to
    JSON only when ``--output-json`` is given.
    """
    args = build_arg_parser().parse_args()
    result = fill_review_docx_from_analysis(
        analysis_markdown_path=args.analysis_md,
        review_docx_path=args.review_docx,
        output_docx_path=args.output_docx,
        target_heading=args.target_heading,
    )
    wants_json = bool(args.output_json)
    if wants_json:
        write_decisions_json(result, args.output_json)

    # Tally the decisions per result value.
    counts: dict[str, int] = {}
    for decision in result.decisions:
        counts.setdefault(decision.result, 0)
        counts[decision.result] += 1

    print(f"target_heading={result.target_heading}")
    print(f"decisions={len(result.decisions)}")
    for result_name in ("通过", "未通过", "不适用"):
        print(f"{result_name}={counts.get(result_name, 0)}")
    print(f"output_docx={result.output_docx}")
    if wants_json:
        print(f"output_json={args.output_json}")


if __name__ == "__main__":
    main()
|
||||
|
||||
"""
|
||||
Example:

python scripts/fill_review_docx.py --analysis-md test/中央处理机正常模式软件任务书V1_00_094006f6_analysis.md --review-docx test/附录A文档审查.docx --output-docx test/中央处理机正常模式软件任务书V1_00_094006f6_附录A文档审查.docx --output-json test/中央处理机正常模式软件任务书V1_00_094006f6_附录A文档审查.json
|
||||
"""
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ from pathlib import Path
|
||||
from docx import Document
|
||||
|
||||
from app.docx_parser import parse_docx
|
||||
from scripts.docx_full_parser import DocxPackage
|
||||
|
||||
|
||||
def test_parse_docx_extracts_headings_paragraphs_and_tables(tmp_path: Path) -> None:
|
||||
@@ -22,3 +23,97 @@ def test_parse_docx_extracts_headings_paragraphs_and_tables(tmp_path: Path) -> N
|
||||
assert "REQ-001" in parsed.text
|
||||
assert parsed.headings[0].text == "软件需求规格说明"
|
||||
assert parsed.tables[0][0] == ["需求编号", "REQ-001"]
|
||||
|
||||
|
||||
def test_docx_package_extracts_elements_across_parts_and_replaces_text(tmp_path: Path) -> None:
    """Extraction covers body, header, footer and table; replace_text reaches them all."""
    docx_path = tmp_path / "full.docx"
    output_path = tmp_path / "modified.docx"

    # Build a document with text in the body, the header, the footer and a table.
    document = Document()
    document.add_heading("原始标题", level=1)
    document.add_paragraph("正文原始内容")
    section = document.sections[0]
    section.header.paragraphs[0].text = "页眉原始内容"
    section.footer.paragraphs[0].text = "页脚原始内容"
    document.add_table(rows=1, cols=1).cell(0, 0).text = "表格原始内容"
    document.save(docx_path)

    package = DocxPackage(docx_path)
    extraction = package.extract()
    text = "\n".join(element.text for element in extraction.elements)

    assert "word/document.xml" in {part.name for part in extraction.parts}
    for expected in ("原始标题", "页眉原始内容", "页脚原始内容"):
        assert expected in text
    assert "table" in {element.kind for element in extraction.elements}

    replacements = package.replace_text("原始", "修改后")
    package.save(output_path)

    assert replacements >= 4
    modified = Document(output_path)
    body_text = "\n".join(paragraph.text for paragraph in modified.paragraphs)
    assert "修改后标题" in body_text
    modified_section = modified.sections[0]
    assert modified_section.header.paragraphs[0].text == "页眉修改后内容"
    assert modified_section.footer.paragraphs[0].text == "页脚修改后内容"
    assert modified.tables[0].cell(0, 0).text == "表格修改后内容"
|
||||
|
||||
|
||||
def test_docx_package_replaces_text_split_across_runs(tmp_path: Path) -> None:
    """A phrase spread over three runs is replaced as a single occurrence."""
    docx_path = tmp_path / "split.docx"
    output_path = tmp_path / "split-modified.docx"

    document = Document()
    paragraph = document.add_paragraph()
    for fragment in ("附录", "A ", "文档审查单"):
        paragraph.add_run(fragment)
    document.save(docx_path)

    package = DocxPackage(docx_path)
    replacements = package.replace_text("附录A 文档审查单", "附录A 文档检查单")
    package.save(output_path)

    reopened = Document(output_path)
    assert replacements == 1
    assert reopened.paragraphs[0].text == "附录A 文档检查单"
|
||||
|
||||
|
||||
def test_docx_package_fills_review_result_columns(tmp_path: Path) -> None:
    """fill_review_results marks the pass column and clears the other options."""
    docx_path = tmp_path / "review.docx"
    output_path = tmp_path / "review-modified.docx"

    document = Document()
    document.add_paragraph("A.3软件设计文档审查单")
    table = document.add_table(rows=5, cols=7)
    # (row, column) -> cell text, applied in this order.
    cell_texts = {
        (0, 0): "文档名称",
        (1, 0): "序号",
        (1, 1): "审查项",
        (1, 2): "审查内容",
        (1, 3): "审查结果(填√)",
        (1, 6): "备注",
        (2, 0): "序号",
        (2, 1): "审查项",
        (2, 2): "审查内容",
        (2, 3): "通过",
        (2, 4): "未通过",
        (2, 5): "不适用",
        (2, 6): "备注",
        (3, 0): "1",
        (3, 1): "完整性",
        (3, 2): "标识描述本文档所适用系统和软件的完整标识。",
        (3, 4): "旧值",
        (4, 0): "2",
        (4, 1): "完整性",
        (4, 2): "系统概述本文档适用的系统和软件的用途。",
    }
    for (row_index, column_index), value in cell_texts.items():
        table.rows[row_index].cells[column_index].text = value
    document.save(docx_path)

    package = DocxPackage(docx_path)
    updates = package.fill_review_results(heading_contains="A.3", result="通过")
    package.save(output_path)

    assert [update.sequence for update in updates] == ["1", "2"]
    assert updates[0].review_content == "标识描述本文档所适用系统和软件的完整标识。"

    modified_table = Document(output_path).tables[0]
    first_row = modified_table.rows[3].cells
    assert first_row[3].text == "✔"
    assert first_row[4].text == ""
    assert first_row[5].text == ""
    assert modified_table.rows[4].cells[3].text == "✔"
|
||||
|
||||
91
tests/test_review_filler.py
Normal file
91
tests/test_review_filler.py
Normal file
@@ -0,0 +1,91 @@
|
||||
from pathlib import Path
|
||||
|
||||
from docx import Document
|
||||
|
||||
from app.review_filler import (
|
||||
build_review_decisions,
|
||||
extract_review_tables,
|
||||
fill_review_docx_from_analysis,
|
||||
parse_analysis_markdown,
|
||||
select_review_table,
|
||||
validate_review_results,
|
||||
)
|
||||
|
||||
|
||||
# Repository root: two levels above this test module.
ROOT_DIR = Path(__file__).resolve().parent.parent
# Directory holding the sample analysis report and review form.
_FIXTURE_DIR = ROOT_DIR / "test"
ANALYSIS_MD = _FIXTURE_DIR / "中央处理机正常模式软件任务书V1_00_094006f6_analysis.md"
REVIEW_DOCX = _FIXTURE_DIR / "附录A文档审查.docx"
|
||||
|
||||
|
||||
def test_parse_analysis_markdown_extracts_evidence_sections() -> None:
    """The sample report yields all three evidence sections and polarities."""
    analysis = parse_analysis_markdown(ANALYSIS_MD)
    evidences = analysis.evidences
    sections = {item.section for item in evidences}
    polarities = {item.polarity for item in evidences}

    assert analysis.source_filename == "中央处理机正常模式软件任务书V1.00.docx"
    for expected_section in ("符合项", "不符合项", "缺失章节或缺失证据"):
        assert expected_section in sections
    assert {"positive", "negative", "manual"}.issubset(polarities)
    assert any("合格性规定" in item.text for item in evidences)
|
||||
|
||||
|
||||
def test_extract_review_tables_skips_qitao_and_selects_a2_for_requirements_analysis() -> None:
    """Three review tables are extracted and A.2 is chosen for the sample analysis."""
    analysis = parse_analysis_markdown(ANALYSIS_MD)
    tables = extract_review_tables(REVIEW_DOCX)
    selected = select_review_table(analysis, tables)

    expected_headings = [
        "A.2软件需求规格说明审查单",
        "A.3软件设计文档审查单",
        "A.4用户手册审查单",
    ]
    assert [table.heading for table in tables] == expected_headings
    assert selected.heading == "A.2软件需求规格说明审查单"
    assert len(selected.criteria) == 24
    first_criterion = selected.criteria[0]
    assert first_criterion.sequence == "1"
    assert first_criterion.category == "完整性"
|
||||
|
||||
|
||||
def test_build_review_decisions_uses_negative_evidence_for_missing_sections() -> None:
    """Criteria about missing qualification provisions are decided as not passed."""
    analysis = parse_analysis_markdown(ANALYSIS_MD)
    table = select_review_table(analysis, extract_review_tables(REVIEW_DOCX))
    decisions = build_review_decisions(analysis, table)

    assert len(decisions) == 24
    assert {decision.result for decision in decisions}.issubset({"通过", "未通过", "不适用"})

    first = decisions[0]
    assert first.criterion.sequence == "1"
    assert first.result == "未通过"
    assert any("缩略名" in evidence.text or "版本号" in evidence.text for evidence in first.evidence)

    missing_qualification = []
    for decision in decisions:
        if "合格性规定" in decision.criterion.content or "合格性" in decision.reason:
            missing_qualification.append(decision)
    assert missing_qualification
    assert all(decision.result == "未通过" for decision in missing_qualification)
|
||||
|
||||
|
||||
def test_fill_review_docx_from_analysis_writes_mutually_exclusive_results(tmp_path: Path) -> None:
    """Every numbered row of the three review tables carries exactly one mark."""
    output_docx = tmp_path / "review-filled.docx"
    result = fill_review_docx_from_analysis(ANALYSIS_MD, REVIEW_DOCX, output_docx)

    assert result.target_heading == "A.2软件需求规格说明审查单; A.3软件设计文档审查单; A.4用户手册审查单"
    assert len(result.decisions) == 70
    assert output_docx.exists()
    for heading in ("A.2", "A.3", "A.4"):
        assert validate_review_results(output_docx, heading) == []

    document = Document(output_docx)
    # Table position in the document -> expected number of numbered rows.
    expected_rows = {1: 24, 2: 18, 3: 28}
    for table_index, expected_count in expected_rows.items():
        marked_rows = 0
        for row in document.tables[table_index].rows[3:]:
            if not row.cells[0].text.strip().isdigit():
                continue
            marks = [row.cells[index].text.strip() for index in (3, 4, 5)]
            assert marks.count("✔") == 1
            marked_rows += 1
        assert marked_rows == expected_count
|
||||
@@ -6,6 +6,7 @@ from docx import Document
|
||||
|
||||
import app.main as main
|
||||
from app.main import OUTPUT_DIR, ROOT_DIR, analyze_saved_docx, app
|
||||
from app.review_filler import validate_review_results
|
||||
|
||||
|
||||
class FakeUploadFile:
|
||||
@@ -44,7 +45,9 @@ def test_index_template_contains_upload_ui() -> None:
|
||||
assert "analysis-progress" in html
|
||||
assert "analysis-status" in html
|
||||
assert "下载 Markdown 报告" in html
|
||||
assert "下载 DOCX 审查单" in html
|
||||
assert "download-md" in js
|
||||
assert "download-review-docx" in js
|
||||
assert "pollTask" in js
|
||||
assert "skill_collection" in html
|
||||
assert "skill-upload-form" in html
|
||||
@@ -150,9 +153,14 @@ def test_analyze_saved_docx_creates_downloadable_report(tmp_path: Path) -> None:
|
||||
payload = analyze_saved_docx(docx_path, provider="deepseek", use_model=False)
|
||||
|
||||
assert payload["source_filename"] == "upload.docx"
|
||||
assert "docx" not in payload["downloads"]
|
||||
assert payload["downloads"]["markdown"].endswith(".md")
|
||||
assert payload["downloads"]["review_docx"].endswith(".docx")
|
||||
assert (OUTPUT_DIR / Path(payload["downloads"]["markdown"]).name).exists()
|
||||
review_docx_path = OUTPUT_DIR / Path(payload["downloads"]["review_docx"]).name
|
||||
assert review_docx_path.exists()
|
||||
assert validate_review_results(review_docx_path, "A.2") == []
|
||||
assert validate_review_results(review_docx_path, "A.3") == []
|
||||
assert validate_review_results(review_docx_path, "A.4") == []
|
||||
|
||||
|
||||
def test_analyze_saved_docx_uses_selected_collection(tmp_path: Path) -> None:
|
||||
|
||||
Reference in New Issue
Block a user