diff --git a/scripts/docx_full_parser.py b/scripts/docx_full_parser.py new file mode 100644 index 0000000..1fb2422 --- /dev/null +++ b/scripts/docx_full_parser.py @@ -0,0 +1,643 @@ +from __future__ import annotations + +import argparse +import fnmatch +import json +import shutil +import zipfile +from collections.abc import Callable, Iterable +from dataclasses import asdict, dataclass +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Any + +from lxml import etree + + +DOCX_NAMESPACES = { + "a": "http://schemas.openxmlformats.org/drawingml/2006/main", + "cp": "http://schemas.openxmlformats.org/package/2006/metadata/core-properties", + "dc": "http://purl.org/dc/elements/1.1/", + "dcterms": "http://purl.org/dc/terms/", + "ep": "http://schemas.openxmlformats.org/officeDocument/2006/extended-properties", + "m": "http://schemas.openxmlformats.org/officeDocument/2006/math", + "mc": "http://schemas.openxmlformats.org/markup-compatibility/2006", + "o": "urn:schemas-microsoft-com:office:office", + "pkg": "http://schemas.microsoft.com/office/2006/xmlPackage", + "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships", + "rel": "http://schemas.openxmlformats.org/package/2006/relationships", + "v": "urn:schemas-microsoft-com:vml", + "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main", + "wp": "http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing", + "wps": "http://schemas.microsoft.com/office/word/2010/wordprocessingShape", +} + +TEXT_TAGS = { + f"{{{DOCX_NAMESPACES['w']}}}t", + f"{{{DOCX_NAMESPACES['w']}}}instrText", + f"{{{DOCX_NAMESPACES['a']}}}t", + f"{{{DOCX_NAMESPACES['m']}}}t", +} + + +@dataclass(frozen=True) +class DocxRelationship: + relationship_id: str + relationship_type: str + target: str + mode: str | None + + +@dataclass(frozen=True) +class DocxPart: + name: str + content_type: str | None + size: int + is_xml: bool + relationships: list[DocxRelationship] + + 
@dataclass(frozen=True)
class DocxElement:
    """One XML element of a package part, flattened for reporting."""

    element_id: str  # "<part_name>::<xpath>"; accepted by DocxPackage.set_element_text
    part_name: str  # ZIP member the element lives in
    xpath: str  # absolute path of the element as reported by lxml
    tag: str  # tag with the namespace stripped
    kind: str  # readable category derived from the tag (see _element_kind)
    text: str  # text as computed by _element_text
    attributes: dict[str, str]  # attribute values keyed by namespace-less name


@dataclass(frozen=True)
class DocxImage:
    """An image-like part together with the relationships pointing at it."""

    part_name: str
    size: int  # payload size in bytes
    content_type: str | None
    referenced_by: list[str]  # entries of the form "<source part>#<relationship id>"


@dataclass(frozen=True)
class DocxExtraction:
    """Everything ``DocxPackage.extract`` collects for one document."""

    source: str  # path of the DOCX file the data came from
    parts: list[DocxPart]
    elements: list[DocxElement]
    images: list[DocxImage]


@dataclass(frozen=True)
class ReviewRowUpdate:
    """Record of one review-table row changed by ``fill_review_results``."""

    table_index: int  # 1-based index among the review tables found in the body
    heading: str  # last non-empty paragraph seen before the table
    sequence: str  # text of the row's sequence-number cell
    review_content: str  # text of the row's review-content cell
    result: str  # the option that was marked


# Local tag name -> readable element category used by _element_kind.
_KIND_BY_LOCAL_NAME = {
    "document": "document",
    "body": "body",
    "p": "paragraph",
    "r": "run",
    "t": "text",
    "tab": "tab",
    "br": "break",
    "tbl": "table",
    "tr": "table_row",
    "tc": "table_cell",
    "drawing": "drawing",
    "pict": "picture",
    "hyperlink": "hyperlink",
    "sectPr": "section_properties",
    "header": "header",
    "footer": "footer",
    "footnote": "footnote",
    "endnote": "endnote",
    "comment": "comment",
    "style": "style",
    "num": "numbering",
    "abstractNum": "abstract_numbering",
}

# Local names whose reported text is the concatenation of all descendant text.
_AGGREGATE_TEXT_NAMES = frozenset({"p", "tc", "tbl", "comment", "footnote", "endnote", "sdt"})


def _local_name(qname: str) -> str:
    """Return *qname* without its ``{namespace}`` prefix.

    lxml exposes comments, processing instructions and entities with a
    callable instead of a string as ``.tag``; such values yield ``""`` so
    callers can compare the result against element names without crashing.
    """
    if not isinstance(qname, str):
        return ""
    if qname.startswith("{"):
        return qname.rpartition("}")[2]
    return qname


def _part_uri(part_name: str) -> str:
    """Return the package URI (with exactly one leading slash) for a ZIP member name."""
    return "/" + part_name.lstrip("/")


def _relationship_part_name(source_part: str) -> str:
    """Return the name of the ``.rels`` part that belongs to *source_part*.

    The package-level ``_rels/.rels`` is returned unchanged.
    """
    if source_part == "_rels/.rels":
        return source_part
    source = Path(source_part)
    # as_posix(): ZIP member names always use "/", even when run on Windows.
    return (source.parent / "_rels" / f"{source.name}.rels").as_posix()


def _resolve_relationship_target(source_part: str, target: str) -> str:
    """Resolve a relationship ``Target`` to a package part name.

    Absolute targets and URLs only lose their leading slashes; targets of the
    package-level ``_rels/.rels`` are returned as written. Every other target
    is resolved against the directory of *source_part*, collapsing ``.`` and
    ``..`` segments so that e.g. ``../media/a.png`` seen from
    ``word/document.xml`` becomes ``media/a.png``.
    """
    import posixpath  # local import: part names are POSIX-style on every platform

    if target.startswith("/") or "://" in target:
        return target.lstrip("/")
    if source_part == "_rels/.rels":
        return target
    return posixpath.normpath(posixpath.join(posixpath.dirname(source_part), target))


def _content_type_for(part_name: str, defaults: dict[str, str], overrides: dict[str, str]) -> str | None:
    """Look up the content type of *part_name*.

    An ``Override`` for the part wins over the ``Default`` for its extension.
    Exact matches are tried first; otherwise names and extensions are compared
    case-insensitively, as OPC part names are case-insensitive.
    """
    uri = _part_uri(part_name)
    overridden = overrides.get(uri)
    if not overridden:
        folded_uri = uri.lower()
        overridden = next((value for key, value in overrides.items() if key.lower() == folded_uri), None)
    if overridden:
        return overridden

    suffix = Path(part_name).suffix.lstrip(".")
    if suffix in defaults:
        return defaults[suffix]
    folded_suffix = suffix.lower()
    return next((value for key, value in defaults.items() if key.lower() == folded_suffix), None)


def _element_text(element: etree._Element) -> str:
    """Return the text reported for *element*.

    Text-run tags keep their text verbatim, block-level containers report the
    stripped concatenation of all descendant text, anything else reports its
    own stripped ``.text``.
    """
    tag = element.tag
    if tag in TEXT_TAGS:
        return element.text or ""
    if _local_name(tag) in _AGGREGATE_TEXT_NAMES:
        return "".join(element.itertext()).strip()
    return (element.text or "").strip()


def _element_kind(element: etree._Element) -> str:
    """Map the element's local tag name to a readable kind (falls back to the name)."""
    local = _local_name(element.tag)
    return _KIND_BY_LOCAL_NAME.get(local, local)


def _simplify_attributes(element: etree._Element) -> dict[str, str]:
    """Return the element's attributes keyed by namespace-less name.

    Attributes sharing a local name across namespaces collapse to the last one.
    """
    return {_local_name(key): value for key, value in element.attrib.items()}


def _w_tag(local_name: str) -> str:
    """Return the fully qualified WordprocessingML tag for *local_name*."""
    return f"{{{DOCX_NAMESPACES['w']}}}{local_name}"


def _xml_text(element: etree._Element) -> str:
    """Return all text below *element*, concatenated and stripped."""
    return "".join(element.itertext()).strip()


def _table_rows(table: etree._Element) -> list[list[etree._Element]]:
    """Return the direct ``w:tc`` cells of every direct ``w:tr`` row of *table*."""
    return [row.findall(_w_tag("tc")) for row in table.findall(_w_tag("tr"))]


def _set_word_cell_text(cell: etree._Element, text: str) -> None:
    """Replace the content of a ``w:tc`` cell with a single run holding *text*.

    The cell properties (``w:tcPr``) are kept; every other child is removed.
    """
    tc_pr = cell.find(_w_tag("tcPr"))
    for child in list(cell):  # iterate over a copy: children are removed in the loop
        if child is not tc_pr:
            cell.remove(child)

    paragraph = etree.SubElement(cell, _w_tag("p"))
    run = etree.SubElement(paragraph, _w_tag("r"))
    text_node = etree.SubElement(run, _w_tag("t"))
    text_node.text = text
    if text != text.strip():
        # Without xml:space="preserve" consumers may drop edge whitespace.
        text_node.set("{http://www.w3.org/XML/1998/namespace}space", "preserve")


def _find_review_header(rows: list[list[etree._Element]]) -> tuple[int, int, int, int, int, int] | None:
    """Locate the header of a review table.

    A header row must contain the cells "序号" and "审查内容". The three option
    captions ("通过", "未通过", "不适用") may sit in that row or in one of the
    two rows below it.

    Returns:
        ``(option_row_index, sequence_col, content_col, pass_col, fail_col,
        na_col)``, or ``None`` when the table has no such header.
    """
    for row_index, cells in enumerate(rows):
        texts = [_xml_text(cell) for cell in cells]
        if "序号" not in texts or "审查内容" not in texts:
            continue
        last_candidate = min(row_index + 3, len(rows))
        for option_row_index in range(row_index, last_candidate):
            option_texts = [_xml_text(cell) for cell in rows[option_row_index]]
            if {"通过", "未通过", "不适用"}.issubset(option_texts):
                return (
                    option_row_index,
                    texts.index("序号"),
                    texts.index("审查内容"),
                    option_texts.index("通过"),
                    option_texts.index("未通过"),
                    option_texts.index("不适用"),
                )
    return None


def _is_sequence(value: str) -> bool:
    """Return True when *value*, ignoring surrounding whitespace, is all digits."""
    return value.strip().isdigit()


class DocxPackage:
    """Read, inspect, and update a DOCX file without Office/COM automation.

    A DOCX file is a ZIP package containing XML parts, relationship files, and
    binary assets. This class exposes every XML element by part name and XPath,
    while keeping non-XML parts byte-for-byte unless the caller replaces them.
    XML parts that were parsed (by any inspecting or editing call) are
    re-serialized on ``save``; parts never parsed are written back unchanged.
    """

    def __init__(self, path: Path | str) -> None:
        """Load the package at *path*.

        Raises:
            FileNotFoundError: *path* does not exist.
            ValueError: *path* does not end in ``.docx``.
        """
        self.path = Path(path)
        if not self.path.exists():
            raise FileNotFoundError(self.path)
        if self.path.suffix.lower() != ".docx":
            raise ValueError(f"Expected a .docx file: {self.path}")

        self._raw_parts: dict[str, bytes] = {}
        self._xml_trees: dict[str, etree._ElementTree] = {}
        self._content_type_defaults: dict[str, str] = {}
        self._content_type_overrides: dict[str, str] = {}
        self._relationships: dict[str, list[DocxRelationship]] = {}

        self._load_package()

    def _load_package(self) -> None:
        """Read every ZIP member into memory, then index content types and relationships."""
        with zipfile.ZipFile(self.path) as archive:
            for info in archive.infolist():
                if info.is_dir():
                    continue
                self._raw_parts[info.filename] = archive.read(info.filename)

        self._load_content_types()
        self._load_relationships()

    @staticmethod
    def _package_xml_parser() -> etree.XMLParser:
        """Return a parser for package metadata that does not expand entities."""
        return etree.XMLParser(resolve_entities=False)

    def _load_content_types(self) -> None:
        """Fill the Default/Override tables from ``[Content_Types].xml`` (if present)."""
        data = self._raw_parts.get("[Content_Types].xml")
        if not data:
            return
        root = etree.fromstring(data, parser=self._package_xml_parser())
        for child in root:
            local = _local_name(child.tag)
            if local == "Default":
                self._content_type_defaults[child.attrib["Extension"]] = child.attrib["ContentType"]
            elif local == "Override":
                self._content_type_overrides[child.attrib["PartName"]] = child.attrib["ContentType"]

    def _load_relationships(self) -> None:
        """Parse every ``.rels`` part and index its entries by owning part name."""
        for part_name, data in self._raw_parts.items():
            if not part_name.endswith(".rels"):
                continue
            root = etree.fromstring(data, parser=self._package_xml_parser())
            source_part = self._source_part_for_relationships(part_name)
            self._relationships[source_part] = [
                DocxRelationship(
                    relationship_id=child.attrib.get("Id", ""),
                    relationship_type=child.attrib.get("Type", ""),
                    target=child.attrib.get("Target", ""),
                    mode=child.attrib.get("TargetMode"),
                )
                for child in root
                if _local_name(child.tag) == "Relationship"
            ]

    @staticmethod
    def _source_part_for_relationships(relationship_part: str) -> str:
        """Return the part that owns *relationship_part*.

        ``word/_rels/document.xml.rels`` maps to ``word/document.xml`` and a
        root-level ``_rels/item.xml.rels`` to ``item.xml``. The package-level
        ``_rels/.rels`` and names outside any ``_rels`` folder are returned
        unchanged.
        """
        if relationship_part == "_rels/.rels":
            return relationship_part
        marker = "/_rels/"
        if marker in relationship_part:
            directory, filename = relationship_part.split(marker, 1)
            return f"{directory}/{filename.removesuffix('.rels')}"
        if relationship_part.startswith("_rels/"):
            return relationship_part[len("_rels/"):].removesuffix(".rels")
        return relationship_part

    def _parse_xml_part(self, part_name: str) -> etree._ElementTree | None:
        """Return the cached tree for *part_name*, parsing it on first use.

        Returns ``None`` when the part does not look like XML or fails to
        parse. Raises ``KeyError`` for a part that is not in the package.
        """
        cached = self._xml_trees.get(part_name)
        if cached is not None:
            return cached
        data = self._raw_parts[part_name]
        if not self._looks_like_xml(part_name, data):
            return None
        parser = etree.XMLParser(resolve_entities=False, remove_blank_text=False, huge_tree=True)
        try:
            tree = etree.ElementTree(etree.fromstring(data, parser=parser))
        except etree.XMLSyntaxError:
            return None
        self._xml_trees[part_name] = tree
        return tree

    def _looks_like_xml(self, part_name: str, data: bytes) -> bool:
        """Guess from the name, the declared content type, or the first byte."""
        if part_name.endswith((".xml", ".rels")):
            return True
        content_type = self.content_type(part_name) or ""
        return "xml" in content_type or data.lstrip().startswith(b"<")

    def _matching_part_names(self, patterns: tuple[str, ...]) -> list[str]:
        """Return the sorted part names matching at least one glob in *patterns*."""
        return [
            part_name
            for part_name in sorted(self._raw_parts)
            if any(fnmatch.fnmatch(part_name, pattern) for pattern in patterns)
        ]

    def content_type(self, part_name: str) -> str | None:
        """Return the content type declared for *part_name*, or ``None``."""
        return _content_type_for(part_name, self._content_type_defaults, self._content_type_overrides)

    def parts(self) -> list[DocxPart]:
        """Describe every part of the package, sorted by name."""
        result: list[DocxPart] = []
        for part_name in sorted(self._raw_parts):
            tree = self._parse_xml_part(part_name)
            result.append(
                DocxPart(
                    name=part_name,
                    content_type=self.content_type(part_name),
                    size=len(self._raw_parts[part_name]),
                    is_xml=tree is not None,
                    # Copy so callers cannot mutate the package's own index.
                    relationships=list(self._relationships.get(part_name, ())),
                )
            )
        return result

    def iter_elements(self, part_patterns: Iterable[str] | None = None) -> Iterable[DocxElement]:
        """Yield every XML element of the parts matching *part_patterns*.

        Args:
            part_patterns: ``fnmatch`` globs over part names; defaults to all
                ``*.xml`` and ``*.rels`` parts.
        """
        patterns = tuple(part_patterns or ("*.xml", "*.rels"))
        for part_name in self._matching_part_names(patterns):
            tree = self._parse_xml_part(part_name)
            if tree is None:
                continue
            for element in tree.iter():
                if not isinstance(element.tag, str):
                    # Comments and processing instructions are not elements.
                    continue
                xpath = tree.getpath(element)
                yield DocxElement(
                    element_id=f"{part_name}::{xpath}",
                    part_name=part_name,
                    xpath=xpath,
                    tag=_local_name(element.tag),
                    kind=_element_kind(element),
                    text=_element_text(element),
                    attributes=_simplify_attributes(element),
                )

    def text_blocks(self) -> list[DocxElement]:
        """Return the elements of ``word/*.xml`` parts that carry non-empty text."""
        return [element for element in self.iter_elements(("word/*.xml",)) if element.text]

    def images(self) -> list[DocxImage]:
        """List image parts and the internal relationships that reference them.

        A part counts as an image when its content type starts with ``image/``
        or its name starts with ``word/media/``.
        """
        references: dict[str, list[str]] = {}
        for source_part, relationships in self._relationships.items():
            for relationship in relationships:
                if relationship.mode == "External":
                    continue
                target = _resolve_relationship_target(source_part, relationship.target)
                references.setdefault(target, []).append(f"{source_part}#{relationship.relationship_id}")

        found: list[DocxImage] = []
        for part_name in sorted(self._raw_parts):
            content_type = self.content_type(part_name) or ""
            if content_type.startswith("image/") or part_name.startswith("word/media/"):
                found.append(
                    DocxImage(
                        part_name=part_name,
                        size=len(self._raw_parts[part_name]),
                        content_type=content_type or None,
                        referenced_by=references.get(part_name, []),
                    )
                )
        return found

    def extract(self, part_patterns: Iterable[str] | None = None) -> DocxExtraction:
        """Collect parts, elements (filtered by *part_patterns*) and images."""
        return DocxExtraction(
            source=str(self.path),
            parts=self.parts(),
            elements=list(self.iter_elements(part_patterns)),
            images=self.images(),
        )

    def xpath(self, part_name: str, expression: str) -> list[etree._Element]:
        """Evaluate *expression* on a part, with ``DOCX_NAMESPACES`` prefixes bound.

        Raises:
            ValueError: the part cannot be parsed as XML.
            KeyError: the part does not exist.
        """
        tree = self._parse_xml_part(part_name)
        if tree is None:
            raise ValueError(f"Part is not parseable XML: {part_name}")
        return tree.xpath(expression, namespaces=DOCX_NAMESPACES)

    def replace_text(self, old: str, new: str, part_patterns: Iterable[str] | None = None) -> int:
        """Replace *old* with *new* in the matching XML parts.

        Two passes run per part. The first handles matches that are split
        across several text nodes of one paragraph: the paragraph's text is
        joined, replaced, stored in the first text node, and the other nodes
        are emptied. The second replaces matches inside the ``text``/``tail``
        of every remaining node.

        Args:
            old: text to search for; must not be empty.
            new: replacement text.
            part_patterns: ``fnmatch`` globs over part names; defaults to
                ``word/*.xml`` and ``docProps/*.xml``.

        Returns:
            The number of occurrences replaced.

        Raises:
            ValueError: *old* is empty.
        """
        if not old:
            raise ValueError("old text must not be empty")

        patterns = tuple(part_patterns or ("word/*.xml", "docProps/*.xml"))
        replacements = 0
        for part_name in self._matching_part_names(patterns):
            tree = self._parse_xml_part(part_name)
            if tree is None:
                continue

            rewritten_nodes: set[etree._Element] = set()
            for container in tree.xpath(".//w:p | .//a:p", namespaces=DOCX_NAMESPACES):
                text_nodes = [node for node in container.iter() if node.tag in TEXT_TAGS and node.text]
                if len(text_nodes) < 2:
                    continue
                joined = "".join(node.text or "" for node in text_nodes)
                # Paragraphs where a single node already contains the match
                # are left to the per-node pass below.
                if old not in joined or any(old in (node.text or "") for node in text_nodes):
                    continue
                replacements += joined.count(old)
                text_nodes[0].text = joined.replace(old, new)
                for node in text_nodes[1:]:
                    node.text = ""
                rewritten_nodes.update(text_nodes)

            for element in tree.iter():
                if element in rewritten_nodes:
                    # Already rewritten; skipping avoids replacing inside *new*.
                    continue
                if element.text and old in element.text:
                    replacements += element.text.count(old)
                    element.text = element.text.replace(old, new)
                if element.tail and old in element.tail:
                    replacements += element.tail.count(old)
                    element.tail = element.tail.replace(old, new)
        return replacements

    def set_element_text(self, element_id: str, text: str) -> None:
        """Set ``.text`` of the element named by a ``"<part>::<xpath>"`` id.

        Raises:
            ValueError: the id is malformed or does not match exactly one element.
        """
        try:
            part_name, xpath = element_id.split("::", 1)
        except ValueError as exc:
            raise ValueError(f"Invalid element id: {element_id}") from exc
        matches = self.xpath(part_name, xpath)
        if len(matches) != 1:
            raise ValueError(f"Expected exactly one element for {element_id}, found {len(matches)}")
        matches[0].text = text

    def update_xml(self, part_name: str, xpath: str, updater: Callable[[etree._Element], None]) -> int:
        """Call *updater* on every element matching *xpath*; return the match count."""
        matches = self.xpath(part_name, xpath)
        for element in matches:
            updater(element)
        return len(matches)

    def replace_part(self, part_name: str, data: bytes) -> None:
        """Replace the raw bytes of an existing part and drop its cached tree.

        Raises:
            KeyError: the part does not exist.
        """
        if part_name not in self._raw_parts:
            raise KeyError(part_name)
        self._raw_parts[part_name] = data
        self._xml_trees.pop(part_name, None)

    def fill_review_results(
        self,
        heading_contains: str | None = None,
        result: str = "通过",
        mark: str = "✔",
    ) -> list[ReviewRowUpdate]:
        """Mark one result column in every data row of the review tables.

        Review tables are the top-level tables of ``word/document.xml`` whose
        header is recognised by ``_find_review_header``. In each row below the
        header whose sequence cell is numeric, the chosen option cell receives
        *mark* and the other two option cells are cleared.

        Args:
            heading_contains: only touch tables whose preceding non-empty
                paragraph contains this text; ``None`` selects every table.
            result: option to mark: "通过", "未通过" or "不适用".
            mark: text written into the selected cell.

        Returns:
            One ``ReviewRowUpdate`` per modified row.

        Raises:
            ValueError: *result* is not a known option, or the main document
                part cannot be parsed.
        """
        if result not in {"通过", "未通过", "不适用"}:
            raise ValueError("result must be one of: 通过, 未通过, 不适用")

        tree = self._parse_xml_part("word/document.xml")
        if tree is None:
            raise ValueError("word/document.xml is not parseable XML")

        body = tree.getroot().find(_w_tag("body"))
        if body is None:
            return []

        updates: list[ReviewRowUpdate] = []
        current_heading = ""
        review_table_index = 0
        for child in body:
            if child.tag == _w_tag("p"):
                # Any non-empty paragraph becomes the "heading" of what follows.
                paragraph_text = _xml_text(child)
                if paragraph_text:
                    current_heading = paragraph_text
                continue
            if child.tag != _w_tag("tbl"):
                continue

            rows = _table_rows(child)
            header = _find_review_header(rows)
            if header is None:
                continue

            # Counted before the heading filter so indices stay stable.
            review_table_index += 1
            if heading_contains and heading_contains not in current_heading:
                continue

            header_row_index, sequence_col, content_col, pass_col, fail_col, na_col = header
            option_columns = {
                "通过": pass_col,
                "未通过": fail_col,
                "不适用": na_col,
            }
            selected_col = option_columns[result]
            widest_col = max(sequence_col, content_col, pass_col, fail_col, na_col)

            for cells in rows[header_row_index + 1 :]:
                if widest_col >= len(cells):
                    continue  # row is too short to hold every needed column
                sequence = _xml_text(cells[sequence_col])
                if not _is_sequence(sequence):
                    continue
                review_content = _xml_text(cells[content_col])
                for option_col in (pass_col, fail_col, na_col):
                    _set_word_cell_text(cells[option_col], mark if option_col == selected_col else "")
                updates.append(
                    ReviewRowUpdate(
                        table_index=review_table_index,
                        heading=current_heading,
                        sequence=sequence,
                        review_content=review_content,
                        result=result,
                    )
                )

        return updates

    def save(self, output_path: Path | str) -> Path:
        """Write the package to *output_path* and return that path.

        The archive is built in a temporary file next to the destination and
        moved into place afterwards, so a failed write leaves no partial
        output. Parsed XML parts are re-serialized (keeping the ``standalone``
        flag of their original declaration); all other parts are copied as read.
        """
        destination = Path(output_path)
        destination.parent.mkdir(parents=True, exist_ok=True)

        with NamedTemporaryFile(delete=False, suffix=".docx", dir=destination.parent) as tmp:
            temp_path = Path(tmp.name)

        try:
            with zipfile.ZipFile(temp_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
                for part_name in sorted(self._raw_parts):
                    tree = self._xml_trees.get(part_name)
                    if tree is None:
                        data = self._raw_parts[part_name]
                    else:
                        data = etree.tostring(
                            tree,
                            encoding="UTF-8",
                            xml_declaration=True,
                            standalone=tree.docinfo.standalone,
                        )
                    archive.writestr(part_name, data)
            shutil.move(str(temp_path), destination)
        finally:
            # Only still present when writing or moving failed.
            if temp_path.exists():
                temp_path.unlink()
        return destination


def _to_jsonable(value: Any) -> Any:
    """Convert dataclass instances (also inside lists and dicts) to plain containers."""
    if hasattr(value, "__dataclass_fields__") and not isinstance(value, type):
        return asdict(value)
    if isinstance(value, list):
        return [_to_jsonable(item) for item in value]
    if isinstance(value, dict):
        return {key: _to_jsonable(item) for key, item in value.items()}
    return value


def _command_extract(args: argparse.Namespace) -> None:
    """``extract``: print parts, elements and images as JSON."""
    package = DocxPackage(args.docx)
    extraction = package.extract(args.part)
    print(json.dumps(_to_jsonable(extraction), ensure_ascii=False, indent=2))


def _command_text(args: argparse.Namespace) -> None:
    """``text``: print one tab-separated line per text-bearing element."""
    package = DocxPackage(args.docx)
    for block in package.text_blocks():
        print(f"{block.element_id}\t{block.kind}\t{block.text}")


def _command_replace(args: argparse.Namespace) -> None:
    """``replace``: replace text and save the result to ``--output``."""
    package = DocxPackage(args.docx)
    count = package.replace_text(args.old, args.new, args.part)
    package.save(args.output)
    print(f"replacements={count}")
    print(f"output={args.output}")


def _command_fill_review_results(args: argparse.Namespace) -> None:
    """``fill-review-results``: mark review tables and save to ``--output``."""
    package = DocxPackage(args.docx)
    updates = package.fill_review_results(
        heading_contains=args.heading,
        result=args.result,
        mark=args.mark,
    )
    package.save(args.output)
    print(f"updated_rows={len(updates)}")
    for update in updates:
        print(f"{update.heading}\t{update.sequence}\t{update.result}\t{update.review_content}")
    print(f"output={args.output}")


def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with one sub-command per operation."""
    parser = argparse.ArgumentParser(description="Extract and modify DOCX Open XML package content.")
    subparsers = parser.add_subparsers(dest="command", required=True)

    extract = subparsers.add_parser("extract", help="Print all package parts and XML elements as JSON.")
    extract.add_argument("docx", type=Path)
    extract.add_argument("--part", action="append", help="Glob pattern such as word/*.xml; can be repeated.")
    extract.set_defaults(func=_command_extract)

    text = subparsers.add_parser("text", help="Print text-bearing DOCX elements.")
    text.add_argument("docx", type=Path)
    text.set_defaults(func=_command_text)

    replace = subparsers.add_parser("replace", help="Replace text in XML parts and save a new DOCX.")
    replace.add_argument("docx", type=Path)
    replace.add_argument("old")
    replace.add_argument("new")
    replace.add_argument("-o", "--output", type=Path, required=True)
    replace.add_argument("--part", action="append", help="Glob pattern such as word/*.xml; can be repeated.")
    replace.set_defaults(func=_command_replace)

    fill = subparsers.add_parser(
        "fill-review-results",
        help="Fill mutually exclusive review-result columns in tables with 序号/审查内容/通过/未通过/不适用 headers.",
    )
    fill.add_argument("docx", type=Path)
    fill.add_argument("-o", "--output", type=Path, required=True)
    fill.add_argument("--heading", help="Only update review tables after a heading containing this text.")
    fill.add_argument("--result", choices=("通过", "未通过", "不适用"), default="通过")
    fill.add_argument("--mark", default="✔")
    fill.set_defaults(func=_command_fill_review_results)

    return parser


def main(argv: list[str] | None = None) -> None:
    """Parse *argv* (``sys.argv[1:]`` when ``None``) and run the chosen sub-command."""
    parser = build_arg_parser()
    args = parser.parse_args(argv)
    args.func(args)


if __name__ == "__main__":
    main()


# Command-line usage:
#   python scripts/docx_full_parser.py text test/附录A文档审查.docx
#   python scripts/docx_full_parser.py extract test/附录A文档审查.docx
#   python scripts/docx_full_parser.py replace input.docx OLD_TEXT NEW_TEXT -o output.docx