112 lines
3.4 KiB
Python
112 lines
3.4 KiB
Python
|
|
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple

from sqlalchemy.orm import Session

from app.models.tooling import TestingGeneration, ToolJob
from app.schemas.tooling import TestingGenerationSaveRequest
|
||
|
|
|
||
|
|
# Value written to ToolJob.tool_name by create_testing_generation and matched
# by list_testing_history when selecting this tool's jobs.
TESTING_TOOL_NAME = "testing.case_generator"
|
||
|
|
|
||
|
|
|
||
|
|
def _resolve_total_requirements(generated_file: Dict[str, Any]) -> int:
|
||
|
|
requirements = generated_file.get("requirements")
|
||
|
|
if isinstance(requirements, list):
|
||
|
|
return len(requirements)
|
||
|
|
|
||
|
|
total = generated_file.get("totalRequirements")
|
||
|
|
if isinstance(total, int) and total >= 0:
|
||
|
|
return total
|
||
|
|
|
||
|
|
return 0
|
||
|
|
|
||
|
|
|
||
|
|
def build_testing_generation_response(job: ToolJob, generation: TestingGeneration) -> Dict[str, Any]:
    """Assemble the camelCase response dict for one saved generation.

    Args:
        job: Job row; only its ``id`` is read.
        generation: Generation row supplying every other field.

    Returns:
        A dict with the keys ``jobId``, ``sourceJobId``,
        ``sourceDocumentName``, ``generatedAt`` (ISO-8601 string),
        ``totalRequirements``, ``knowledgeBaseId`` and ``generatedFile``.
        A falsy ``generated_file`` is reported as an empty dict.
    """
    response: Dict[str, Any] = {"jobId": job.id}
    response["sourceJobId"] = generation.source_job_id
    response["sourceDocumentName"] = generation.source_document_name
    response["generatedAt"] = generation.generated_at.isoformat()
    response["totalRequirements"] = generation.total_requirements
    response["knowledgeBaseId"] = generation.knowledge_base_id

    stored_file = generation.generated_file
    response["generatedFile"] = stored_file if stored_file else {}
    return response
|
||
|
|
|
||
|
|
|
||
|
|
def create_testing_generation(
    db: Session,
    user_id: int,
    payload: TestingGenerationSaveRequest,
) -> Dict[str, Any]:
    """Persist a finished test-case generation and return its response dict.

    Two rows are written in one transaction: a ``ToolJob`` recorded as
    already ``"completed"`` (start and completion share one timestamp), and
    a ``TestingGeneration`` linked to it through ``job_id``.

    Args:
        db: Session used for the inserts and the commit.
        user_id: Owner recorded on the job row.
        payload: Save request supplying the source document name, source job
            id, knowledge base id and the generated file.

    Returns:
        The dict produced by ``build_testing_generation_response`` for the
        newly stored rows.
    """
    # Naive UTC timestamp, identical in value to what datetime.utcnow()
    # returned; utcnow() itself is deprecated since Python 3.12.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    total_requirements = _resolve_total_requirements(payload.generated_file)

    job = ToolJob(
        user_id=user_id,
        tool_name=TESTING_TOOL_NAME,
        status="completed",
        input_file_name=payload.source_document_name,
        input_file_path="",
        started_at=now,
        completed_at=now,
        output_summary={
            "source_document_name": payload.source_document_name,
            "total_requirements": total_requirements,
            "knowledge_base_id": payload.knowledge_base_id,
        },
    )
    db.add(job)
    # Flush before building the generation row so job.id can be used as its
    # foreign key.
    db.flush()

    generation = TestingGeneration(
        job_id=job.id,
        source_job_id=payload.source_job_id,
        source_document_name=payload.source_document_name,
        generated_at=now,
        total_requirements=total_requirements,
        knowledge_base_id=payload.knowledge_base_id,
        generated_file=payload.generated_file,
    )
    db.add(generation)
    db.commit()

    # Reload both rows after the commit before reading them for the response.
    db.refresh(job)
    db.refresh(generation)

    return build_testing_generation_response(job, generation)
|
||
|
|
|
||
|
|
|
||
|
|
def list_testing_history(db: Session, user_id: int) -> List[Dict[str, Any]]:
    """List the saved test-case generations of one user, newest job first.

    Jobs are joined to their generation rows, restricted to ``user_id`` and
    to ``TESTING_TOOL_NAME``, and ordered by ``ToolJob.created_at``
    descending.

    Args:
        db: Session used to run the query.
        user_id: Owner whose jobs are listed.

    Returns:
        One dict per (job, generation) pair with the keys ``jobId``,
        ``sourceJobId``, ``sourceDocumentName``, ``generatedAt``,
        ``totalRequirements``, ``knowledgeBaseId``, ``status``,
        ``createdAt`` and ``updatedAt``; timestamps are ISO-8601 strings.
    """
    history_query = (
        db.query(ToolJob, TestingGeneration)
        .join(TestingGeneration, TestingGeneration.job_id == ToolJob.id)
        .filter(
            ToolJob.user_id == user_id,
            ToolJob.tool_name == TESTING_TOOL_NAME,
        )
        .order_by(ToolJob.created_at.desc())
    )
    pairs: List[Tuple[ToolJob, TestingGeneration]] = history_query.all()

    return [
        {
            "jobId": job_row.id,
            "sourceJobId": generation_row.source_job_id,
            "sourceDocumentName": generation_row.source_document_name,
            "generatedAt": generation_row.generated_at.isoformat(),
            "totalRequirements": generation_row.total_requirements,
            "knowledgeBaseId": generation_row.knowledge_base_id,
            "status": job_row.status,
            "createdAt": job_row.created_at.isoformat(),
            "updatedAt": job_row.updated_at.isoformat(),
        }
        for job_row, generation_row in pairs
    ]
|
||
|
|
|
||
|
|
|
||
|
|
def delete_testing_generation(db: Session, job: ToolJob) -> None:
    """Delete ``job`` through ``db`` and commit the change.

    Args:
        db: Session used for the delete and the commit.
        job: The ``ToolJob`` row to remove.

    NOTE(review): nothing here removes the linked ``TestingGeneration`` row
    explicitly; presumably a cascade on the models handles it - confirm.
    """
    db.delete(job)
    db.commit()
|