Files
TBgen_App/autoline/TB_cga.py

1454 lines
63 KiB
Python
Raw Normal View History

2026-03-30 16:46:48 +08:00
# # #第四版
# # """
# # Description : Coverage-Guided Agent (CGA) Main Controller
# # - Integrated with Layer 0: Semantic Analysis
# # - Integrated with Layer 1: Diversity Constraint Injection
# # - Integrated with Layer 4: Energy Allocation
# # Author : CorrectBench Integration
# # """
# # import os
# # import sys
# # import shutil
# # import LLM_call as llm
# # import loader_saver as ls
# # from loader_saver import autologger as logger
# # from utils.verilator_call import verilator_run_coverage
# # from autoline.cga_utils import CoverageParser, TBInjector
# # # [新增] 导入语义分析层
# # from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# # # [新增] 导入能量分配层
# # from autoline.energy_allocator import EnergyAllocator, EnergyState
# # # [新增] 导入多样性约束注入器
# # from autoline.diversity_injector import DiversityInjector
# # # [新增] 导入测试历史管理器
# # from autoline.test_history import TestHistoryManager
# # class TaskTBCGA:
# # def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config):
# # self.task_dir = task_dir
# # self.task_id = task_id
# # self.header = header
# # self.DUT_code = DUT_code
# # self.TB_code = TB_code
# # self.config = config
# # self.max_iter = 5
# # self.target_coverage = 95.0
# # self.model = config.gpt.model
# # self.best_tb = TB_code
# # self.best_score = 0.0
# # # [新增] 能量分配器
# # self.energy_allocator: EnergyAllocator = None
# # # [新增] 多样性约束注入器
# # self.diversity_injector: DiversityInjector = None
# # # [新增辅助函数] 从父目录拷贝 DUT
# # def _prepare_dut(self, target_dir):
# # source_dut = os.path.join(self.task_dir, "DUT.v")
# # target_dut = os.path.join(target_dir, "DUT.v")
# # # 优先拷贝现有的文件
# # if os.path.exists(source_dut):
# # shutil.copy(source_dut, target_dut)
# # else:
# # # 只有当文件由于某种原因被删除了,才降级使用内存中的 code
# # ls.save_code(self.DUT_code, target_dut)
# # def run(self):
# # logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
# # # 1. 确保工作目录存在 (saves/任务名/5_CGA)
# # work_dir = os.path.join(self.task_dir, "5_CGA")
# # if os.path.exists(work_dir):
# # shutil.rmtree(work_dir)
# # os.makedirs(work_dir, exist_ok=True)
# # # === [新增] Step 0: 语义分析 ===
# # logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
# # self.semantic_result = None
# # try:
# # semantic_analyzer = SemanticAnalyzer(self.DUT_code)
# # self.semantic_result = semantic_analyzer.analyze()
# # # 记录分析结果摘要
# # fp_count = len(self.semantic_result.get('function_points', []))
# # fsm_info = semantic_analyzer.get_fsm_info()
# # if fsm_info:
# # logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
# # f"({len(fsm_info.get('states', []))} states)")
# # logger.info(f" Total function points identified: {fp_count}")
# # # 保存语义分析报告
# # semantic_report = semantic_analyzer.generate_prompt_context()
# # ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
# # # === [新增] Step 0.1: 初始化能量分配器 ===
# # if self.semantic_result.get('function_points'):
# # self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
# # energy_init_result = self.energy_allocator.initialize(
# # self.semantic_result['function_points']
# # )
# # logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
# # # === [新增] Step 0.2: 初始化多样性约束注入器 ===
# # history_file = os.path.join(work_dir, "test_history.json")
# # # 创建 TestHistoryManager 并传递 history_file
# # history_manager = TestHistoryManager(history_file=history_file)
# # self.diversity_injector = DiversityInjector(history_manager=history_manager)
# # logger.info(f" Diversity injector initialized with history file: {history_file}")
# # except Exception as e:
# # logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
# # # ================================
# # current_tb = self.TB_code
# # last_annotated_file = None
# # # --- Baseline ---
# # logger.info(f"--- CGA Iter 0 (Baseline) ---")
# # iter0_dir = os.path.join(work_dir, "iter_0")
# # os.makedirs(iter0_dir, exist_ok=True)
# # self._prepare_dut(iter0_dir)
# # ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
# # success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
# # self.best_score = score
# # self.best_tb = current_tb
# # last_annotated_file = annotated_path
# # logger.info(f"Baseline Coverage: {score:.2f}%")
# # if score >= self.target_coverage:
# # logger.success(f"Target reached at baseline!")
# # # [修改] 返回元组 (代码, 分数)
# # return self.best_tb, self.best_score
# # # --- Loop ---
# # for i in range(1, self.max_iter + 1):
# # logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
# # # === [新增] 能量检查:是否还有活跃目标 ===
# # if self.energy_allocator:
# # current_target = self.energy_allocator.select_next_target()
# # if not current_target:
# # logger.info("No more active targets with remaining energy. Stopping.")
# # break
# # logger.info(f"Target: {current_target}")
# # # =========================================
# # if not last_annotated_file: break
# # # [修改] 传递语义分析结果、能量分配器、多样性注入器给 CoverageParser
# # parser = CoverageParser(
# # last_annotated_file,
# # tb_code=self.best_tb,
# # semantic_result=self.semantic_result,
# # energy_allocator=self.energy_allocator,
# # diversity_injector=self.diversity_injector # [新增]
# # )
# # prompt = parser.generate_prompt(self.best_score)
# # if not prompt:
# # logger.info("No reachable missing blocks found. Stopping.")
# # break
# # logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
# # messages = [{"role": "user", "content": prompt}]
# # try:
# # response, _ = llm.llm_call(messages, self.model)
# # codes = llm.extract_code(response, "verilog")
# # new_task_code = codes[0] if codes else ""
# # if not new_task_code:
# # # [新增] 记录失败
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=False,
# # coverage_delta=0.0,
# # energy_cost=1.0
# # )
# # continue
# # except Exception as e:
# # logger.error(f"LLM Call failed: {e}")
# # # [新增] 记录失败
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=False,
# # coverage_delta=0.0,
# # energy_cost=1.0
# # )
# # break
# # injector = TBInjector(self.best_tb)
# # enhanced_tb = injector.inject(new_task_code, iter_idx=i)
# # iter_dir = os.path.join(work_dir, f"iter_{i}")
# # os.makedirs(iter_dir, exist_ok=True)
# # self._prepare_dut(iter_dir)
# # ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
# # success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
# # # === [新增] 记录生成结果到能量分配器 ===
# # coverage_delta = new_score - self.best_score if success else 0.0
# # generation_success = success and new_score > self.best_score
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=generation_success,
# # coverage_delta=coverage_delta,
# # energy_cost=1.0
# # )
# # # =========================================
# # # === [新增] 记录测试用例到多样性历史 ===
# # if self.diversity_injector:
# # # 提取已知信号
# # known_signals = []
# # if self.semantic_result:
# # known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
# # self.diversity_injector.record_test(
# # code=new_task_code,
# # target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
# # coverage_score=new_score,
# # success=generation_success,
# # iteration=i,
# # known_signals=known_signals
# # )
# # # =======================================
# # if success and new_score > self.best_score:
# # improvement = new_score - self.best_score
# # logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
# # self.best_score = new_score
# # self.best_tb = enhanced_tb
# # last_annotated_file = new_annotated_path
# # elif success and new_score == self.best_score:
# # logger.info(f"Coverage unchanged. Keeping previous.")
# # else:
# # logger.warning(f"Regression or Failure. Discarding changes.")
# # if self.best_score >= self.target_coverage:
# # logger.success("Target coverage reached!")
# # break
# # logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
# # # === [新增] 生成能量分配报告 ===
# # if self.energy_allocator:
# # energy_report = self.energy_allocator.generate_report()
# # ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
# # logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
# # # =================================
# # # === [新增] 生成多样性报告并保存历史 ===
# # if self.diversity_injector:
# # diversity_report = self.diversity_injector.generate_diversity_report()
# # ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
# # logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
# # # 保存测试历史
# # self.diversity_injector.history.save()
# # # ======================================
# # # [修改] 返回元组 (代码, 分数)
# # return self.best_tb, self.best_score
# #终版
# """
# Description : Coverage-Guided Agent (CGA) Main Controller
# - Integrated with Layer 0: Semantic Analysis
# - Integrated with Layer 1: Diversity Constraint Injection
# - Integrated with Layer 3: Quality Evaluation
# - Integrated with Layer 4: Energy Allocation
# Author : CorrectBench Integration
# """
# import os
# import sys
# import shutil
# import LLM_call as llm
# import loader_saver as ls
# from loader_saver import autologger as logger
# from utils.verilator_call import verilator_run_coverage
# from autoline.cga_utils import CoverageParser, TBInjector
# # [新增] 导入语义分析层
# from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# # [新增] 导入能量分配层
# from autoline.energy_allocator import EnergyAllocator, EnergyState
# # [新增] 导入多样性约束注入器
# from autoline.diversity_injector import DiversityInjector
# # [新增] 导入测试历史管理器
# from autoline.test_history import TestHistoryManager
# # [新增] 导入质量评估层
# from autoline.quality_evaluator import QualityEvaluator, DiversityScore, SemanticCoverageResult
# class TaskTBCGA:
# def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config):
# self.task_dir = task_dir
# self.task_id = task_id
# self.header = header
# self.DUT_code = DUT_code
# self.TB_code = TB_code
# self.config = config
# self.max_iter = 5
# self.target_coverage = 95.0
# self.model = config.gpt.model
# self.best_tb = TB_code
# self.best_score = 0.0
# # [新增] 能量分配器
# self.energy_allocator: EnergyAllocator = None
# # [新增] 多样性约束注入器
# self.diversity_injector: DiversityInjector = None
# # [新增] 质量评估器
# self.quality_evaluator: QualityEvaluator = None
# # [新增辅助函数] 从父目录拷贝 DUT
# def _prepare_dut(self, target_dir):
# source_dut = os.path.join(self.task_dir, "DUT.v")
# target_dut = os.path.join(target_dir, "DUT.v")
# # 优先拷贝现有的文件
# if os.path.exists(source_dut):
# shutil.copy(source_dut, target_dut)
# else:
# # 只有当文件由于某种原因被删除了,才降级使用内存中的 code
# ls.save_code(self.DUT_code, target_dut)
# def _generate_exploration_prompt(self, iteration: int) -> str:
# """
# 生成探索性测试 Prompt
# 当找不到明确的 missing blocks 但覆盖率仍未达标时,
# 生成一个探索性 Prompt 来尝试发现新的测试路径。
# Args:
# iteration: 当前迭代次数
# Returns:
# 探索性测试 Prompt如果无法生成则返回 None
# """
# # 从语义分析结果获取 FSM 和功能点信息
# fsm_info = ""
# if self.semantic_result:
# fsm_data = self.semantic_result.get('fsm', {})
# if fsm_data:
# states = fsm_data.get('states', [])
# state_var = fsm_data.get('state_variable', 'state')
# fsm_info = f"""
# [FSM INFORMATION]
# - State variable: {state_var}
# - Known states: {', '.join(states) if states else 'unknown'}
# The DUT appears to be a Finite State Machine. To improve coverage:
# 1. Try to visit each state by driving inputs that trigger state transitions
# 2. For each state, try different input combinations
# 3. Consider edge cases: reset transitions, timeout conditions, error states
# """
# # 从能量分配器获取目标功能点
# target_info = ""
# if self.energy_allocator and self.energy_allocator.current_target:
# target = self.energy_allocator.current_target
# target_info = f"""
# [CURRENT TARGET]
# Focus on: {target.function_point}
# Remaining energy: {target.remaining}
# """
# # 从多样性注入器获取已尝试的测试
# diversity_hints = ""
# if self.diversity_injector:
# history = self.diversity_injector.history
# # if history and len(history.history) > 0:
# # recent_tests = history.history[-5:] if len(history.history) > 5 else history.history
# if history and hasattr(history, 'records') and len(history.records) > 0:
# recent_tests = history.records[-5:] if len(history.records) > 5 else history.records
# diversity_hints = f"""
# [RECENTLY TRIED APPROACHES - AVOID REPETITION]
# Recent test patterns tried:
# """
# # for i, test in enumerate(recent_tests):
# # diversity_hints += f"- Iter {test.get('iteration', i)}: target={test.get('target_function', 'unknown')}\n"
# for i, test in enumerate(recent_tests):
# # TestRecord 是 dataclass使用属性访问
# target = getattr(test, 'target_function', 'unknown') if hasattr(test, 'target_function') else 'unknown'
# iteration = getattr(test, 'iteration', i) if hasattr(test, 'iteration') else i
# diversity_hints += f"- Iter {iteration}: target={target}\n"
# prompt = f"""
# [EXPLORATION MODE - ITERATION {iteration}]
# Current coverage is {self.best_score:.2f}%, but no specific uncovered code blocks were identified.
# This may happen when:
# 1. Coverage data is incomplete or filtered
# 2. Branch/condition coverage needs improvement (not just line coverage)
# 3. State transitions in FSM are not fully exercised
# {fsm_info}
# {target_info}
# {diversity_hints}
# [YOUR TASK]
# Write an EXPLORATORY test scenario that:
# 1. Covers different input combinations than previous tests
# 2. Explores different FSM state transitions
# 3. Tests edge cases and boundary conditions
# 4. Varies timing and sequence of inputs
# [OUTPUT FORMAT]
# Return ONLY Verilog test scenario code (no task wrapper).
# Use the signal names from the testbench.
# ```verilog
# // Your exploratory test code here
# ```
# """
# return prompt
# def run(self):
# logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
# # 1. 确保工作目录存在 (saves/任务名/5_CGA)
# work_dir = os.path.join(self.task_dir, "5_CGA")
# if os.path.exists(work_dir):
# shutil.rmtree(work_dir)
# os.makedirs(work_dir, exist_ok=True)
# # === [新增] Step 0: 语义分析 ===
# logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
# self.semantic_result = None
# try:
# semantic_analyzer = SemanticAnalyzer(self.DUT_code)
# self.semantic_result = semantic_analyzer.analyze()
# # 记录分析结果摘要
# fp_count = len(self.semantic_result.get('function_points', []))
# fsm_info = semantic_analyzer.get_fsm_info()
# if fsm_info:
# logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
# f"({len(fsm_info.get('states', []))} states)")
# logger.info(f" Total function points identified: {fp_count}")
# # 保存语义分析报告
# semantic_report = semantic_analyzer.generate_prompt_context()
# ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
# # === [新增] Step 0.1: 初始化能量分配器 ===
# if self.semantic_result.get('function_points'):
# self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
# energy_init_result = self.energy_allocator.initialize(
# self.semantic_result['function_points']
# )
# logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
# # === [新增] Step 0.2: 初始化多样性约束注入器 ===
# history_file = os.path.join(work_dir, "test_history.json")
# # 创建 TestHistoryManager 并传递 history_file
# history_manager = TestHistoryManager(history_file=history_file)
# self.diversity_injector = DiversityInjector(history_manager=history_manager)
# logger.info(f" Diversity injector initialized with history file: {history_file}")
# # === [新增] Step 0.3: 初始化质量评估器 ===
# if self.semantic_result.get('function_points'):
# self.quality_evaluator = QualityEvaluator(
# function_points=self.semantic_result['function_points']
# )
# logger.info(f" Quality evaluator initialized")
# except Exception as e:
# logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
# # ================================
# current_tb = self.TB_code
# last_annotated_file = None
# # --- Baseline ---
# logger.info(f"--- CGA Iter 0 (Baseline) ---")
# iter0_dir = os.path.join(work_dir, "iter_0")
# os.makedirs(iter0_dir, exist_ok=True)
# self._prepare_dut(iter0_dir)
# ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
# success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
# self.best_score = score
# self.best_tb = current_tb
# last_annotated_file = annotated_path
# logger.info(f"Baseline Coverage: {score:.2f}%")
# if score >= self.target_coverage:
# logger.success(f"Target reached at baseline!")
# # [修改] 返回元组 (代码, 分数)
# return self.best_tb, self.best_score
# # --- Loop ---
# for i in range(1, self.max_iter + 1):
# logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
# # === [新增] 能量检查:是否还有活跃目标 ===
# if self.energy_allocator:
# current_target = self.energy_allocator.select_next_target()
# if not current_target:
# logger.info("No more active targets with remaining energy. Stopping.")
# break
# logger.info(f"Target: {current_target}")
# # =========================================
# if not last_annotated_file: break
# # [修改] 传递语义分析结果、能量分配器、多样性注入器给 CoverageParser
# parser = CoverageParser(
# last_annotated_file,
# tb_code=self.best_tb,
# semantic_result=self.semantic_result,
# energy_allocator=self.energy_allocator,
# diversity_injector=self.diversity_injector # [新增]
# )
# prompt = parser.generate_prompt(self.best_score)
# # if not prompt:
# # logger.info("No reachable missing blocks found. Stopping.")
# # break
# if not prompt:
# if self.best_score >= self.target_coverage:
# break # 达标才停止
# else:
# # 未达标,尝试探索性测试
# prompt = self._generate_exploration_prompt(i)
# logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
# messages = [{"role": "user", "content": prompt}]
# try:
# response, _ = llm.llm_call(messages, self.model)
# codes = llm.extract_code(response, "verilog")
# new_task_code = codes[0] if codes else ""
# if not new_task_code:
# # [新增] 记录失败
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=False,
# coverage_delta=0.0,
# energy_cost=1.0
# )
# continue
# except Exception as e:
# logger.error(f"LLM Call failed: {e}")
# # [新增] 记录失败
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=False,
# coverage_delta=0.0,
# energy_cost=1.0
# )
# break
# injector = TBInjector(self.best_tb)
# enhanced_tb = injector.inject(new_task_code, iter_idx=i)
# iter_dir = os.path.join(work_dir, f"iter_{i}")
# os.makedirs(iter_dir, exist_ok=True)
# self._prepare_dut(iter_dir)
# ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
# success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
# # === [新增] 记录生成结果到能量分配器 ===
# coverage_delta = new_score - self.best_score if success else 0.0
# generation_success = success and new_score > self.best_score
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=generation_success,
# coverage_delta=coverage_delta,
# energy_cost=1.0
# )
# # =========================================
# # === [新增] 记录测试用例到多样性历史 ===
# if self.diversity_injector:
# # 提取已知信号
# known_signals = []
# if self.semantic_result:
# known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
# self.diversity_injector.record_test(
# code=new_task_code,
# target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
# coverage_score=new_score,
# success=generation_success,
# iteration=i,
# known_signals=known_signals
# )
# # =======================================
# # === [新增] Layer 3: 质量评估 ===
# if self.quality_evaluator:
# # 评估测试用例质量
# eval_result = self.quality_evaluator.evaluate_test_case(
# code=new_task_code,
# covered_lines=set(), # 如果有具体覆盖行信息可传入
# covered_functions=[], # 如果有覆盖功能点信息可传入
# test_id=f"iter_{i}",
# iteration=i
# )
# # 记录多样性得分
# diversity_score = eval_result.get('diversity', {}).get('overall_score', 0)
# logger.info(f" Quality Evaluation: diversity={diversity_score:.2f}")
# # 检查是否应该接受该测试用例
# should_accept, reason = self.quality_evaluator.should_accept(eval_result)
# if not should_accept:
# logger.warning(f" Quality check failed: {reason}")
# # =====================================
# if success and new_score > self.best_score:
# improvement = new_score - self.best_score
# logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
# self.best_score = new_score
# self.best_tb = enhanced_tb
# last_annotated_file = new_annotated_path
# elif success and new_score == self.best_score:
# logger.info(f"Coverage unchanged. Keeping previous.")
# else:
# logger.warning(f"Regression or Failure. Discarding changes.")
# if self.best_score >= self.target_coverage:
# logger.success("Target coverage reached!")
# break
# logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
# # === [新增] 生成能量分配报告 ===
# if self.energy_allocator:
# energy_report = self.energy_allocator.generate_report()
# ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
# logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
# # =================================
# # === [新增] 生成多样性报告并保存历史 ===
# if self.diversity_injector:
# diversity_report = self.diversity_injector.generate_diversity_report()
# ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
# logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
# # 保存测试历史
# self.diversity_injector.history.save()
# # ======================================
# # === [新增] Layer 3: 生成质量评估报告 ===
# if self.quality_evaluator:
# quality_report = self.quality_evaluator.generate_report()
# ls.save_code(quality_report, os.path.join(work_dir, "quality_evaluation_report.txt"))
# logger.info(f"Quality evaluation report saved to {work_dir}/quality_evaluation_report.txt")
# # 输出语义覆盖率摘要
# coverage_result = self.quality_evaluator.semantic_coverage.calculate_coverage()
# logger.info(f"Semantic Coverage: {coverage_result.semantic_coverage:.2%}")
# # ===========================================
# # [修改] 返回元组 (代码, 分数)
# return self.best_tb, self.best_score
"""
Description : Coverage-Guided Agent (CGA) Main Controller
- Integrated with Layer 0: Semantic Analysis
- Integrated with Layer 1: Diversity Constraint Injection
- Integrated with Layer 3: Quality Evaluation
- Integrated with Layer 4: Energy Allocation
Author : CorrectBench Integration
"""
import os
import re
import sys
import shutil
import LLM_call as llm
import loader_saver as ls
from loader_saver import autologger as logger
from utils.verilator_call import verilator_run_coverage
from autoline.cga_utils import CoverageParser, TBInjector
# [新增] 导入语义分析层
from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# [新增] 导入能量分配层
from autoline.energy_allocator import EnergyAllocator, EnergyState
# [新增] 导入多样性约束注入器
from autoline.diversity_injector import DiversityInjector
# [新增] 导入测试历史管理器
from autoline.test_history import TestHistoryManager
# [新增] 导入质量评估层
from autoline.quality_evaluator import QualityEvaluator, DiversityScore, SemanticCoverageResult
class TaskTBCGA:
def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config, work_subdir="CGA", max_iter=None):
self.task_dir = task_dir
self.task_id = task_id
self.header = header
self.DUT_code = DUT_code
self.TB_code = TB_code
self.config = config
self.work_subdir = work_subdir
self.max_iter = config.autoline.cga.max_iter if max_iter is None else max_iter
self.target_coverage = config.autoline.cga.target_coverage
self.model = config.gpt.model
self.best_tb = TB_code
self.best_score = 0.0
self.best_covered_lines = set()
self.best_covered_functions = set()
# [新增] 能量分配器
self.energy_allocator: EnergyAllocator = None
# [新增] 多样性约束注入器
self.diversity_injector: DiversityInjector = None
# [新增] 质量评估器
self.quality_evaluator: QualityEvaluator = None
# [新增辅助函数] 从父目录拷贝 DUT
def _prepare_dut(self, target_dir):
source_dut = os.path.join(self.task_dir, "DUT.v")
target_dut = os.path.join(target_dir, "DUT.v")
# 优先拷贝现有的文件
if os.path.exists(source_dut):
shutil.copy(source_dut, target_dut)
else:
# 只有当文件由于某种原因被删除了,才降级使用内存中的 code
ls.save_code(self.DUT_code, target_dut)
def _extract_coverage_snapshot(self, annotated_path):
"""
Verilator annotated DUT 中提取当前已覆盖行和已覆盖功能点
"""
snapshot = {
"covered_lines": set(),
"covered_functions": set(),
"coverable_lines": set(),
}
if not annotated_path or not os.path.exists(annotated_path):
return snapshot
pct_pattern = re.compile(r"^%(\d+)\s+(.*)$")
tilde_pattern = re.compile(r"^~(\d+)\s+(.*)$")
caret_pattern = re.compile(r"^\^(\d+)\s+(.*)$")
plain_pattern = re.compile(r"^\s*(\d+)\s+(.*)$")
decl_pattern = re.compile(r"^\s*(input|output|inout|wire|reg|logic|parameter|localparam|assign)\b")
with open(annotated_path, "r", encoding="utf-8", errors="ignore") as f:
for line_no, raw_line in enumerate(f, start=1):
stripped = raw_line.strip()
if not stripped:
continue
count = None
code_part = None
is_caret = False
match = pct_pattern.match(stripped)
if match:
count = int(match.group(1))
code_part = match.group(2).strip()
else:
match = tilde_pattern.match(stripped)
if match:
count = int(match.group(1))
code_part = match.group(2).strip()
else:
match = caret_pattern.match(stripped)
if match:
is_caret = True
code_part = match.group(2).strip()
else:
match = plain_pattern.match(stripped)
if match:
count = int(match.group(1))
code_part = match.group(2).strip()
if code_part is None:
continue
if "//" in code_part:
code_part = code_part.split("//", 1)[0].strip()
if not code_part:
continue
if decl_pattern.match(code_part):
continue
if code_part in {"begin", "end", "else", "endmodule", "endcase", ");", "default:"}:
continue
if not any(ch.isalnum() for ch in code_part):
continue
snapshot["coverable_lines"].add(line_no)
if (count is not None) and (count > 0) and not is_caret:
snapshot["covered_lines"].add(line_no)
snapshot["covered_functions"] = self._map_lines_to_function_points(snapshot["covered_lines"])
return snapshot
def _map_lines_to_function_points(self, covered_lines):
"""
用功能点 location 与已覆盖行做交集推断当前已命中的功能点
"""
matched = set()
if not self.semantic_result:
return matched
for fp in self.semantic_result.get("function_points", []):
location = fp.get("location", {})
start_line = location.get("start_line", 0)
end_line = location.get("end_line", 0)
if (start_line <= 0) or (end_line <= 0):
continue
if any(start_line <= line_no <= end_line for line_no in covered_lines):
matched.add(fp.get("name", ""))
matched.discard("")
return matched
def _generate_exploration_prompt(self, iteration: int) -> str:
"""
生成探索性测试 Prompt
当找不到明确的 missing blocks 但覆盖率仍未达标时
生成一个探索性 Prompt 来尝试发现新的测试路径
Args:
iteration: 当前迭代次数
Returns:
探索性测试 Prompt如果无法生成则返回 None
"""
# 从语义分析结果获取 FSM 和功能点信息
fsm_info = ""
if self.semantic_result:
fsm_data = self.semantic_result.get('fsm', {})
if fsm_data:
states = fsm_data.get('states', [])
state_var = fsm_data.get('state_variable', 'state')
fsm_info = f"""
[FSM INFORMATION]
- State variable: {state_var}
- Known states: {', '.join(states) if states else 'unknown'}
The DUT appears to be a Finite State Machine. To improve coverage:
1. Try to visit each state by driving inputs that trigger state transitions
2. For each state, try different input combinations
3. Consider edge cases: reset transitions, timeout conditions, error states
"""
# 从能量分配器获取目标功能点
target_info = ""
if self.energy_allocator and self.energy_allocator.current_target:
target = self.energy_allocator.current_target
target_info = f"""
[CURRENT TARGET]
Focus on: {target.function_point}
Remaining energy: {target.remaining}
"""
# 从多样性注入器获取已尝试的测试
diversity_hints = ""
if self.diversity_injector:
history = self.diversity_injector.history
# [修复] TestHistoryManager 使用 records 属性,不是 history
if history and hasattr(history, 'records') and len(history.records) > 0:
recent_tests = history.records[-5:] if len(history.records) > 5 else history.records
diversity_hints = f"""
[RECENTLY TRIED APPROACHES - AVOID REPETITION]
Recent test patterns tried:
"""
for i, test in enumerate(recent_tests):
# TestRecord 是 dataclass使用属性访问
target = getattr(test, 'target_function', 'unknown') if hasattr(test, 'target_function') else test.get('target_function', 'unknown') if isinstance(test, dict) else 'unknown'
iteration = getattr(test, 'iteration', i) if hasattr(test, 'iteration') else test.get('iteration', i) if isinstance(test, dict) else i
diversity_hints += f"- Iter {iteration}: target={target}\n"
prompt = f"""
[EXPLORATION MODE - ITERATION {iteration}]
Current coverage is {self.best_score:.2f}%, but no specific uncovered code blocks were identified.
This may happen when:
1. Coverage data is incomplete or filtered
2. Branch/condition coverage needs improvement (not just line coverage)
3. State transitions in FSM are not fully exercised
{fsm_info}
{target_info}
{diversity_hints}
[YOUR TASK]
Write an EXPLORATORY test scenario that:
1. Covers different input combinations than previous tests
2. Explores different FSM state transitions
3. Tests edge cases and boundary conditions
4. Varies timing and sequence of inputs
[OUTPUT FORMAT]
Return ONLY Verilog test scenario code (no task wrapper).
Use the signal names from the testbench.
```verilog
// Your exploratory test code here
```
"""
return prompt
def _generate_syntax_fix_prompt(self, original_code: str, syntax_issues: dict, original_prompt: str) -> str:
"""
生成语法修正 Prompt LLM 修复检测到的语法问题
Args:
original_code: 原始生成的代码
syntax_issues: 语法检查结果
original_prompt: 原始 Prompt
Returns:
修正 Prompt
"""
issues_text = []
for issue in syntax_issues.get('width_mismatch', []):
issues_text.append(f"- {issue['message']}")
if 'suggestion' in issue:
issues_text.append(f" Suggestion: {issue['suggestion']}")
for issue in syntax_issues.get('logic_issues', []):
issues_text.append(f"- {issue['message']}")
if 'suggestion' in issue:
issues_text.append(f" Suggestion: {issue['suggestion']}")
for issue in syntax_issues.get('syntax_warnings', []):
if issue['severity'] == 'error':
issues_text.append(f"- ERROR: {issue['message']}")
prompt = f"""
[SYNTAX FIX REQUEST]
The previously generated Verilog test code has the following issues:
{chr(10).join(issues_text)}
[ORIGINAL CODE]
```verilog
{original_code}
```
[YOUR TASK]
Fix the above code to address these issues. Pay special attention to:
1. **Width Mismatch**: When you want to input a bit sequence (e.g., 01111100) to a single-bit signal:
- WRONG: `{{in}} = 8'b01111100;` (truncates to single bit)
- CORRECT: Use a shift register
```verilog
reg [7:0] shift_reg;
shift_reg = 8'b01111100;
for (i = 0; i < 8; i = i + 1) begin
in = shift_reg[7];
shift_reg = shift_reg << 1;
@(posedge clk);
end
```
2. **Single-bit Shift**: Shifting a 1-bit signal has no effect:
- WRONG: `in = in >> 1;` (always results in 0)
- CORRECT: Use a multi-bit shift register as shown above
[OUTPUT FORMAT]
Return ONLY the corrected Verilog test scenario code:
```verilog
// Your corrected test code here
```
"""
return prompt
def _get_compile_error(self, iter_dir: str) -> str:
"""
获取 Verilator 编译错误日志
Args:
iter_dir: 迭代目录
Returns:
错误日志字符串
"""
error_parts = []
# 检查 obj_dir 是否存在
obj_dir = os.path.join(iter_dir, "obj_dir")
if not os.path.exists(obj_dir):
error_parts.append("obj_dir not created - compilation failed early")
# 检查可能的日志文件
log_files = [
os.path.join(iter_dir, "verilator.log"),
os.path.join(iter_dir, "compile.log"),
os.path.join(obj_dir, "Vtestbench.log"),
]
for log_file in log_files:
if os.path.exists(log_file):
try:
with open(log_file, 'r', errors='ignore') as f:
content = f.read()
if content.strip():
error_parts.append(f"=== {os.path.basename(log_file)} ===")
error_parts.append(content[-2000:]) # 最后 2000 字符
except Exception:
pass
# 如果没有找到日志文件,检查目录内容
if not error_parts:
error_parts.append(f"Directory contents of {iter_dir}:")
try:
for item in os.listdir(iter_dir):
error_parts.append(f" {item}")
except Exception:
pass
return '\n'.join(error_parts) if error_parts else "Unknown compilation error"
def _generate_compile_fix_prompt(self, compile_error: str, original_code: str) -> str:
"""
生成编译错误修正 Prompt
Args:
compile_error: 编译错误日志
original_code: 原始代码
Returns:
修正 Prompt
"""
# 截取关键错误信息
error_lines = compile_error.split('\n')
key_errors = []
for line in error_lines:
line = line.strip()
if any(kw in line.lower() for kw in ['error', 'syntax', 'fatal', 'undefined', 'illegal']):
key_errors.append(line)
if len(key_errors) > 10: # 最多 10 条关键错误
break
prompt = f"""
[COMPILATION ERROR FIX REQUEST]
The Verilog test code failed to compile with Verilator. Here are the key errors:
```
{chr(10).join(key_errors) if key_errors else compile_error[:1000]}
```
[ORIGINAL CODE]
```verilog
{original_code[:2000]} // Truncated if too long
```
[COMMON VERILOG ISSUES TO CHECK]
1. **Width mismatch**: Assigning wide values to narrow signals
- Problem: `{{in}} = 8'b01111100;` where `in` is 1-bit
- Fix: Use shift register to input bits one at a time
2. **Undefined signals**: Using signals that are not declared
- Check spelling of signal names against the testbench
3. **Syntax errors**: Missing semicolons, mismatched begin/end
- Check all statements end with semicolon
- Ensure all `begin` have matching `end`
4. **Timescale issues**: Missing timescale directive
- The testbench should have `timescale 1ns / 1ps`
[YOUR TASK]
Generate a CORRECTED version of the test code that will compile successfully.
Focus on fixing the specific errors shown above.
[OUTPUT FORMAT]
Return ONLY the corrected Verilog test scenario code:
```verilog
// Your corrected test code here
```
"""
return prompt
def run(self):
    """Execute the coverage-guided generation (CGA) main loop.

    Pipeline: semantic analysis of the DUT (Layer 0), allocator/injector/
    evaluator setup, a baseline coverage run, then up to ``self.max_iter``
    LLM-driven refinement iterations. The best-scoring testbench is kept.

    Returns:
        Tuple of (best testbench code, best coverage score in percent).
    """
    logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
    # 1. Ensure a clean working directory (saves/<task>/<work_subdir>).
    work_dir = os.path.join(self.task_dir, self.work_subdir)
    if os.path.exists(work_dir):
        shutil.rmtree(work_dir)
    os.makedirs(work_dir, exist_ok=True)
    # === Step 0: semantic analysis (Layer 0) ===
    logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
    self.semantic_result = None
    # FIX: pre-initialize so the `if self.quality_evaluator` guards below
    # cannot raise AttributeError when the try-block fails early or when
    # the DUT exposes no function points. (NOTE(review): confirm __init__
    # does not already set this attribute.)
    self.quality_evaluator = None
    try:
        semantic_analyzer = SemanticAnalyzer(self.DUT_code)
        self.semantic_result = semantic_analyzer.analyze()
        # Log a short summary of the analysis result.
        fp_count = len(self.semantic_result.get('function_points', []))
        fsm_info = semantic_analyzer.get_fsm_info()
        if fsm_info:
            logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
                        f"({len(fsm_info.get('states', []))} states)")
        logger.info(f" Total function points identified: {fp_count}")
        # Persist the semantic analysis report.
        semantic_report = semantic_analyzer.generate_prompt_context()
        ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
        # === Step 0.1: initialize the energy allocator ===
        if self.semantic_result.get('function_points'):
            self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
            energy_init_result = self.energy_allocator.initialize(
                self.semantic_result['function_points']
            )
            logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
        # === Step 0.2: initialize the diversity constraint injector ===
        history_file = os.path.join(work_dir, "test_history.json")
        # Create the TestHistoryManager backed by history_file.
        history_manager = TestHistoryManager(history_file=history_file)
        self.diversity_injector = DiversityInjector(history_manager=history_manager)
        logger.info(f" Diversity injector initialized with history file: {history_file}")
        # === Step 0.3: initialize the quality evaluator ===
        if self.semantic_result.get('function_points'):
            self.quality_evaluator = QualityEvaluator(
                function_points=self.semantic_result['function_points']
            )
            logger.info(f" Quality evaluator initialized")
    except Exception as e:
        logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
    # ================================
    current_tb = self.TB_code
    last_annotated_file = None
    # --- Baseline (iteration 0) ---
    logger.info(f"--- CGA Iter 0 (Baseline) ---")
    iter0_dir = os.path.join(work_dir, "iter_0")
    os.makedirs(iter0_dir, exist_ok=True)
    self._prepare_dut(iter0_dir)
    ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
    success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
    self.best_score = score
    self.best_tb = current_tb
    last_annotated_file = annotated_path
    baseline_snapshot = self._extract_coverage_snapshot(annotated_path)
    self.best_covered_lines = set(baseline_snapshot["covered_lines"])
    self.best_covered_functions = set(baseline_snapshot["covered_functions"])
    if self.energy_allocator and self.best_covered_functions:
        self.energy_allocator.mark_targets_completed(sorted(self.best_covered_functions))
    if self.quality_evaluator and self.best_covered_functions:
        self.quality_evaluator.semantic_coverage.update_coverage(
            covered_lines=self.best_covered_lines,
            covered_functions=sorted(self.best_covered_functions),
            test_id="iter_0",
            iteration=0
        )
    logger.info(f"Baseline Coverage: {score:.2f}%")
    if score >= self.target_coverage:
        logger.success(f"Target reached at baseline!")
        # Return a (code, score) tuple.
        return self.best_tb, self.best_score
    # --- Refinement loop ---
    for i in range(1, self.max_iter + 1):
        logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
        iter_dir = os.path.join(work_dir, f"iter_{i}")
        os.makedirs(iter_dir, exist_ok=True)
        # === Energy check: is there any active target left? ===
        if self.energy_allocator:
            current_target = self.energy_allocator.select_next_target()
            if not current_target:
                logger.info("No more active targets with remaining energy. Stopping.")
                break
            logger.info(f"Target: {current_target}")
        # =========================================
        if not last_annotated_file: break
        # Pass semantic result, energy allocator, diversity injector and the
        # DUT code (for signal-name extraction) to the CoverageParser.
        parser = CoverageParser(
            last_annotated_file,
            tb_code=self.best_tb,
            semantic_result=self.semantic_result,
            energy_allocator=self.energy_allocator,
            diversity_injector=self.diversity_injector,
            dut_code=self.DUT_code
        )
        prompt = parser.generate_prompt(self.best_score)
        # Improved stop condition: keep iterating while coverage is below
        # target even if no explicit missing_blocks were found.
        if not prompt:
            if self.best_score >= self.target_coverage:
                logger.success(f"Target coverage reached: {self.best_score:.2f}%")
                break
            else:
                # Coverage below target but no identifiable missing blocks:
                # fall back to a random-exploration prompt.
                logger.warning(f"No reachable missing blocks found, but coverage ({self.best_score:.2f}%) < target ({self.target_coverage}%).")
                logger.info(f"Attempting random exploration to discover uncovered paths...")
                prompt = self._generate_exploration_prompt(i)
                if not prompt:
                    logger.info("Could not generate exploration prompt. Stopping.")
                    break
        ls.save_code(prompt, os.path.join(iter_dir, "prompt.txt"))
        logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
        messages = [{"role": "user", "content": prompt}]
        try:
            response, _ = llm.llm_call(messages, self.model)
            ls.save_code(response, os.path.join(iter_dir, "llm_response.txt"))
            codes = llm.extract_code(response, "verilog")
            new_task_code = codes[0] if codes else ""
            if not new_task_code:
                # Record the failed generation with the allocator.
                if self.energy_allocator:
                    self.energy_allocator.record_generation(
                        success=False,
                        coverage_delta=0.0,
                        energy_cost=1.0
                    )
                continue
        except Exception as e:
            logger.error(f"LLM Call failed: {e}")
            # Record the failed generation with the allocator.
            if self.energy_allocator:
                self.energy_allocator.record_generation(
                    success=False,
                    coverage_delta=0.0,
                    energy_cost=1.0
                )
            break
        ls.save_code(new_task_code, os.path.join(iter_dir, "generated_scenario.v"))
        injector = TBInjector(self.best_tb)
        enhanced_tb = injector.inject(new_task_code, iter_idx=i)
        # Inspect the syntax pre-validation result of the injection.
        validation_result = injector.last_validation_result
        syntax_issues = validation_result.get('syntax_check', {}) if validation_result else {}
        if syntax_issues.get('should_retry', False):
            logger.warning(f"[CGA-{i}] Syntax issues detected in generated code. Attempting retry...")
            # Build a corrected prompt that includes the syntax findings.
            retry_prompt = self._generate_syntax_fix_prompt(new_task_code, syntax_issues, prompt)
            if retry_prompt:
                try:
                    retry_response, _ = llm.llm_call([{"role": "user", "content": retry_prompt}], self.model)
                    retry_codes = llm.extract_code(retry_response, "verilog")
                    if retry_codes:
                        new_task_code = retry_codes[0]
                        ls.save_code(retry_prompt, os.path.join(iter_dir, "retry_prompt.txt"))
                        ls.save_code(retry_response, os.path.join(iter_dir, "retry_response.txt"))
                        ls.save_code(new_task_code, os.path.join(iter_dir, "generated_scenario_retry.v"))
                        enhanced_tb = injector.inject(new_task_code, iter_idx=i)
                        logger.info(f"[CGA-{i}] Retry code generated successfully")
                except Exception as e:
                    logger.warning(f"[CGA-{i}] Retry failed: {e}")
        self._prepare_dut(iter_dir)
        ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
        success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
        # Compile-failure feedback: ask the LLM to repair the build errors.
        if not success:
            compile_error = self._get_compile_error(iter_dir)
            if compile_error:
                logger.error(f"[CGA-{i}] Verilator compilation failed:")
                logger.error(compile_error[:500])  # truncate very long logs
                # At most one compile-fix retry per iteration.
                if not hasattr(self, '_compile_retry_count'):
                    self._compile_retry_count = {}
                self._compile_retry_count[i] = self._compile_retry_count.get(i, 0)
                if self._compile_retry_count[i] < 1:
                    logger.info(f"[CGA-{i}] Asking LLM to fix compilation errors...")
                    fix_prompt = self._generate_compile_fix_prompt(compile_error, new_task_code)
                    try:
                        fix_response, _ = llm.llm_call([{"role": "user", "content": fix_prompt}], self.model)
                        fix_codes = llm.extract_code(fix_response, "verilog")
                        if fix_codes:
                            fixed_code = fix_codes[0]
                            ls.save_code(fix_prompt, os.path.join(iter_dir, "compile_fix_prompt.txt"))
                            ls.save_code(fix_response, os.path.join(iter_dir, "compile_fix_response.txt"))
                            ls.save_code(fixed_code, os.path.join(iter_dir, "generated_scenario_compile_fix.v"))
                            enhanced_tb = injector.inject(fixed_code, iter_idx=i)
                            ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
                            # Retry the compilation once.
                            success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
                            if success:
                                logger.info(f"[CGA-{i}] Compilation fixed! Score: {new_score:.2f}%")
                                new_task_code = fixed_code
                    except Exception as e:
                        logger.warning(f"[CGA-{i}] Compile fix attempt failed: {e}")
                    self._compile_retry_count[i] += 1
        coverage_snapshot = self._extract_coverage_snapshot(new_annotated_path) if success else {
            "covered_lines": set(),
            "covered_functions": set(),
            "coverable_lines": set(),
        }
        current_covered_lines = set(coverage_snapshot["covered_lines"])
        current_covered_functions = set(coverage_snapshot["covered_functions"])
        newly_covered_lines = current_covered_lines - self.best_covered_lines
        newly_covered_functions = current_covered_functions - self.best_covered_functions
        # === Record the generation outcome with the energy allocator ===
        coverage_delta = new_score - self.best_score if success else 0.0
        current_target_name = None
        target_hit = False
        if self.energy_allocator and self.energy_allocator.current_target:
            current_target_name = self.energy_allocator.current_target.function_point
            if current_covered_functions:
                target_hit = current_target_name in current_covered_functions
            else:
                # No function-level data: treat any score gain as a hit.
                target_hit = success and new_score > self.best_score
        generation_success = success and target_hit
        if self.energy_allocator:
            self.energy_allocator.record_generation(
                success=generation_success,
                coverage_delta=coverage_delta,
                energy_cost=1.0
            )
            extra_completed_functions = set(newly_covered_functions)
            if generation_success and current_target_name:
                extra_completed_functions.discard(current_target_name)
            if extra_completed_functions:
                self.energy_allocator.mark_targets_completed(sorted(extra_completed_functions))
        # =========================================
        # === Record this test case in the diversity history ===
        if self.diversity_injector:
            # Collect known port/signal names from the semantic result.
            known_signals = []
            if self.semantic_result:
                known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
            self.diversity_injector.record_test(
                code=new_task_code,
                target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
                coverage_score=new_score,
                success=generation_success,
                iteration=i,
                known_signals=known_signals
            )
        # =======================================
        # === Layer 3: quality evaluation ===
        if self.quality_evaluator:
            # Score the generated test case.
            eval_result = self.quality_evaluator.evaluate_test_case(
                code=new_task_code,
                covered_lines=newly_covered_lines,
                covered_functions=sorted(newly_covered_functions),
                test_id=f"iter_{i}",
                iteration=i
            )
            # Log the diversity score.
            diversity_score = eval_result.get('diversity', {}).get('overall_score', 0)
            logger.info(f" Quality Evaluation: diversity={diversity_score:.2f}")
            # Decide whether the test case should be accepted.
            should_accept, reason = self.quality_evaluator.should_accept(eval_result)
            if not should_accept:
                logger.warning(f" Quality check failed: {reason}")
        # =====================================
        if success and new_score > self.best_score:
            improvement = new_score - self.best_score
            logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
            self.best_score = new_score
            self.best_tb = enhanced_tb
            last_annotated_file = new_annotated_path
            self.best_covered_lines = current_covered_lines
            self.best_covered_functions = current_covered_functions
        elif success and new_score == self.best_score:
            logger.info(f"Coverage unchanged. Keeping previous.")
        else:
            logger.warning(f"Regression or Failure. Discarding changes.")
        if self.best_score >= self.target_coverage:
            logger.success("Target coverage reached!")
            break
    logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
    # === Save the energy allocation report ===
    if self.energy_allocator:
        energy_report = self.energy_allocator.generate_report()
        ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
        logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
    # =================================
    # === Save the diversity report and persist the test history ===
    if self.diversity_injector:
        diversity_report = self.diversity_injector.generate_diversity_report()
        ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
        logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
        # Persist the accumulated test history.
        self.diversity_injector.history.save()
    # ======================================
    # === Layer 3: save the quality evaluation report ===
    if self.quality_evaluator:
        quality_report = self.quality_evaluator.generate_report()
        ls.save_code(quality_report, os.path.join(work_dir, "quality_evaluation_report.txt"))
        logger.info(f"Quality evaluation report saved to {work_dir}/quality_evaluation_report.txt")
        # Log a semantic-coverage summary.
        coverage_result = self.quality_evaluator.semantic_coverage.calculate_coverage()
        logger.info(f"Semantic Coverage: {coverage_result.semantic_coverage:.2%}")
    # ===========================================
    # Return a (code, score) tuple.
    return self.best_tb, self.best_score