539 lines
30 KiB
Python
539 lines
30 KiB
Python
from io import BytesIO
|
||
import copy
|
||
from docx.parts.image import ImagePart
|
||
from pathlib import Path
|
||
from django.conf import settings
|
||
from django.core.files.storage import FileSystemStorage
|
||
from utils.path_utils import project_path
|
||
from ninja import File, UploadedFile
|
||
from ninja.errors import HttpError
|
||
from ninja_extra.controllers import api_controller, ControllerBase, route
|
||
from ninja_jwt.authentication import JWTAuth
|
||
from ninja_extra.permissions import IsAuthenticated
|
||
from django.db import transaction
|
||
from django.shortcuts import get_object_or_404
|
||
from django.db.models import QuerySet
|
||
from docx import Document
|
||
from docxtpl import DocxTemplate
|
||
# 工具
|
||
from apps.createSeiTaiDocument.docXmlUtils import generate_temp_doc, get_frag_from_document
|
||
from apps.createSeiTaiDocument.schema import SeitaiInputSchema
|
||
from utils.chen_response import ChenResponse
|
||
from apps.project.models import Project, Dut
|
||
from apps.createDocument.extensions.documentTime import DocTime
|
||
from utils.util import get_str_dict
|
||
from apps.createSeiTaiDocument.extensions.download_response import get_file_respone
|
||
# 图片工具docx
|
||
from apps.createSeiTaiDocument.extensions.shape_size_tool import set_shape_size
|
||
# 修改temp文本片段工具
|
||
from apps.createSeiTaiDocument.docXmlUtils import get_jinja_stdContent_element, stdContent_modify
|
||
|
||
main_download_path = Path(settings.BASE_DIR) / 'media'
|
||
|
||
# @api_controller("/create", tags=['生成产品文档接口'], auth=JWTAuth(), permissions=[IsAuthenticated])
|
||
@api_controller("/create", tags=['生成产品文档接口'])
|
||
class GenerateSeitaiController(ControllerBase):
|
||
chinese_round_name: list = ['一', '二', '三', '四', '五', '六', '七', '八', '九', '十']
|
||
|
||
def __init__(self, *args, **kwargs):
|
||
super().__init__(*args, **kwargs)
|
||
self.project_obj: Project | None = None
|
||
self.temp_context = {}
|
||
|
||
@route.post("/dgDocument", url_name="create-dgDocument")
|
||
@transaction.atomic
|
||
def create_dgDocument(self, payload: SeitaiInputSchema):
|
||
# 获取项目Model
|
||
self.project_obj = get_object_or_404(Project, id=payload.id)
|
||
# 生成大纲需要的文本片段信息储存在字典里面
|
||
sec_title = get_str_dict(self.project_obj.secret, 'secret')
|
||
duty_person = self.project_obj.duty_person
|
||
is_jd = True if self.project_obj.report_type == '9' else False
|
||
self.temp_context = {
|
||
'is_jd': is_jd,
|
||
'jd_or_third': "鉴定" if is_jd else "第三方",
|
||
'project_ident': self.project_obj.ident,
|
||
'project_name': self.project_obj.name,
|
||
'test_purpose': "装备鉴定和列装定型" if is_jd else "软件交付和使用",
|
||
'sec_title': '密级:' + sec_title,
|
||
'sec': sec_title,
|
||
'duty_person': duty_person,
|
||
'member': self.project_obj.member[0] if len(
|
||
self.project_obj.member) > 0 else duty_person,
|
||
'entrust_unit': self.project_obj.entrust_unit
|
||
} | DocTime(payload.id).dg_final_time() # python3.9以上推荐使用|运算符合并
|
||
# 调用self添加temp_context信息
|
||
self.get_first_round_code_ident()
|
||
self.get_xq_doc_informations()
|
||
result = generate_temp_doc('dg', payload.id, frag_list=payload.frag)
|
||
if isinstance(result, dict):
|
||
return ChenResponse(status=400, code=400,
|
||
message=result.get('msg', 'dg未报出错误原因,反正在生成文档出错'))
|
||
dg_replace_path, dg_seitai_final_path = result
|
||
# ~~~~start:2025/04/19-新增渲染单个字段(可能封装为函数-对temp文件下的jinja字段处理)~~~~
|
||
# 现在已经把alias和stdContent对应起来了
|
||
text_frag_name_list, doc_docx = get_jinja_stdContent_element(dg_replace_path)
|
||
# 遍历找出来的文本片段进行修改
|
||
self.text_frag_replace_handle(text_frag_name_list, doc_docx)
|
||
# ~~~~end~~~~
|
||
try:
|
||
doc_docx.save(dg_seitai_final_path)
|
||
# 文件下载
|
||
return get_file_respone(payload.id, '测评大纲')
|
||
except PermissionError as e:
|
||
return ChenResponse(status=400, code=400, message="文档未生成或生成错误!,{0}".format(e))
|
||
|
||
@route.post('/smDocument', url_name='create-smDocument')
|
||
@transaction.atomic
|
||
def create_smDocument(self, payload: SeitaiInputSchema):
|
||
"""生成最后说明文档"""
|
||
# 获取项目对象
|
||
self.project_obj = get_object_or_404(Project, id=payload.id)
|
||
# 首先第二层模版所需变量
|
||
is_jd = True if self.project_obj.report_type == '9' else False
|
||
self.temp_context = {
|
||
'project_name': self.project_obj.name,
|
||
'project_ident': self.project_obj.ident,
|
||
'is_jd': is_jd,
|
||
'jd_or_third': "鉴定" if is_jd else "第三方",
|
||
'ident': self.project_obj.ident,
|
||
'sec_title': get_str_dict(self.project_obj.secret, 'secret'),
|
||
'sec': get_str_dict(self.project_obj.secret, 'secret'),
|
||
'duty_person': self.project_obj.duty_person,
|
||
'member': self.project_obj.member[0] if len(
|
||
self.project_obj.member) > 0 else self.project_obj.duty_person,
|
||
} | DocTime(payload.id).sm_final_time()
|
||
self.get_first_round_code_ident()
|
||
# 文档片段操作
|
||
result = generate_temp_doc('sm', payload.id, frag_list=payload.frag)
|
||
if isinstance(result, dict):
|
||
return ChenResponse(code=400, status=400, message=result.get('msg', '无错误原因'))
|
||
sm_to_tpl_file, sm_seitai_final_file = result
|
||
|
||
# 文本片段操作
|
||
text_frag_name_list, doc_docx = get_jinja_stdContent_element(sm_to_tpl_file)
|
||
self.text_frag_replace_handle(text_frag_name_list, doc_docx)
|
||
# 注册时间变量
|
||
try:
|
||
doc_docx.save(sm_seitai_final_file)
|
||
return get_file_respone(payload.id, '测试说明')
|
||
except PermissionError as e:
|
||
return ChenResponse(status=400, code=400, message="模版文件已打开,请关闭后再试,{0}".format(e))
|
||
|
||
@route.post('/jlDocument', url_name='create-jlDocument')
|
||
@transaction.atomic
|
||
def create_jlDocument(self, payload: SeitaiInputSchema):
|
||
self.project_obj = get_object_or_404(Project, id=payload.id)
|
||
# seitai文档所需变量
|
||
is_jd = True if self.project_obj.report_type == '9' else False
|
||
member = self.project_obj.member[0] if len(
|
||
self.project_obj.member) > 0 else self.project_obj.duty_person
|
||
self.temp_context = {
|
||
'project_name': self.project_obj.name,
|
||
'project_ident': self.project_obj.ident,
|
||
'is_jd': is_jd,
|
||
'name': self.project_obj.name,
|
||
'ident': self.project_obj.ident,
|
||
'sec_title': get_str_dict(self.project_obj.secret, 'secret'),
|
||
'duty_person': self.project_obj.duty_person, 'member': member
|
||
} | DocTime(payload.id).jl_final_time()
|
||
self.get_xq_doc_informations() # 添加文本片段“xq_version”
|
||
result = generate_temp_doc('jl', payload.id, frag_list=payload.frag)
|
||
if isinstance(result, dict):
|
||
return ChenResponse(code=400, status=400, message=result.get('msg', '无错误原因'))
|
||
jl_to_tpl_file, jl_seitai_final_file = result
|
||
text_frag_name_list, doc_docx = get_jinja_stdContent_element(jl_to_tpl_file)
|
||
# 文本片段操作
|
||
self.text_frag_replace_handle(text_frag_name_list, doc_docx)
|
||
# 重新设置时序图大小
|
||
for shape in doc_docx.inline_shapes:
|
||
set_shape_size(shape)
|
||
try:
|
||
doc_docx.save(jl_seitai_final_file)
|
||
return get_file_respone(payload.id, '测试记录')
|
||
except PermissionError as e:
|
||
return ChenResponse(status=400, code=400, message="模版文件已打开,请关闭后再试,{0}".format(e))
|
||
|
||
@route.post('/hsmDocument', url_name='create-hsmDocument')
|
||
@transaction.atomic
|
||
def create_hsmDocument(self, payload: SeitaiInputSchema):
|
||
"""生成最后的回归测试说明-(多个文档)"""
|
||
self.project_obj = get_object_or_404(Project, id=payload.id)
|
||
hround_list: QuerySet = self.project_obj.pField.exclude(key='0') # 非第一轮次
|
||
cname_list = []
|
||
if len(hround_list) < 1:
|
||
return ChenResponse(code=400, status=400, message='无回归轮次,请添加后再生成')
|
||
for hround in hround_list:
|
||
# 获取当前轮次中文数字
|
||
cname = self.chinese_round_name[int(hround.key)]
|
||
# 将cname存入一个list,以便后续拼接给下载函数
|
||
cname_list.append(cname)
|
||
is_jd = True if self.project_obj.report_type == '9' else False
|
||
member = self.project_obj.member[0] if len(
|
||
self.project_obj.member) > 0 else self.project_obj.duty_person
|
||
# 回归轮次的标识和版本
|
||
so_dut: Dut = hround.rdField.filter(type='SO').first()
|
||
if not so_dut:
|
||
return ChenResponse(status=400, code=400, message=f'您缺少第{cname}轮的源代码被测件')
|
||
# 每次循环会更新temp_context
|
||
self.temp_context = {
|
||
'project_name': self.project_obj.name,
|
||
'project_ident': self.project_obj.ident,
|
||
'is_jd': is_jd,
|
||
'sec_title': get_str_dict(self.project_obj.secret, 'secret'),
|
||
'duty_person': self.project_obj.duty_person,
|
||
'member': member,
|
||
'round_num': str(int(hround.key) + 1),
|
||
'round_num_chn': cname,
|
||
'soft_ident': so_dut.ref,
|
||
'soft_version': so_dut.version,
|
||
'location': hround.location,
|
||
} | DocTime(payload.id).hsm_final_time(hround.key)
|
||
# 注意回归测试说明、回归测试记录都生成多个文档
|
||
result = generate_temp_doc('hsm', payload.id, round_num=cname, frag_list=payload.frag)
|
||
if isinstance(result, dict):
|
||
return ChenResponse(status=400, code=400,
|
||
message=result.get('msg', '回归测试说明生成报错...'))
|
||
hsm_replace_path, hsm_seitai_final_path = result
|
||
text_frag_name_list, doc_docx = get_jinja_stdContent_element(hsm_replace_path)
|
||
# 文本片段操作
|
||
self.text_frag_replace_handle(text_frag_name_list, doc_docx)
|
||
try:
|
||
doc_docx.save(hsm_seitai_final_path)
|
||
except PermissionError as e:
|
||
return ChenResponse(status=400, code=400, message="模版文件已打开,请关闭后再试,{0}".format(e))
|
||
# 因为回归说明、回归记录可能有多份,多份下载zip否则docx
|
||
if len(cname_list) == 1:
|
||
return get_file_respone(payload.id, '第二轮回归测试说明')
|
||
else:
|
||
return get_file_respone(payload.id, list(map(lambda x: f"第{x}轮回归测试说明", cname_list)))
|
||
|
||
@route.post('/hjlDocument', url_name='create-hjlDocument')
|
||
@transaction.atomic
|
||
def create_hjlDocument(self, payload: SeitaiInputSchema):
|
||
"""生成最后的回归测试记录-(多个文档)"""
|
||
self.project_obj: Project = get_object_or_404(Project, id=payload.id)
|
||
# 取非第一轮次
|
||
hround_list: QuerySet = self.project_obj.pField.exclude(key='0')
|
||
cname_list = []
|
||
if len(hround_list) < 1:
|
||
return ChenResponse(code=400, status=400, message='无回归测试轮次,请创建后再试')
|
||
for hround in hround_list:
|
||
# 取出当前轮次key减1就是上一轮次
|
||
cname = self.chinese_round_name[int(hround.key)] # 输出二、三...
|
||
cname_list.append(cname)
|
||
member = self.project_obj.member[0] if len(
|
||
self.project_obj.member) > 0 else self.project_obj.duty_person
|
||
is_jd = True if self.project_obj.report_type == '9' else False
|
||
so_dut: Dut = hround.rdField.filter(type='SO').first()
|
||
if not so_dut:
|
||
return ChenResponse(status=400, code=400, message=f'您缺少第{cname}轮的源代码被测件')
|
||
self.temp_context = {
|
||
'project_name': self.project_obj.name,
|
||
'project_ident': self.project_obj.ident,
|
||
'is_jd': is_jd,
|
||
'sec_title': get_str_dict(self.project_obj.secret, 'secret'),
|
||
'duty_person': self.project_obj.duty_person,
|
||
'member': member,
|
||
'round_num': str(int(hround.key) + 1),
|
||
'round_num_chn': cname,
|
||
'soft_ident': so_dut.ref,
|
||
'soft_version': so_dut.version,
|
||
} | DocTime(payload.id).hjl_final_time(hround.key)
|
||
|
||
result = generate_temp_doc('hjl', payload.id, round_num=cname, frag_list=payload.frag)
|
||
if isinstance(result, dict):
|
||
return ChenResponse(status=400, code=400,
|
||
message=result.get('msg', '回归测试记录生成错误!'))
|
||
hjl_replace_path, hjl_seitai_final_path = result
|
||
text_frag_name_list, doc_docx = get_jinja_stdContent_element(hjl_replace_path)
|
||
# 文本片段操作
|
||
self.text_frag_replace_handle(text_frag_name_list, doc_docx)
|
||
# 重新设置时序图大小(注意不变)
|
||
for shape in doc_docx.inline_shapes:
|
||
set_shape_size(shape)
|
||
try:
|
||
doc_docx.save(hjl_seitai_final_path)
|
||
except PermissionError as e:
|
||
return ChenResponse(status=400, code=400, message="模版文件已打开,请关闭后再试,{0}".format(e))
|
||
if len(cname_list) == 1:
|
||
return get_file_respone(payload.id, '第二轮回归测试记录')
|
||
else:
|
||
return get_file_respone(payload.id, list(map(lambda x: f"第{x}轮回归测试说明", cname_list)))
|
||
|
||
@route.post('/wtdDocument', url_name='create-wtdDocument')
|
||
@transaction.atomic
|
||
def create_wtdDocument(self, payload: SeitaiInputSchema):
|
||
"""生成最后的问题单"""
|
||
self.project_obj = get_object_or_404(Project, id=payload.id)
|
||
# seitai文档所需变量
|
||
member = self.project_obj.member[0] if len(
|
||
self.project_obj.member) > 0 else self.project_obj.duty_person
|
||
is_jd = True if self.project_obj.report_type == '9' else False
|
||
self.temp_context = {
|
||
"project_name": self.project_obj.name,
|
||
'project_ident': self.project_obj.ident,
|
||
'is_jd': is_jd,
|
||
'member': member,
|
||
'duty_person': self.project_obj.duty_person,
|
||
'sec_title': get_str_dict(self.project_obj.secret, 'secret'),
|
||
} | DocTime(payload.id).wtd_final_time()
|
||
result = generate_temp_doc('wtd', payload.id, frag_list=payload.frag)
|
||
if isinstance(result, dict):
|
||
return ChenResponse(status=400, code=400,
|
||
message=result.get('msg', 'wtd未报出错误原因,反正在生成文档出错'))
|
||
wtd_replace_path, wtd_seitai_final_path = result
|
||
text_frag_name_list, doc_docx = get_jinja_stdContent_element(wtd_replace_path)
|
||
# 文本片段操作
|
||
self.text_frag_replace_handle(text_frag_name_list, doc_docx)
|
||
try:
|
||
doc_docx.save(wtd_seitai_final_path)
|
||
return get_file_respone(payload.id, '问题单')
|
||
except PermissionError as e:
|
||
return ChenResponse(status=400, code=400, message="模版文件已打开,请关闭后再试,{0}".format(e))
|
||
|
||
@route.post('/bgDocument', url_name='create-bgDocument')
|
||
@transaction.atomic
|
||
def create_bgDocument(self, payload: SeitaiInputSchema):
|
||
"""生成最后的报告文档"""
|
||
self.project_obj = get_object_or_404(Project, id=payload.id)
|
||
# seitai文档所需变量
|
||
## 1.判断是否为JD
|
||
member = self.project_obj.member[0] if len(
|
||
self.project_obj.member) > 0 else self.project_obj.duty_person
|
||
is_jd = True if self.project_obj.report_type == '9' else False
|
||
self.temp_context = {
|
||
'project_name': self.project_obj.name,
|
||
'project_ident': self.project_obj.ident,
|
||
'test_purpose': "装备鉴定和列装定型" if is_jd else "软件交付和使用",
|
||
'is_jd': is_jd,
|
||
'sec_title': get_str_dict(self.project_obj.secret, 'secret'),
|
||
'duty_person': self.project_obj.duty_person,
|
||
'jd_or_third': "鉴定" if is_jd else "第三方",
|
||
'entrust_unit': self.project_obj.entrust_unit,
|
||
'member': member,
|
||
'joined_part': f'驻{self.project_obj.dev_unit}军事代表室、{self.project_obj.dev_unit}',
|
||
} | DocTime(payload.id).bg_final_time()
|
||
result = generate_temp_doc('bg', payload.id, frag_list=payload.frag)
|
||
if isinstance(result, dict):
|
||
return ChenResponse(status=400, code=400,
|
||
message=result.get('msg', 'bg未报出错误原因,反正在生成文档出错'))
|
||
bg_replace_path, bg_seitai_final_path = result
|
||
text_frag_name_list, doc_docx = get_jinja_stdContent_element(bg_replace_path)
|
||
# 文本片段操作
|
||
self.text_frag_replace_handle(text_frag_name_list, doc_docx)
|
||
try:
|
||
doc_docx.save(bg_seitai_final_path)
|
||
return get_file_respone(payload.id, '测评报告')
|
||
except PermissionError as e:
|
||
return ChenResponse(status=400, code=400, message="模版文件已打开,请关闭后再试,{0}".format(e))
|
||
|
||
# ~~~~模版设计模式~~~~
|
||
# 1.传入sdtContent列表,和替换后的文档对象,进行替换操作
|
||
def text_frag_replace_handle(self, text_frag_name_list, doc_docx: Document):
|
||
for text_frag in text_frag_name_list:
|
||
alias = text_frag['alias']
|
||
if alias in self.temp_context:
|
||
sdtContent = text_frag['sdtContent']
|
||
stdContent_modify(self.temp_context[alias], doc_docx, sdtContent)
|
||
else:
|
||
print('未查找的文本片段变量:', alias)
|
||
|
||
# ~~~~下面是生成文档辅助文本片段取变量,统一设置报错信息等,后续看重复代码封装~~~~
|
||
# self拥有变量:self.project_obj / self.temp_context(用于替换文本片段的字典)
|
||
# 1.获取项目第一轮round的源代码dut的用户标识/版本/第一轮测试地点
|
||
def get_first_round_code_ident(self):
|
||
round_obj = self.project_obj.pField.filter(key='0').first()
|
||
if round_obj:
|
||
self.temp_context.update({
|
||
'location': round_obj.location,
|
||
})
|
||
code_dut_obj = round_obj.rdField.filter(type='SO').first()
|
||
if code_dut_obj:
|
||
self.temp_context.update({
|
||
'soft_ident': code_dut_obj.ref,
|
||
'soft_version': code_dut_obj.version,
|
||
})
|
||
return
|
||
raise HttpError(500, "第一轮次未创建,或第一轮动态测试地点为填写,或源代码被测件未创建,请先创建")
|
||
|
||
# 2.获取第一轮次需求规格说明dut
|
||
def get_xq_doc_informations(self):
|
||
round1_xq_dut = self.project_obj.pdField.filter(round__key='0', type='XQ').first()
|
||
if round1_xq_dut:
|
||
self.temp_context.update({'xq_version': round1_xq_dut.version})
|
||
return
|
||
raise HttpError(500, "第一轮次被测件:需求规格说明可能未创建,生成文档失败")
|
||
|
||
# documentType - 对应的目录名称
|
||
documentType_to_dir = {
|
||
'测评大纲': '',
|
||
'测试说明': 'sm',
|
||
'测试记录': 'jl',
|
||
'回归测试说明': 'hsm',
|
||
'回归测试记录': 'hjl',
|
||
'问题单': 'wtd',
|
||
'测评报告': 'bg'
|
||
}
|
||
|
||
# 处理文档片段相关请求
|
||
@api_controller('/createfragment', tags=['生成文档-文档片段接口集合'])
|
||
class CreateFragmentController(ControllerBase):
|
||
@route.get("/get_fragments", url_name='get-fragments')
|
||
def get_fragements(self, id: int, documentType: str):
|
||
"""根据项目id和文档类型获取有哪些文档片段"""
|
||
# 获取文档片段的字符串列表
|
||
frags = self.get_fragment_name_by_document_name(id, documentType)
|
||
# 如果没有文档片段-说明没有生成二段文档
|
||
if not frags:
|
||
return ChenResponse(status=500, code=500,
|
||
message='文档片段还未生成,请关闭后再打开/或者先下载基础文档')
|
||
# 到这里说fragments_files数组有值,返回文件名数组
|
||
return ChenResponse(data=[fragment for fragment in frags], message='返回文档片段成功')
|
||
|
||
@staticmethod
|
||
def get_fragment_name_by_document_name(id: int, document_name: str):
|
||
# 1.找到模版的路径 - 不用异常肯定存在
|
||
document_path = main_download_path / project_path(
|
||
id) / 'form_template' / 'products' / f"{document_name}.docx"
|
||
# 2.识别其中的文档片段
|
||
frag_list = get_frag_from_document(document_path)
|
||
# 3.这里处理报告里第十轮次前端展示问题
|
||
## 3.1先判断是否为报告
|
||
if document_name == '测评报告':
|
||
## 3.2然后判断有几个轮次
|
||
project_obj = get_object_or_404(Project, id=id)
|
||
round_qs = project_obj.pField.all()
|
||
white_list_frag = []
|
||
## 3.3将希望有的片段名称加入白名单
|
||
for round_obj in round_qs:
|
||
chn_num = digit_to_chinese(int(round_obj.key) + 1)
|
||
exclude_str = f"测试内容和结果_第{chn_num}轮次" # 组成识别字符串
|
||
white_list_frag.append(exclude_str)
|
||
## 3.4过滤包含“测试内容和结果的轮次在白名单的通过”
|
||
# 去掉所有“测试内容和结果_”的片段
|
||
filter_frags = list(filter(lambda x: '测试内容和结果' not in x['frag_name'], frag_list))
|
||
# 再找到白名单的“测试内容和结果_”的片段
|
||
content_and_result_frags = list(
|
||
filter(lambda x: '测试内容和结果' in x['frag_name'] and x['frag_name'] in white_list_frag,
|
||
frag_list))
|
||
# 再组合起来返回
|
||
filter_frags.extend(content_and_result_frags)
|
||
return filter_frags
|
||
return frag_list
|
||
|
||
@route.get("/get_round_exit", url_name='get-round-exit')
|
||
def get_round_exit(self, id: int):
|
||
"""该函数主要识别有几轮回归测试说明、几轮回归测试记录"""
|
||
project_obj: Project = get_object_or_404(Project, id=id)
|
||
# 取非第一轮次的轮次的个数
|
||
round_count = project_obj.pField.exclude(key='0').count()
|
||
return {'count': round_count}
|
||
|
||
# 自定义修改Django的文件系统-启动覆盖模式
|
||
class OverwriteStorage(FileSystemStorage):
|
||
def __init__(self, *args, **kwargs):
|
||
kwargs['allow_overwrite'] = True # 启用覆盖模式
|
||
super().__init__(*args, **kwargs)
|
||
|
||
def digit_to_chinese(num):
|
||
num_dict = {'0': '零', '1': '一', '2': '二', '3': '三', '4': '四',
|
||
'5': '五', '6': '六', '7': '七', '8': '八', '9': '九', '10': '十'}
|
||
return ''.join(num_dict[d] for d in str(num))
|
||
|
||
# 处理用户上传有文档片段的产品文档文件:注意回归测试说明、回归测试记录需要单独处理
|
||
@api_controller('/documentUpload', tags=['生成文档-上传模版文档接口'])
|
||
class UploadDocumentController(ControllerBase):
|
||
def __init__(self, *args, **kwargs):
|
||
super().__init__(*args, **kwargs)
|
||
# 储存上传文件
|
||
self.upload_file: UploadedFile | None = None
|
||
|
||
@route.post("/file", url_name='upload-file')
|
||
def upload_file(self, id: int, documentType: str, file: File[UploadedFile], round_num: int = None):
|
||
self.upload_file = file
|
||
# 1.获取储存路径
|
||
target_dir = main_download_path / project_path(id) / 'form_template' / 'products'
|
||
# 2.初始化文件系统
|
||
fs = OverwriteStorage(location=target_dir)
|
||
|
||
# 新:如果是大纲片段,则大纲所有片段以文档片段方式储存在/reuse文件夹下面
|
||
if documentType == '测评大纲':
|
||
self.get_dg_to_reuse_dir(target_dir.parent.parent / 'reuse')
|
||
|
||
if round_num is None:
|
||
# 处理非“回归测试说明”/“回归测试记录”文档的上传
|
||
# warning:不校验文档内是否有文档片段,由用户保证上传内容
|
||
# 3.覆盖储存,注意会返回文件的name属性
|
||
fs.save(f"{documentType}.docx", self.upload_file)
|
||
else:
|
||
# 处理“回归测试说明”/“回归测试记录”文档的上传
|
||
fs.save(f"第{digit_to_chinese(round_num)}轮{documentType}.docx", self.upload_file)
|
||
return ChenResponse(status=200, code=200, message=f'上传{documentType}成功!')
|
||
|
||
def get_dg_to_reuse_dir(self, reuse_dir_path: Path):
|
||
"""将大纲的文档片段储存在/reuse文件夹下面(保留图片)"""
|
||
src_doc = Document(self.upload_file)
|
||
frag_list = self.get_document_frag_list(src_doc)
|
||
|
||
for frag_item in frag_list:
|
||
if frag_item['content'] is None:
|
||
continue
|
||
|
||
# 1. 创建目标文档(基于 basic_doc.docx 模板)
|
||
new_doc = Document((reuse_dir_path / 'basic_doc.docx').as_posix())
|
||
new_doc.element.body.clear_content()
|
||
|
||
# 2. 逐元素复制 XML,并修复图片引用
|
||
for child in frag_item['content'].iterchildren():
|
||
self._copy_element_with_images(child, src_doc, new_doc)
|
||
|
||
# 3. 保存片段文件
|
||
filename = f"{frag_item['alias']}.docx"
|
||
new_doc.save((reuse_dir_path / filename).as_posix())
|
||
|
||
def _copy_element_with_images(self, element, src_doc, dst_doc):
|
||
"""
|
||
复制 lxml 元素及其子树,将源文档中的图片关系迁移至目标文档。
|
||
"""
|
||
# 1. 深拷贝元素
|
||
new_element = copy.deepcopy(element)
|
||
# 2. 使用本地名称匹配查找图片节点(完全避免命名空间前缀参数)
|
||
pic_nodes = new_element.xpath('.//*[local-name()="pic"]')
|
||
for pic in pic_nodes:
|
||
blip_nodes = pic.xpath('.//*[local-name()="blip"]')
|
||
for blip in blip_nodes:
|
||
embed_attr = '{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed'
|
||
old_rId = blip.get(embed_attr)
|
||
if not old_rId:
|
||
continue
|
||
src_image_part = src_doc.part.related_parts.get(old_rId)
|
||
if src_image_part is None or not isinstance(src_image_part, ImagePart):
|
||
continue
|
||
# 添加图片到目标文档,获取新 rId
|
||
image_stream = BytesIO(src_image_part.blob)
|
||
new_rId, _ = dst_doc.part.get_or_add_image(image_stream)
|
||
# 更新属性
|
||
blip.set(embed_attr, new_rId)
|
||
# 3. 追加到目标文档 body
|
||
dst_doc.element.body.append(new_element)
|
||
|
||
# 辅助函数:将上传文件的文档片段以列表形式返回
|
||
def get_document_frag_list(self, doc: Document):
|
||
body = doc.element.body
|
||
sdt_element_list = body.xpath('./w:sdt') # 只查询文档片段,非文本片段
|
||
frag_list = []
|
||
for sdt_element in sdt_element_list:
|
||
alias_name = None
|
||
sdtContent = None
|
||
for sdt_child in sdt_element.iterchildren():
|
||
if sdt_child.tag.endswith('sdtPr'):
|
||
for sdtPr_child in sdt_child.getchildren():
|
||
if sdtPr_child.tag.endswith('alias'):
|
||
if len(sdtPr_child.attrib.values()) > 0:
|
||
alias_name = sdtPr_child.attrib.values()[0]
|
||
if sdt_child.tag.endswith("sdtContent"):
|
||
sdtContent = sdt_child
|
||
frag_list.append({'alias': alias_name, 'content': sdtContent})
|
||
return list(filter(lambda x: x['alias'] is not None, frag_list))
|