import re from copy import deepcopy from ninja_extra import api_controller, ControllerBase, route from ninja import Query from ninja_jwt.authentication import JWTAuth from ninja_extra.permissions import IsAuthenticated from ninja.pagination import paginate from ninja.errors import HttpError from utils.chen_pagination import MyPagination from django.db import transaction from django.db.models import F, Value from django.db.models.functions import Replace from django.shortcuts import get_object_or_404 from typing import List from utils.chen_response import ChenResponse from utils.chen_crud import multi_delete_design from utils.codes import HTTP_INDEX_ERROR from apps.project.models import Design, Dut, Round, Project from apps.project.schemas.design import DeleteSchema, DesignFilterSchema, DesignModelOutSchema, \ DesignTreeReturnSchema, \ DesignTreeInputSchema, DesignCreateOutSchema, DesignCreateInputSchema, MultiDesignCreateInputSchema, \ ReplaceDesignContentSchema from apps.project.tools.delete_change_key import design_delete_sub_node_key from utils.smallTools.interfaceTools import conditionNoneToBlank from apps.project.tools.auto_create_data import auto_create_renji @api_controller("/project", auth=JWTAuth(), permissions=[IsAuthenticated], tags=['设计需求数据']) class DesignController(ControllerBase): @route.get("/getDesignDemandList", response=List[DesignModelOutSchema], exclude_none=True, url_name="design-list") @transaction.atomic @paginate(MyPagination) def get_design_list(self, datafilter: DesignFilterSchema = Query(...)): conditionNoneToBlank(datafilter) query_params = { 'project__id': datafilter.project_id, 'ident__icontains': datafilter.ident, 'name__icontains': datafilter.name, 'demandType__contains': datafilter.demandType, 'chapter__icontains': datafilter.chapter } # 判断是否传递dut_id,如果没传递则查询轮次全部 if datafilter.dut_id: dut_key = f"{datafilter.round_id}-{datafilter.dut_id}" query_params['dut__key'] = dut_key else: # 如果没有dut__key则要查询round__key query_params['round__key'] = datafilter.round_id qs = Design.objects.filter(**query_params).order_by('id') return qs @route.get("/getDesignOne", response=DesignModelOutSchema, url_name='design-one') def get_dut(self, project_id: int, key: str): design_qs = Design.objects.filter(project_id=project_id, key=key).first() if design_qs: return design_qs raise HttpError(500, "未找到相应的数据") @route.get("/getDesignOneById", response=DesignModelOutSchema, url_name='design-one-by-id') def get_one_by_id(self, id: int): design_qs = Design.objects.filter(id=id).first() if design_qs: return design_qs raise HttpError(500, "未找到相应的数据") # 处理树状数据 @route.get("/getDesignDemandInfo", response=List[DesignTreeReturnSchema], url_name="design-info") def get_design_tree(self, payload: DesignTreeInputSchema = Query(...)): qs = Design.objects.filter(project__id=payload.project_id, dut__key=payload.key).order_by('id') return qs # 添加设计需求 @route.post("/designDemand/save", response=DesignCreateOutSchema, url_name="design-create") @transaction.atomic def create_design(self, payload: DesignCreateInputSchema): asert_dict = payload.dict(exclude_none=True) # 如果识别description为None变为空字符串 description = asert_dict.get('description') # 构造dut_key dut_key = "".join([payload.round_key, "-", payload.dut_key]) # 判重标识-不需要再查询round以后的 if Design.objects.filter(project__id=payload.project_id, round__key=payload.round_key, dut__key=dut_key, ident=payload.ident).exists() and asert_dict['ident'] != "": return ChenResponse(code=400, status=400, message='研制需求的标识重复,请检查') # 查询当前key应该为多少 design_count = Design.objects.filter(project__id=payload.project_id, dut__key=dut_key).count() key_string = ''.join([dut_key, "-", str(design_count)]) # 查询当前的round_id round_instance = Round.objects.get(project__id=payload.project_id, key=payload.round_key) dut_instance = Dut.objects.get(project__id=payload.project_id, key=dut_key) asert_dict.update( {'key': key_string, 'round': round_instance, 'dut': dut_instance, 'title': payload.name}) asert_dict.pop("round_key") asert_dict.pop("dut_key") qs = Design.objects.create(**asert_dict) return qs # 批量增加设计需求,对应前端批量增加页面modal @route.post('/designDemand/multi_save', url_name='design-multi-create') @transaction.atomic def multi_create_design(self, payload: MultiDesignCreateInputSchema): project_obj = get_object_or_404(Project, id=payload.project_id) dut_obj = project_obj.pdField.filter(key=payload.dut_key).first() round_obj = dut_obj.round # 当前dut下的design个数 design_count = Design.objects.filter(project=project_obj, dut=dut_obj).count() key_index = design_count # 这里根据payload.data批量增加 bulk_list = [] for desgin_obj in payload.data: design_one = Design(**desgin_obj.model_dump()) design_one.title = design_one.name # 计算出当前key应该为多少 design_one.key = ''.join([dut_obj.key, "-", str(key_index)]) key_index += 1 design_one.level = '2' design_one.project = project_obj design_one.round = round_obj design_one.dut = dut_obj bulk_list.append(design_one) Design.objects.bulk_create(bulk_list) # 为了前端更新,需要返回一个dut_key return ChenResponse(status=200, code=200, data={'key': dut_obj.key + '-1'}) # 更新设计需求 @route.put("/editDesignDemand/{id}", response=DesignCreateOutSchema, url_name="design-update") @transaction.atomic def update_design(self, id: int, payload: DesignCreateInputSchema): design_search = Design.objects.filter(project__id=payload.project_id, ident=payload.ident, round__key=payload.round_key) # 判断是否和同项目同轮次的标识重复 if len(design_search) > 1 and payload.ident != '': return ChenResponse(code=400, status=400, message='研制需求的标识重复,请检查') # 查到当前 design_qs = Design.objects.get(id=id) for attr, value in payload.dict().items(): if attr == 'project_id' or attr == 'round_key' or attr == 'dut_key': continue if attr == 'name': setattr(design_qs, "title", value) setattr(design_qs, attr, value) design_qs.save() return design_qs # 删除设计需求 @route.delete("/designDemand/delete", url_name="design-delete") @transaction.atomic def delete_design(self, data: DeleteSchema): # 根据其中一个id查询出dut_id try: design_single = Design.objects.filter(id=data.ids[0])[0] except IndexError: return ChenResponse(status=500, code=HTTP_INDEX_ERROR, message='您未选择需要删除的内容') dut_id = design_single.dut.id dut_key = design_single.dut.key multi_delete_design(data.ids, Design) index = 0 design_all_qs = Design.objects.filter(dut__id=dut_id).order_by('id') for single_qs in design_all_qs: design_key = "".join([dut_key, '-', str(index)]) single_qs.key = design_key index = index + 1 single_qs.save() design_delete_sub_node_key(single_qs) return ChenResponse(message="设计需求删除成功!") # 给复制功能级联选择器查询所有的设计需求【这是查项目所有的设计需求】 @route.get("/designDemand/getRelatedDesign", url_name='dut-relatedDesign') def getRelatedDesign(self, id: int): project_qs = get_object_or_404(Project, id=id) # 依次找出round -> dut -> design round_qs = project_qs.pField.all() data_list = [] for round in round_qs: round_dict = {'label': round.name, 'value': round.id, 'children': []} for dut in round.rdField.all(): dut_dict = {'label': dut.name, 'value': dut.id, 'children': []} for design in dut.rsField.all(): design_dict = {'label': design.name, 'value': design.id, 'key': design.key} dut_dict['children'].append(design_dict) round_dict['children'].append(dut_dict) data_list.append(round_dict) return ChenResponse(message='获取成功', data=data_list) # 设计需求-替换接口 @route.post("/designDemand/replace/", url_name='design-replace') @transaction.atomic def replace_content(self, payload: ReplaceDesignContentSchema): # 1.首先查询项目 project_obj: Project = get_object_or_404(Project, id=payload.project_id) # 2.查询[所有轮次]的selectRows的id design_qs = project_obj.psField.filter(id__in=payload.selectRows, round__key=payload.round_key) # 3.批量替换里面文本 replace_kwargs = { field_name: Replace(F(field_name), Value(payload.originText), Value(payload.replaceText)) for field_name in payload.selectColumn } # 4.提交更新 replace_count = design_qs.update(**replace_kwargs) return {'count': replace_count} # 点击生成人机交互界面测试-注意必须要有界面的软件 @route.get("/create_renji/", url_name='renji') @transaction.atomic def create_rj(self, round_id: int, project_id: int): user_name = self.context.request.user.name # 获取当前用户名 project_obj: Project = get_object_or_404(Project, id=project_id) dut_qs = Dut.objects.filter(round__key=round_id, project=project_obj, type='XQ').first() if dut_qs: auto_create_renji(user_name, dut_qs, project_obj) return ChenResponse(status=200, message='自动生成人机界面交互测试成功!', data=dut_qs.key) return ChenResponse(status=402, message='您还未录入需求规格说明文档,请录入后再试') # 复制design到当前dut下面接口 @route.get("/copy_current", url_name='copy-design-current') @transaction.atomic def copy_current(self, dut_id: int, design_id: int): dut_obj = get_object_or_404(Dut, id=dut_id) design_obj = get_object_or_404(Design, id=design_id) # 首先查询该dut下design个数,设置为新增设计需求的key末尾 key_index = dut_obj.rsField.count() new_design_obj = deepcopy(design_obj) # 修改新design内容 new_design_obj.pk = None new_design_obj.key = "".join([dut_obj.key, "-", str(key_index)]) new_design_obj.title = "".join([design_obj.title, "(复制)"]) new_design_obj.name = "".join([design_obj.name, "(复制)"]) # ident容错,查询是否有拼接的 current_ident = "".join([new_design_obj.ident, "1"]) project_obj = dut_obj.project exit_ident = project_obj.psField.filter(ident=current_ident).exists() if exit_ident: match = re.search(r'(\d+)$', current_ident) if match: num = int(match.group(1)) + 1 current_ident = re.sub(r'\d+$', str(num), current_ident) else: current_ident = current_ident + "1" new_design_obj.ident = current_ident # 最后记得save new_design_obj.save() return ChenResponse(status=200, code=200, message='复制当前设计需求成功', data="")