# Source file: linux_format_docs_check/scripts/docx_full_parser.py
# (repository listing metadata: 644 lines, 23 KiB, Python)

from __future__ import annotations
import argparse
import fnmatch
import json
import shutil
import zipfile
from collections.abc import Callable, Iterable
from dataclasses import asdict, dataclass
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any
from lxml import etree
# Prefix -> namespace URI table used for XPath evaluation and for building
# Clark-notation ("{uri}local") tag names throughout this module.
DOCX_NAMESPACES = {
    "a": "http://schemas.openxmlformats.org/drawingml/2006/main",
    "cp": "http://schemas.openxmlformats.org/package/2006/metadata/core-properties",
    "dc": "http://purl.org/dc/elements/1.1/",
    "dcterms": "http://purl.org/dc/terms/",
    "ep": "http://schemas.openxmlformats.org/officeDocument/2006/extended-properties",
    "m": "http://schemas.openxmlformats.org/officeDocument/2006/math",
    "mc": "http://schemas.openxmlformats.org/markup-compatibility/2006",
    "o": "urn:schemas-microsoft-com:office:office",
    "pkg": "http://schemas.microsoft.com/office/2006/xmlPackage",
    "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
    "rel": "http://schemas.openxmlformats.org/package/2006/relationships",
    "v": "urn:schemas-microsoft-com:vml",
    "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
    "wp": "http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing",
    "wps": "http://schemas.microsoft.com/office/word/2010/wordprocessingShape",
}
# Fully qualified tags whose own ``.text`` is treated as literal document text:
# w:t, w:instrText, a:t and m:t. Text of these nodes is returned unstripped.
TEXT_TAGS = {
    f"{{{DOCX_NAMESPACES['w']}}}t",
    f"{{{DOCX_NAMESPACES['w']}}}instrText",
    f"{{{DOCX_NAMESPACES['a']}}}t",
    f"{{{DOCX_NAMESPACES['m']}}}t",
}
@dataclass(frozen=True)
class DocxRelationship:
    """One ``Relationship`` entry read from a ``.rels`` part."""

    # Value of the ``Id`` attribute ("" when the attribute is absent).
    relationship_id: str
    # Value of the ``Type`` attribute ("" when the attribute is absent).
    relationship_type: str
    # Value of the ``Target`` attribute, exactly as written in the file.
    target: str
    # Value of the ``TargetMode`` attribute, or None when it is not present.
    mode: str | None
@dataclass(frozen=True)
class DocxPart:
    """Summary of a single entry (part) stored in the DOCX ZIP package."""

    # Entry name inside the archive, e.g. "word/document.xml".
    name: str
    # Content type looked up from [Content_Types].xml, or None if unknown.
    content_type: str | None
    # Size of the part's raw bytes.
    size: int
    # True when the part could be parsed as XML.
    is_xml: bool
    # Relationships whose source is this part (empty list if none).
    relationships: list[DocxRelationship]
@dataclass(frozen=True)
class DocxElement:
    """Flat description of one XML element found in a package part."""

    # Identifier of the form "<part_name>::<xpath>".
    element_id: str
    # Name of the part that contains the element.
    part_name: str
    # Path of the element within its part.
    xpath: str
    # Local (namespace-free) tag name.
    tag: str
    # Descriptive label derived from the tag (e.g. "paragraph" for "p").
    kind: str
    # Text extracted for the element.
    text: str
    # Attributes keyed by local (namespace-free) attribute name.
    attributes: dict[str, str]
@dataclass(frozen=True)
class DocxImage:
    """An image part together with the relationships that point at it."""

    # Entry name inside the archive, e.g. "word/media/image1.png".
    part_name: str
    # Size of the image's raw bytes.
    size: int
    # Content type from [Content_Types].xml, or None if unknown.
    content_type: str | None
    # References formatted as "<source_part>#<relationship_id>".
    referenced_by: list[str]
@dataclass(frozen=True)
class DocxExtraction:
    """Complete extraction result for one DOCX file."""

    # Path of the source document, as a string.
    source: str
    # Every part in the package.
    parts: list[DocxPart]
    # XML elements from the parts selected for extraction.
    elements: list[DocxElement]
    # Image parts found in the package.
    images: list[DocxImage]
@dataclass(frozen=True)
class ReviewRowUpdate:
    """Record of one review-table row whose result columns were rewritten."""

    # Running index of the review table the row belongs to.
    table_index: int
    # Heading text associated with the table.
    heading: str
    # Text of the row's sequence-number cell.
    sequence: str
    # Text of the row's review-content cell.
    review_content: str
    # Result option that was selected for the row.
    result: str
def _local_name(qname: str) -> str:
if qname.startswith("{"):
return qname.rsplit("}", 1)[1]
return qname
def _part_uri(part_name: str) -> str:
return "/" + part_name.lstrip("/")
def _relationship_part_name(source_part: str) -> str:
if source_part == "_rels/.rels":
return source_part
source = Path(source_part)
return str(source.parent / "_rels" / f"{source.name}.rels")
def _resolve_relationship_target(source_part: str, target: str) -> str:
if target.startswith("/") or "://" in target:
return target.lstrip("/")
if source_part == "_rels/.rels":
return target
base = Path(source_part).parent
return str((base / target).as_posix())
def _content_type_for(part_name: str, defaults: dict[str, str], overrides: dict[str, str]) -> str | None:
    """Look up the content type of *part_name*.

    Args:
        part_name: Archive entry name, with or without a leading slash.
        defaults: Extension -> content type (``Default`` entries).
        overrides: Part URI (leading slash) -> content type (``Override`` entries).

    Returns:
        The override for the part if there is a non-empty one, otherwise the
        default registered for the part's extension, otherwise None.
    """
    overridden = overrides.get(_part_uri(part_name))
    if overridden:
        return overridden
    # Take the extension from the last path segment by hand: Path.suffix is ""
    # for dot-files, which would hide the "rels" default from "_rels/.rels".
    file_name = part_name.rsplit("/", 1)[-1]
    _stem, dot, extension = file_name.rpartition(".")
    if not dot:
        extension = ""
    exact = defaults.get(extension)
    if exact is not None:
        return exact
    # Fall back to a case-insensitive match so "IMAGE1.PNG" finds the "png"
    # default; an exact match above still wins when both exist.
    wanted = extension.lower()
    for known_extension, content_type in defaults.items():
        if known_extension.lower() == wanted:
            return content_type
    return None
def _element_text(element: etree._Element) -> str:
    """Return the text to report for *element*.

    * Text-run tags listed in ``TEXT_TAGS``: the element's own text, unstripped.
    * Container elements (p, tc, tbl, comment, footnote, endnote, sdt): all
      descendant text concatenated, then stripped.
    * Anything else: the element's own text, stripped.
    """
    own_text = element.text or ""
    qualified_tag = element.tag
    if qualified_tag in TEXT_TAGS:
        return own_text
    container_names = {"p", "tc", "tbl", "comment", "footnote", "endnote", "sdt"}
    if _local_name(qualified_tag) not in container_names:
        return own_text.strip()
    pieces = list(element.itertext())
    return "".join(pieces).strip()
# Local tag name -> descriptive kind label. Built once at import time because
# _element_kind is called for every element of every extracted part.
_ELEMENT_KINDS: dict[str, str] = {
    "document": "document",
    "body": "body",
    "p": "paragraph",
    "r": "run",
    "t": "text",
    "tab": "tab",
    "br": "break",
    "tbl": "table",
    "tr": "table_row",
    "tc": "table_cell",
    "drawing": "drawing",
    "pict": "picture",
    "hyperlink": "hyperlink",
    "sectPr": "section_properties",
    "header": "header",
    "footer": "footer",
    "footnote": "footnote",
    "endnote": "endnote",
    "comment": "comment",
    "style": "style",
    "num": "numbering",
    "abstractNum": "abstract_numbering",
}


def _element_kind(element: etree._Element) -> str:
    """Return a descriptive kind label for *element*.

    The label is looked up by the element's local tag name; tags without an
    entry in the table are reported under their local name unchanged.
    """
    local = _local_name(element.tag)
    return _ELEMENT_KINDS.get(local, local)
def _simplify_attributes(element: etree._Element) -> dict[str, str]:
    """Return the element's attributes keyed by local (namespace-free) name.

    If two attributes share a local name, the one that comes later in
    ``element.attrib`` wins.
    """
    return {_local_name(name): value for name, value in element.attrib.items()}
def _w_tag(local_name: str) -> str:
    """Return *local_name* qualified with the WordprocessingML (``w``) namespace."""
    namespace = DOCX_NAMESPACES["w"]
    return "{" + namespace + "}" + local_name
def _xml_text(element: etree._Element) -> str:
return "".join(text for text in element.itertext()).strip()
def _table_rows(table: etree._Element) -> list[list[etree._Element]]:
    """Return the table's cells grouped by row.

    Only ``w:tr`` elements that are direct children of *table*, and ``w:tc``
    elements that are direct children of those rows, are included.
    """
    row_tag = _w_tag("tr")
    grid: list[list[etree._Element]] = []
    for row in table.findall(row_tag):
        grid.append(row.findall(_w_tag("tc")))
    return grid
def _set_word_cell_text(cell: etree._Element, text: str) -> None:
    """Replace the whole content of a table cell with a single run of *text*.

    Every child of *cell* except its ``w:tcPr`` (cell properties) is removed,
    then one paragraph containing one run containing one ``w:t`` is appended.
    Formatting carried by the removed paragraphs and runs is not kept.

    Args:
        cell: The ``w:tc`` element to rewrite in place.
        text: The new cell text; may be empty.
    """
    tc_pr = cell.find(_w_tag("tcPr"))
    # Iterate over a snapshot because children are removed during the loop.
    for child in list(cell):
        if child is not tc_pr:
            cell.remove(child)
    paragraph = etree.SubElement(cell, _w_tag("p"))
    run = etree.SubElement(paragraph, _w_tag("r"))
    text_node = etree.SubElement(run, _w_tag("t"))
    if text != text.strip():
        # Without xml:space="preserve", Word discards leading/trailing
        # whitespace of a w:t element when it loads the document.
        text_node.set("{http://www.w3.org/XML/1998/namespace}space", "preserve")
    text_node.text = text
def _find_review_header(rows: list[list[etree._Element]]) -> tuple[int, int, int, int, int, int] | None:
    """Locate the header of a review table.

    A header row is a row with one cell whose text is exactly "序号" and one
    whose text is exactly "审查内容". The option row is the first row among
    the header row and the two rows after it that has cells reading "通过",
    "未通过" and "不适用".

    Returns:
        ``(option_row_index, sequence_col, content_col, pass_col, fail_col,
        na_col)``, where the first two column indexes come from the header row
        and the last three from the option row; or None when no row pair
        qualifies.
    """
    option_labels = ("通过", "未通过", "不适用")
    for header_index, header_cells in enumerate(rows):
        header_texts = [_xml_text(cell) for cell in header_cells]
        if not ("序号" in header_texts and "审查内容" in header_texts):
            continue
        # Options may sit on the header row itself or on one of the next two rows.
        for offset, candidate_cells in enumerate(rows[header_index : header_index + 3]):
            candidate_texts = [_xml_text(cell) for cell in candidate_cells]
            if not all(label in candidate_texts for label in option_labels):
                continue
            pass_col, fail_col, na_col = (candidate_texts.index(label) for label in option_labels)
            return (
                header_index + offset,
                header_texts.index("序号"),
                header_texts.index("审查内容"),
                pass_col,
                fail_col,
                na_col,
            )
    return None
def _is_sequence(value: str) -> bool:
return value.strip().isdigit()
class DocxPackage:
    """Read, inspect, and update a DOCX file without Office/COM automation.

    A DOCX file is a ZIP package containing XML parts, relationship files, and
    binary assets. This class exposes every XML element by part name and XPath,
    while keeping non-XML parts byte-for-byte unless the caller replaces them.

    Every part is read into memory when the package is opened. XML parts are
    parsed lazily and cached; ``save`` re-serializes each cached tree and
    writes all remaining parts from their stored bytes.
    """

    def __init__(self, path: Path | str) -> None:
        """Open and load the package at *path*.

        Raises:
            FileNotFoundError: If *path* does not exist.
            ValueError: If *path* does not have a ``.docx`` suffix.
        """
        self.path = Path(path)
        if not self.path.exists():
            raise FileNotFoundError(self.path)
        if self.path.suffix.lower() != ".docx":
            raise ValueError(f"Expected a .docx file: {self.path}")
        # Raw bytes of every archive entry, keyed by entry name.
        self._raw_parts: dict[str, bytes] = {}
        # Cache of parsed XML parts; anything in here is re-serialized on save.
        self._xml_trees: dict[str, etree._ElementTree] = {}
        self._content_type_defaults: dict[str, str] = {}
        self._content_type_overrides: dict[str, str] = {}
        # Relationships keyed by the part that owns them.
        self._relationships: dict[str, list[DocxRelationship]] = {}
        self._load_package()

    @staticmethod
    def _new_parser(*, huge_tree: bool = False) -> etree.XMLParser:
        """Return an XML parser that does not resolve entities.

        All package XML is parsed through this factory so that document
        content, content types and relationships get the same entity handling.
        """
        return etree.XMLParser(resolve_entities=False, remove_blank_text=False, huge_tree=huge_tree)

    def _load_package(self) -> None:
        """Read every non-directory archive entry, then index content types and relationships."""
        with zipfile.ZipFile(self.path) as archive:
            for info in archive.infolist():
                if info.is_dir():
                    continue
                self._raw_parts[info.filename] = archive.read(info.filename)
        self._load_content_types()
        self._load_relationships()

    def _load_content_types(self) -> None:
        """Fill the default/override content-type tables from ``[Content_Types].xml``.

        Does nothing when that part is missing or empty.
        """
        data = self._raw_parts.get("[Content_Types].xml")
        if not data:
            return
        root = etree.fromstring(data, parser=self._new_parser())
        for child in root:
            # Comments and processing instructions have a non-string tag.
            if not isinstance(child.tag, str):
                continue
            local = _local_name(child.tag)
            if local == "Default":
                self._content_type_defaults[child.attrib["Extension"]] = child.attrib["ContentType"]
            elif local == "Override":
                self._content_type_overrides[child.attrib["PartName"]] = child.attrib["ContentType"]

    def _load_relationships(self) -> None:
        """Parse every ``.rels`` part and store its entries under the owning part's name."""
        for part_name, data in self._raw_parts.items():
            if not part_name.endswith(".rels"):
                continue
            root = etree.fromstring(data, parser=self._new_parser())
            source_part = self._source_part_for_relationships(part_name)
            relationships: list[DocxRelationship] = []
            for child in root:
                # Skip comments/processing instructions (non-string tag) and
                # any element that is not a Relationship.
                if not isinstance(child.tag, str) or _local_name(child.tag) != "Relationship":
                    continue
                relationships.append(
                    DocxRelationship(
                        relationship_id=child.attrib.get("Id", ""),
                        relationship_type=child.attrib.get("Type", ""),
                        target=child.attrib.get("Target", ""),
                        mode=child.attrib.get("TargetMode"),
                    )
                )
            self._relationships[source_part] = relationships

    @staticmethod
    def _source_part_for_relationships(relationship_part: str) -> str:
        """Map a ``.rels`` part name to the part it describes.

        ``word/_rels/document.xml.rels`` becomes ``word/document.xml``. The
        package-level ``_rels/.rels`` and names without a ``/_rels/`` segment
        are returned unchanged.
        """
        if relationship_part == "_rels/.rels":
            return relationship_part
        marker = "/_rels/"
        if marker not in relationship_part:
            return relationship_part
        directory, filename = relationship_part.split(marker, 1)
        return f"{directory}/{filename.removesuffix('.rels')}"

    def _parse_xml_part(self, part_name: str) -> etree._ElementTree | None:
        """Return the cached or freshly parsed tree for *part_name*.

        Returns None when the part does not look like XML or fails to parse.

        Raises:
            KeyError: If the package has no part with that name.
        """
        if part_name in self._xml_trees:
            return self._xml_trees[part_name]
        data = self._raw_parts[part_name]
        if not self._looks_like_xml(part_name, data):
            return None
        parser = self._new_parser(huge_tree=True)
        try:
            tree = etree.ElementTree(etree.fromstring(data, parser=parser))
        except etree.XMLSyntaxError:
            return None
        self._xml_trees[part_name] = tree
        return tree

    def _looks_like_xml(self, part_name: str, data: bytes) -> bool:
        """Guess whether a part is XML from its name, content type, or first byte."""
        content_type = self.content_type(part_name) or ""
        if part_name.endswith((".xml", ".rels")):
            return True
        return "xml" in content_type or data.lstrip().startswith(b"<")

    def content_type(self, part_name: str) -> str | None:
        """Return the content type declared for *part_name*, or None."""
        return _content_type_for(part_name, self._content_type_defaults, self._content_type_overrides)

    def parts(self) -> list[DocxPart]:
        """Return a summary of every part, sorted by part name.

        Each part is parsed (and cached) to decide whether it is XML.
        """
        result: list[DocxPart] = []
        for part_name in sorted(self._raw_parts):
            tree = self._parse_xml_part(part_name)
            result.append(
                DocxPart(
                    name=part_name,
                    content_type=self.content_type(part_name),
                    size=len(self._raw_parts[part_name]),
                    is_xml=tree is not None,
                    relationships=self._relationships.get(part_name, []),
                )
            )
        return result

    def iter_elements(self, part_patterns: Iterable[str] | None = None) -> Iterable[DocxElement]:
        """Yield a ``DocxElement`` for every XML element in the matching parts.

        Args:
            part_patterns: ``fnmatch`` patterns tested against part names;
                defaults to ``*.xml`` and ``*.rels``.

        Parts are visited in sorted name order and elements in document order.
        Comments and processing instructions are skipped.
        """
        patterns = tuple(part_patterns or ("*.xml", "*.rels"))
        for part_name in sorted(self._raw_parts):
            if not any(fnmatch.fnmatch(part_name, pattern) for pattern in patterns):
                continue
            tree = self._parse_xml_part(part_name)
            if tree is None:
                continue
            for element in tree.iter():
                # Comment/PI nodes carry a non-string tag that the tag helpers
                # cannot handle; only real elements are reported.
                if not isinstance(element.tag, str):
                    continue
                xpath = tree.getpath(element)
                yield DocxElement(
                    element_id=f"{part_name}::{xpath}",
                    part_name=part_name,
                    xpath=xpath,
                    tag=_local_name(element.tag),
                    kind=_element_kind(element),
                    text=_element_text(element),
                    attributes=_simplify_attributes(element),
                )

    def text_blocks(self) -> list[DocxElement]:
        """Return the elements of ``word/*.xml`` parts that have non-empty text."""
        return [element for element in self.iter_elements(("word/*.xml",)) if element.text]

    def images(self) -> list[DocxImage]:
        """Return every image part together with the relationships referencing it.

        A part counts as an image when its content type starts with ``image/``
        or its name starts with ``word/media/``. External relationships are
        ignored when collecting references.
        """
        references: dict[str, list[str]] = {}
        for source_part, relationships in self._relationships.items():
            for relationship in relationships:
                if relationship.mode == "External":
                    continue
                target = _resolve_relationship_target(source_part, relationship.target)
                references.setdefault(target, []).append(f"{source_part}#{relationship.relationship_id}")
        images: list[DocxImage] = []
        for part_name in sorted(self._raw_parts):
            content_type = self.content_type(part_name) or ""
            if content_type.startswith("image/") or part_name.startswith("word/media/"):
                images.append(
                    DocxImage(
                        part_name=part_name,
                        size=len(self._raw_parts[part_name]),
                        content_type=content_type or None,
                        referenced_by=references.get(part_name, []),
                    )
                )
        return images

    def extract(self, part_patterns: Iterable[str] | None = None) -> DocxExtraction:
        """Return parts, elements (filtered by *part_patterns*) and images in one object."""
        return DocxExtraction(
            source=str(self.path),
            parts=self.parts(),
            elements=list(self.iter_elements(part_patterns)),
            images=self.images(),
        )

    def xpath(self, part_name: str, expression: str) -> list[etree._Element]:
        """Evaluate an XPath *expression* against the XML part *part_name*.

        Prefixes from ``DOCX_NAMESPACES`` are always available. Prefixes
        declared on the part's root element are available too, so that paths
        produced by ``iter_elements`` resolve even when they use prefixes
        missing from ``DOCX_NAMESPACES``; on a clash the module table wins.

        Raises:
            ValueError: If the part cannot be parsed as XML.
        """
        tree = self._parse_xml_part(part_name)
        if tree is None:
            raise ValueError(f"Part is not parseable XML: {part_name}")
        # XPath has no notion of a default namespace, so drop the None prefix.
        namespaces = {prefix: uri for prefix, uri in tree.getroot().nsmap.items() if prefix}
        namespaces.update(DOCX_NAMESPACES)
        return tree.xpath(expression, namespaces=namespaces)

    def replace_text(self, old: str, new: str, part_patterns: Iterable[str] | None = None) -> int:
        """Replace every occurrence of *old* with *new* in the matching XML parts.

        Args:
            old: Text to search for; must not be empty.
            new: Replacement text.
            part_patterns: ``fnmatch`` patterns tested against part names;
                defaults to ``word/*.xml`` and ``docProps/*.xml``.

        Returns:
            The number of occurrences replaced.

        Raises:
            ValueError: If *old* is empty.
        """
        if not old:
            raise ValueError("old text must not be empty")
        patterns = tuple(part_patterns or ("word/*.xml", "docProps/*.xml"))
        replacements = 0
        for part_name in sorted(self._raw_parts):
            if not any(fnmatch.fnmatch(part_name, pattern) for pattern in patterns):
                continue
            tree = self._parse_xml_part(part_name)
            if tree is None:
                continue
            rewritten_nodes: set[etree._Element] = set()
            # Pass 1: matches split across several text nodes of one paragraph.
            # The paragraph's text is merged into its first text node and the
            # other text nodes are emptied.
            for container in tree.xpath(".//w:p | .//a:p", namespaces=DOCX_NAMESPACES):
                text_nodes = [node for node in container.iter() if node.tag in TEXT_TAGS and node.text]
                if len(text_nodes) < 2:
                    continue
                joined = "".join(node.text or "" for node in text_nodes)
                # Paragraphs where some single node already contains `old` are
                # left to pass 2, which keeps the run structure intact.
                if old not in joined or any(old in (node.text or "") for node in text_nodes):
                    continue
                replacements += joined.count(old)
                text_nodes[0].text = joined.replace(old, new)
                for node in text_nodes[1:]:
                    node.text = ""
                rewritten_nodes.update(text_nodes)
            # Pass 2: matches contained in one node's text or tail. Nodes
            # rewritten in pass 1 are skipped so they are not processed twice.
            for element in tree.iter():
                if element in rewritten_nodes:
                    continue
                if element.text and old in element.text:
                    replacements += element.text.count(old)
                    element.text = element.text.replace(old, new)
                if element.tail and old in element.tail:
                    replacements += element.tail.count(old)
                    element.tail = element.tail.replace(old, new)
        return replacements

    def set_element_text(self, element_id: str, text: str) -> None:
        """Set the text of the element identified by *element_id*.

        Args:
            element_id: ``"<part_name>::<xpath>"``, as produced by ``iter_elements``.
            text: New value for the element's ``.text``.

        Raises:
            ValueError: If *element_id* has no ``::`` separator, the part is
                not XML, or the path does not match exactly one element.
        """
        try:
            part_name, xpath = element_id.split("::", 1)
        except ValueError as exc:
            raise ValueError(f"Invalid element id: {element_id}") from exc
        matches = self.xpath(part_name, xpath)
        if len(matches) != 1:
            raise ValueError(f"Expected exactly one element for {element_id}, found {len(matches)}")
        matches[0].text = text

    def update_xml(self, part_name: str, xpath: str, updater: Callable[[etree._Element], None]) -> int:
        """Call *updater* on every element matching *xpath* and return the match count."""
        matches = self.xpath(part_name, xpath)
        for element in matches:
            updater(element)
        return len(matches)

    def replace_part(self, part_name: str, data: bytes) -> None:
        """Replace the raw bytes of an existing part and drop its cached tree.

        The content-type and relationship tables built at load time are not
        refreshed by this call.

        Raises:
            KeyError: If the package has no part named *part_name*.
        """
        if part_name not in self._raw_parts:
            raise KeyError(part_name)
        self._raw_parts[part_name] = data
        self._xml_trees.pop(part_name, None)

    def fill_review_results(
        self,
        heading_contains: str | None = None,
        result: str = "通过",
        mark: str = "",
    ) -> list[ReviewRowUpdate]:
        """Mark one result column in every data row of the document's review tables.

        Only paragraphs and tables that are direct children of the document
        body are considered. The most recent non-empty paragraph before a
        table is used as that table's heading. In each row below the option
        row whose sequence cell is all digits, the three option cells are
        rewritten: the selected one gets *mark*, the other two get empty text.

        Args:
            heading_contains: When given, only tables whose heading contains
                this text are updated.
            result: Which option column to mark: "通过", "未通过" or "不适用".
            mark: Text written into the selected cell. With the default empty
                string all three option cells end up empty.

        Returns:
            One ``ReviewRowUpdate`` per rewritten row. ``table_index`` counts
            review tables from 1 and includes tables skipped by the heading filter.

        Raises:
            ValueError: If *result* is not one of the three options, or
                ``word/document.xml`` is not parseable XML.
        """
        if result not in {"通过", "未通过", "不适用"}:
            raise ValueError("result must be one of: 通过, 未通过, 不适用")
        tree = self._parse_xml_part("word/document.xml")
        if tree is None:
            raise ValueError("word/document.xml is not parseable XML")
        body = tree.getroot().find(_w_tag("body"))
        if body is None:
            return []
        updates: list[ReviewRowUpdate] = []
        current_heading = ""
        review_table_index = 0
        for child in body:
            if child.tag == _w_tag("p"):
                paragraph_text = _xml_text(child)
                if paragraph_text:
                    current_heading = paragraph_text
                continue
            if child.tag != _w_tag("tbl"):
                continue
            rows = _table_rows(child)
            header = _find_review_header(rows)
            if header is None:
                continue
            # Count the table before applying the heading filter so indexes
            # stay stable whatever filter is used.
            review_table_index += 1
            if heading_contains and heading_contains not in current_heading:
                continue
            header_row_index, sequence_col, content_col, pass_col, fail_col, na_col = header
            option_columns = {
                "通过": pass_col,
                "未通过": fail_col,
                "不适用": na_col,
            }
            selected_col = option_columns[result]
            for cells in rows[header_row_index + 1 :]:
                # Rows with fewer cells than the header expects are skipped.
                if max(sequence_col, content_col, pass_col, fail_col, na_col) >= len(cells):
                    continue
                sequence = _xml_text(cells[sequence_col])
                if not _is_sequence(sequence):
                    continue
                review_content = _xml_text(cells[content_col])
                for option_col in (pass_col, fail_col, na_col):
                    _set_word_cell_text(cells[option_col], mark if option_col == selected_col else "")
                updates.append(
                    ReviewRowUpdate(
                        table_index=review_table_index,
                        heading=current_heading,
                        sequence=sequence,
                        review_content=review_content,
                        result=result,
                    )
                )
        return updates

    def save(self, output_path: Path | str) -> Path:
        """Write the package to *output_path* and return that path.

        The archive is first written to a temporary file in the destination
        folder and then moved into place. Parts with a cached XML tree are
        serialized from the tree; all other parts are written from their
        stored bytes. Missing parent folders are created.
        """
        destination = Path(output_path)
        destination.parent.mkdir(parents=True, exist_ok=True)
        with NamedTemporaryFile(delete=False, suffix=".docx", dir=destination.parent) as tmp:
            temp_path = Path(tmp.name)
        try:
            with zipfile.ZipFile(temp_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
                for part_name in sorted(self._raw_parts):
                    if part_name in self._xml_trees:
                        tree = self._xml_trees[part_name]
                        data = etree.tostring(
                            tree,
                            encoding="UTF-8",
                            xml_declaration=True,
                            # Re-emit the standalone flag the part was read
                            # with (None omits it) instead of always dropping it.
                            standalone=tree.docinfo.standalone,
                        )
                    else:
                        data = self._raw_parts[part_name]
                    archive.writestr(part_name, data)
            shutil.move(str(temp_path), destination)
        finally:
            # Only still present when writing or moving failed.
            if temp_path.exists():
                temp_path.unlink()
        return destination
def _to_jsonable(value: Any) -> Any:
if hasattr(value, "__dataclass_fields__"):
return asdict(value)
if isinstance(value, list):
return [_to_jsonable(item) for item in value]
if isinstance(value, dict):
return {key: _to_jsonable(item) for key, item in value.items()}
return value
def _command_extract(args: argparse.Namespace) -> None:
    """Handle ``extract``: print the whole extraction of ``args.docx`` as indented JSON."""
    extraction = DocxPackage(args.docx).extract(args.part)
    payload = _to_jsonable(extraction)
    print(json.dumps(payload, ensure_ascii=False, indent=2))
def _command_text(args: argparse.Namespace) -> None:
    """Handle ``text``: print one tab-separated line (id, kind, text) per text block."""
    blocks = DocxPackage(args.docx).text_blocks()
    for item in blocks:
        print("\t".join((item.element_id, item.kind, item.text)))
def _command_replace(args: argparse.Namespace) -> None:
    """Handle ``replace``: replace text, save to ``args.output``, and report the count.

    The output file is written even when nothing was replaced.
    """
    document = DocxPackage(args.docx)
    replaced = document.replace_text(args.old, args.new, args.part)
    document.save(args.output)
    for line in (f"replacements={replaced}", f"output={args.output}"):
        print(line)
def _command_fill_review_results(args: argparse.Namespace) -> None:
    """Handle ``fill-review-results``: fill review tables, save, and list updated rows.

    Prints the number of updated rows, then one tab-separated line per row
    (heading, sequence, result, review content), then the output path.
    """
    document = DocxPackage(args.docx)
    changed_rows = document.fill_review_results(
        heading_contains=args.heading,
        result=args.result,
        mark=args.mark,
    )
    document.save(args.output)
    print(f"updated_rows={len(changed_rows)}")
    for row in changed_rows:
        print("\t".join((row.heading, row.sequence, row.result, row.review_content)))
    print(f"output={args.output}")
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with its four subcommands.

    Subcommands: ``extract``, ``text``, ``replace`` and
    ``fill-review-results``. Each stores its handler in ``func`` on the
    parsed namespace.
    """
    root = argparse.ArgumentParser(description="Extract and modify DOCX Open XML package content.")
    commands = root.add_subparsers(dest="command", required=True)
    part_help = "Glob pattern such as word/*.xml; can be repeated."

    extract_cmd = commands.add_parser("extract", help="Print all package parts and XML elements as JSON.")
    extract_cmd.add_argument("docx", type=Path)
    extract_cmd.add_argument("--part", action="append", help=part_help)
    extract_cmd.set_defaults(func=_command_extract)

    text_cmd = commands.add_parser("text", help="Print text-bearing DOCX elements.")
    text_cmd.add_argument("docx", type=Path)
    text_cmd.set_defaults(func=_command_text)

    replace_cmd = commands.add_parser("replace", help="Replace text in XML parts and save a new DOCX.")
    replace_cmd.add_argument("docx", type=Path)
    replace_cmd.add_argument("old")
    replace_cmd.add_argument("new")
    replace_cmd.add_argument("-o", "--output", type=Path, required=True)
    replace_cmd.add_argument("--part", action="append", help=part_help)
    replace_cmd.set_defaults(func=_command_replace)

    fill_cmd = commands.add_parser(
        "fill-review-results",
        help="Fill mutually exclusive review-result columns in tables with 序号/审查内容/通过/未通过/不适用 headers.",
    )
    fill_cmd.add_argument("docx", type=Path)
    fill_cmd.add_argument("-o", "--output", type=Path, required=True)
    fill_cmd.add_argument("--heading", help="Only update review tables after a heading containing this text.")
    fill_cmd.add_argument("--result", choices=("通过", "未通过", "不适用"), default="通过")
    fill_cmd.add_argument("--mark", default="")
    fill_cmd.set_defaults(func=_command_fill_review_results)

    return root
def main() -> None:
    """Parse the command line and run the handler of the chosen subcommand."""
    namespace = build_arg_parser().parse_args()
    handler = namespace.func
    handler(namespace)
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
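"""
Command-line usage:
- python scripts/docx_full_parser.py text test/附录A文档审查.docx
- python scripts/docx_full_parser.py extract test/附录A文档审查.docx
- python scripts/docx_full_parser.py replace input.docx "old text" "new text" -o output.docx
- python scripts/docx_full_parser.py fill-review-results input.docx -o output.docx --result 通过 --mark "√"
"""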