from datetime import datetime from typing import Any, Dict, List, Tuple from sqlalchemy.orm import Session from app.models.tooling import TestingGeneration, ToolJob from app.schemas.tooling import TestingGenerationSaveRequest TESTING_TOOL_NAME = "testing.case_generator" def _resolve_total_requirements(generated_file: Dict[str, Any]) -> int: requirements = generated_file.get("requirements") if isinstance(requirements, list): return len(requirements) total = generated_file.get("totalRequirements") if isinstance(total, int) and total >= 0: return total return 0 def build_testing_generation_response(job: ToolJob, generation: TestingGeneration) -> Dict[str, Any]: return { "jobId": job.id, "sourceJobId": generation.source_job_id, "sourceDocumentName": generation.source_document_name, "generatedAt": generation.generated_at.isoformat(), "totalRequirements": generation.total_requirements, "knowledgeBaseId": generation.knowledge_base_id, "generatedFile": generation.generated_file or {}, } def create_testing_generation( db: Session, user_id: int, payload: TestingGenerationSaveRequest, ) -> Dict[str, Any]: now = datetime.utcnow() total_requirements = _resolve_total_requirements(payload.generated_file) job = ToolJob( user_id=user_id, tool_name=TESTING_TOOL_NAME, status="completed", input_file_name=payload.source_document_name, input_file_path="", started_at=now, completed_at=now, output_summary={ "source_document_name": payload.source_document_name, "total_requirements": total_requirements, "knowledge_base_id": payload.knowledge_base_id, }, ) db.add(job) db.flush() generation = TestingGeneration( job_id=job.id, source_job_id=payload.source_job_id, source_document_name=payload.source_document_name, generated_at=now, total_requirements=total_requirements, knowledge_base_id=payload.knowledge_base_id, generated_file=payload.generated_file, ) db.add(generation) db.commit() db.refresh(job) db.refresh(generation) return build_testing_generation_response(job, generation) def list_testing_history(db: Session, user_id: int) -> List[Dict[str, Any]]: rows: List[Tuple[ToolJob, TestingGeneration]] = ( db.query(ToolJob, TestingGeneration) .join(TestingGeneration, TestingGeneration.job_id == ToolJob.id) .filter( ToolJob.user_id == user_id, ToolJob.tool_name == TESTING_TOOL_NAME, ) .order_by(ToolJob.created_at.desc()) .all() ) items: List[Dict[str, Any]] = [] for job, generation in rows: items.append( { "jobId": job.id, "sourceJobId": generation.source_job_id, "sourceDocumentName": generation.source_document_name, "generatedAt": generation.generated_at.isoformat(), "totalRequirements": generation.total_requirements, "knowledgeBaseId": generation.knowledge_base_id, "status": job.status, "createdAt": job.created_at.isoformat(), "updatedAt": job.updated_at.isoformat(), } ) return items def delete_testing_generation(db: Session, job: ToolJob) -> None: db.delete(job) db.commit()