Files
TBgen_App/autoline/TB_cga.py
2026-03-30 16:46:48 +08:00

1454 lines
63 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# # #第四版
# # """
# # Description : Coverage-Guided Agent (CGA) Main Controller
# # - Integrated with Layer 0: Semantic Analysis
# # - Integrated with Layer 1: Diversity Constraint Injection
# # - Integrated with Layer 4: Energy Allocation
# # Author : CorrectBench Integration
# # """
# # import os
# # import sys
# # import shutil
# # import LLM_call as llm
# # import loader_saver as ls
# # from loader_saver import autologger as logger
# # from utils.verilator_call import verilator_run_coverage
# # from autoline.cga_utils import CoverageParser, TBInjector
# # # [新增] 导入语义分析层
# # from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# # # [新增] 导入能量分配层
# # from autoline.energy_allocator import EnergyAllocator, EnergyState
# # # [新增] 导入多样性约束注入器
# # from autoline.diversity_injector import DiversityInjector
# # # [新增] 导入测试历史管理器
# # from autoline.test_history import TestHistoryManager
# # class TaskTBCGA:
# # def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config):
# # self.task_dir = task_dir
# # self.task_id = task_id
# # self.header = header
# # self.DUT_code = DUT_code
# # self.TB_code = TB_code
# # self.config = config
# # self.max_iter = 5
# # self.target_coverage = 95.0
# # self.model = config.gpt.model
# # self.best_tb = TB_code
# # self.best_score = 0.0
# # # [新增] 能量分配器
# # self.energy_allocator: EnergyAllocator = None
# # # [新增] 多样性约束注入器
# # self.diversity_injector: DiversityInjector = None
# # # [新增辅助函数] 从父目录拷贝 DUT
# # def _prepare_dut(self, target_dir):
# # source_dut = os.path.join(self.task_dir, "DUT.v")
# # target_dut = os.path.join(target_dir, "DUT.v")
# # # 优先拷贝现有的文件
# # if os.path.exists(source_dut):
# # shutil.copy(source_dut, target_dut)
# # else:
# # # 只有当文件由于某种原因被删除了,才降级使用内存中的 code
# # ls.save_code(self.DUT_code, target_dut)
# # def run(self):
# # logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
# # # 1. 确保工作目录存在 (saves/任务名/5_CGA)
# # work_dir = os.path.join(self.task_dir, "5_CGA")
# # if os.path.exists(work_dir):
# # shutil.rmtree(work_dir)
# # os.makedirs(work_dir, exist_ok=True)
# # # === [新增] Step 0: 语义分析 ===
# # logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
# # self.semantic_result = None
# # try:
# # semantic_analyzer = SemanticAnalyzer(self.DUT_code)
# # self.semantic_result = semantic_analyzer.analyze()
# # # 记录分析结果摘要
# # fp_count = len(self.semantic_result.get('function_points', []))
# # fsm_info = semantic_analyzer.get_fsm_info()
# # if fsm_info:
# # logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
# # f"({len(fsm_info.get('states', []))} states)")
# # logger.info(f" Total function points identified: {fp_count}")
# # # 保存语义分析报告
# # semantic_report = semantic_analyzer.generate_prompt_context()
# # ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
# # # === [新增] Step 0.1: 初始化能量分配器 ===
# # if self.semantic_result.get('function_points'):
# # self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
# # energy_init_result = self.energy_allocator.initialize(
# # self.semantic_result['function_points']
# # )
# # logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
# # # === [新增] Step 0.2: 初始化多样性约束注入器 ===
# # history_file = os.path.join(work_dir, "test_history.json")
# # # 创建 TestHistoryManager 并传递 history_file
# # history_manager = TestHistoryManager(history_file=history_file)
# # self.diversity_injector = DiversityInjector(history_manager=history_manager)
# # logger.info(f" Diversity injector initialized with history file: {history_file}")
# # except Exception as e:
# # logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
# # # ================================
# # current_tb = self.TB_code
# # last_annotated_file = None
# # # --- Baseline ---
# # logger.info(f"--- CGA Iter 0 (Baseline) ---")
# # iter0_dir = os.path.join(work_dir, "iter_0")
# # os.makedirs(iter0_dir, exist_ok=True)
# # self._prepare_dut(iter0_dir)
# # ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
# # success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
# # self.best_score = score
# # self.best_tb = current_tb
# # last_annotated_file = annotated_path
# # logger.info(f"Baseline Coverage: {score:.2f}%")
# # if score >= self.target_coverage:
# # logger.success(f"Target reached at baseline!")
# # # [修改] 返回元组 (代码, 分数)
# # return self.best_tb, self.best_score
# # # --- Loop ---
# # for i in range(1, self.max_iter + 1):
# # logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
# # # === [新增] 能量检查:是否还有活跃目标 ===
# # if self.energy_allocator:
# # current_target = self.energy_allocator.select_next_target()
# # if not current_target:
# # logger.info("No more active targets with remaining energy. Stopping.")
# # break
# # logger.info(f"Target: {current_target}")
# # # =========================================
# # if not last_annotated_file: break
# # # [修改] 传递语义分析结果、能量分配器、多样性注入器给 CoverageParser
# # parser = CoverageParser(
# # last_annotated_file,
# # tb_code=self.best_tb,
# # semantic_result=self.semantic_result,
# # energy_allocator=self.energy_allocator,
# # diversity_injector=self.diversity_injector # [新增]
# # )
# # prompt = parser.generate_prompt(self.best_score)
# # if not prompt:
# # logger.info("No reachable missing blocks found. Stopping.")
# # break
# # logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
# # messages = [{"role": "user", "content": prompt}]
# # try:
# # response, _ = llm.llm_call(messages, self.model)
# # codes = llm.extract_code(response, "verilog")
# # new_task_code = codes[0] if codes else ""
# # if not new_task_code:
# # # [新增] 记录失败
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=False,
# # coverage_delta=0.0,
# # energy_cost=1.0
# # )
# # continue
# # except Exception as e:
# # logger.error(f"LLM Call failed: {e}")
# # # [新增] 记录失败
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=False,
# # coverage_delta=0.0,
# # energy_cost=1.0
# # )
# # break
# # injector = TBInjector(self.best_tb)
# # enhanced_tb = injector.inject(new_task_code, iter_idx=i)
# # iter_dir = os.path.join(work_dir, f"iter_{i}")
# # os.makedirs(iter_dir, exist_ok=True)
# # self._prepare_dut(iter_dir)
# # ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
# # success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
# # # === [新增] 记录生成结果到能量分配器 ===
# # coverage_delta = new_score - self.best_score if success else 0.0
# # generation_success = success and new_score > self.best_score
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=generation_success,
# # coverage_delta=coverage_delta,
# # energy_cost=1.0
# # )
# # # =========================================
# # # === [新增] 记录测试用例到多样性历史 ===
# # if self.diversity_injector:
# # # 提取已知信号
# # known_signals = []
# # if self.semantic_result:
# # known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
# # self.diversity_injector.record_test(
# # code=new_task_code,
# # target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
# # coverage_score=new_score,
# # success=generation_success,
# # iteration=i,
# # known_signals=known_signals
# # )
# # # =======================================
# # if success and new_score > self.best_score:
# # improvement = new_score - self.best_score
# # logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
# # self.best_score = new_score
# # self.best_tb = enhanced_tb
# # last_annotated_file = new_annotated_path
# # elif success and new_score == self.best_score:
# # logger.info(f"Coverage unchanged. Keeping previous.")
# # else:
# # logger.warning(f"Regression or Failure. Discarding changes.")
# # if self.best_score >= self.target_coverage:
# # logger.success("Target coverage reached!")
# # break
# # logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
# # # === [新增] 生成能量分配报告 ===
# # if self.energy_allocator:
# # energy_report = self.energy_allocator.generate_report()
# # ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
# # logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
# # # =================================
# # # === [新增] 生成多样性报告并保存历史 ===
# # if self.diversity_injector:
# # diversity_report = self.diversity_injector.generate_diversity_report()
# # ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
# # logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
# # # 保存测试历史
# # self.diversity_injector.history.save()
# # # ======================================
# # # [修改] 返回元组 (代码, 分数)
# # return self.best_tb, self.best_score
# #终版
# """
# Description : Coverage-Guided Agent (CGA) Main Controller
# - Integrated with Layer 0: Semantic Analysis
# - Integrated with Layer 1: Diversity Constraint Injection
# - Integrated with Layer 3: Quality Evaluation
# - Integrated with Layer 4: Energy Allocation
# Author : CorrectBench Integration
# """
# import os
# import sys
# import shutil
# import LLM_call as llm
# import loader_saver as ls
# from loader_saver import autologger as logger
# from utils.verilator_call import verilator_run_coverage
# from autoline.cga_utils import CoverageParser, TBInjector
# # [新增] 导入语义分析层
# from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# # [新增] 导入能量分配层
# from autoline.energy_allocator import EnergyAllocator, EnergyState
# # [新增] 导入多样性约束注入器
# from autoline.diversity_injector import DiversityInjector
# # [新增] 导入测试历史管理器
# from autoline.test_history import TestHistoryManager
# # [新增] 导入质量评估层
# from autoline.quality_evaluator import QualityEvaluator, DiversityScore, SemanticCoverageResult
# class TaskTBCGA:
# def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config):
# self.task_dir = task_dir
# self.task_id = task_id
# self.header = header
# self.DUT_code = DUT_code
# self.TB_code = TB_code
# self.config = config
# self.max_iter = 5
# self.target_coverage = 95.0
# self.model = config.gpt.model
# self.best_tb = TB_code
# self.best_score = 0.0
# # [新增] 能量分配器
# self.energy_allocator: EnergyAllocator = None
# # [新增] 多样性约束注入器
# self.diversity_injector: DiversityInjector = None
# # [新增] 质量评估器
# self.quality_evaluator: QualityEvaluator = None
# # [新增辅助函数] 从父目录拷贝 DUT
# def _prepare_dut(self, target_dir):
# source_dut = os.path.join(self.task_dir, "DUT.v")
# target_dut = os.path.join(target_dir, "DUT.v")
# # 优先拷贝现有的文件
# if os.path.exists(source_dut):
# shutil.copy(source_dut, target_dut)
# else:
# # 只有当文件由于某种原因被删除了,才降级使用内存中的 code
# ls.save_code(self.DUT_code, target_dut)
# def _generate_exploration_prompt(self, iteration: int) -> str:
# """
# 生成探索性测试 Prompt
# 当找不到明确的 missing blocks 但覆盖率仍未达标时,
# 生成一个探索性 Prompt 来尝试发现新的测试路径。
# Args:
# iteration: 当前迭代次数
# Returns:
# 探索性测试 Prompt如果无法生成则返回 None
# """
# # 从语义分析结果获取 FSM 和功能点信息
# fsm_info = ""
# if self.semantic_result:
# fsm_data = self.semantic_result.get('fsm', {})
# if fsm_data:
# states = fsm_data.get('states', [])
# state_var = fsm_data.get('state_variable', 'state')
# fsm_info = f"""
# [FSM INFORMATION]
# - State variable: {state_var}
# - Known states: {', '.join(states) if states else 'unknown'}
# The DUT appears to be a Finite State Machine. To improve coverage:
# 1. Try to visit each state by driving inputs that trigger state transitions
# 2. For each state, try different input combinations
# 3. Consider edge cases: reset transitions, timeout conditions, error states
# """
# # 从能量分配器获取目标功能点
# target_info = ""
# if self.energy_allocator and self.energy_allocator.current_target:
# target = self.energy_allocator.current_target
# target_info = f"""
# [CURRENT TARGET]
# Focus on: {target.function_point}
# Remaining energy: {target.remaining}
# """
# # 从多样性注入器获取已尝试的测试
# diversity_hints = ""
# if self.diversity_injector:
# history = self.diversity_injector.history
# # if history and len(history.history) > 0:
# # recent_tests = history.history[-5:] if len(history.history) > 5 else history.history
# if history and hasattr(history, 'records') and len(history.records) > 0:
# recent_tests = history.records[-5:] if len(history.records) > 5 else history.records
# diversity_hints = f"""
# [RECENTLY TRIED APPROACHES - AVOID REPETITION]
# Recent test patterns tried:
# """
# # for i, test in enumerate(recent_tests):
# # diversity_hints += f"- Iter {test.get('iteration', i)}: target={test.get('target_function', 'unknown')}\n"
# for i, test in enumerate(recent_tests):
# # TestRecord 是 dataclass使用属性访问
# target = getattr(test, 'target_function', 'unknown') if hasattr(test, 'target_function') else 'unknown'
# iteration = getattr(test, 'iteration', i) if hasattr(test, 'iteration') else i
# diversity_hints += f"- Iter {iteration}: target={target}\n"
# prompt = f"""
# [EXPLORATION MODE - ITERATION {iteration}]
# Current coverage is {self.best_score:.2f}%, but no specific uncovered code blocks were identified.
# This may happen when:
# 1. Coverage data is incomplete or filtered
# 2. Branch/condition coverage needs improvement (not just line coverage)
# 3. State transitions in FSM are not fully exercised
# {fsm_info}
# {target_info}
# {diversity_hints}
# [YOUR TASK]
# Write an EXPLORATORY test scenario that:
# 1. Covers different input combinations than previous tests
# 2. Explores different FSM state transitions
# 3. Tests edge cases and boundary conditions
# 4. Varies timing and sequence of inputs
# [OUTPUT FORMAT]
# Return ONLY Verilog test scenario code (no task wrapper).
# Use the signal names from the testbench.
# ```verilog
# // Your exploratory test code here
# ```
# """
# return prompt
# def run(self):
# logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
# # 1. 确保工作目录存在 (saves/任务名/5_CGA)
# work_dir = os.path.join(self.task_dir, "5_CGA")
# if os.path.exists(work_dir):
# shutil.rmtree(work_dir)
# os.makedirs(work_dir, exist_ok=True)
# # === [新增] Step 0: 语义分析 ===
# logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
# self.semantic_result = None
# try:
# semantic_analyzer = SemanticAnalyzer(self.DUT_code)
# self.semantic_result = semantic_analyzer.analyze()
# # 记录分析结果摘要
# fp_count = len(self.semantic_result.get('function_points', []))
# fsm_info = semantic_analyzer.get_fsm_info()
# if fsm_info:
# logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
# f"({len(fsm_info.get('states', []))} states)")
# logger.info(f" Total function points identified: {fp_count}")
# # 保存语义分析报告
# semantic_report = semantic_analyzer.generate_prompt_context()
# ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
# # === [新增] Step 0.1: 初始化能量分配器 ===
# if self.semantic_result.get('function_points'):
# self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
# energy_init_result = self.energy_allocator.initialize(
# self.semantic_result['function_points']
# )
# logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
# # === [新增] Step 0.2: 初始化多样性约束注入器 ===
# history_file = os.path.join(work_dir, "test_history.json")
# # 创建 TestHistoryManager 并传递 history_file
# history_manager = TestHistoryManager(history_file=history_file)
# self.diversity_injector = DiversityInjector(history_manager=history_manager)
# logger.info(f" Diversity injector initialized with history file: {history_file}")
# # === [新增] Step 0.3: 初始化质量评估器 ===
# if self.semantic_result.get('function_points'):
# self.quality_evaluator = QualityEvaluator(
# function_points=self.semantic_result['function_points']
# )
# logger.info(f" Quality evaluator initialized")
# except Exception as e:
# logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
# # ================================
# current_tb = self.TB_code
# last_annotated_file = None
# # --- Baseline ---
# logger.info(f"--- CGA Iter 0 (Baseline) ---")
# iter0_dir = os.path.join(work_dir, "iter_0")
# os.makedirs(iter0_dir, exist_ok=True)
# self._prepare_dut(iter0_dir)
# ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
# success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
# self.best_score = score
# self.best_tb = current_tb
# last_annotated_file = annotated_path
# logger.info(f"Baseline Coverage: {score:.2f}%")
# if score >= self.target_coverage:
# logger.success(f"Target reached at baseline!")
# # [修改] 返回元组 (代码, 分数)
# return self.best_tb, self.best_score
# # --- Loop ---
# for i in range(1, self.max_iter + 1):
# logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
# # === [新增] 能量检查:是否还有活跃目标 ===
# if self.energy_allocator:
# current_target = self.energy_allocator.select_next_target()
# if not current_target:
# logger.info("No more active targets with remaining energy. Stopping.")
# break
# logger.info(f"Target: {current_target}")
# # =========================================
# if not last_annotated_file: break
# # [修改] 传递语义分析结果、能量分配器、多样性注入器给 CoverageParser
# parser = CoverageParser(
# last_annotated_file,
# tb_code=self.best_tb,
# semantic_result=self.semantic_result,
# energy_allocator=self.energy_allocator,
# diversity_injector=self.diversity_injector # [新增]
# )
# prompt = parser.generate_prompt(self.best_score)
# # if not prompt:
# # logger.info("No reachable missing blocks found. Stopping.")
# # break
# if not prompt:
# if self.best_score >= self.target_coverage:
# break # 达标才停止
# else:
# # 未达标,尝试探索性测试
# prompt = self._generate_exploration_prompt(i)
# logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
# messages = [{"role": "user", "content": prompt}]
# try:
# response, _ = llm.llm_call(messages, self.model)
# codes = llm.extract_code(response, "verilog")
# new_task_code = codes[0] if codes else ""
# if not new_task_code:
# # [新增] 记录失败
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=False,
# coverage_delta=0.0,
# energy_cost=1.0
# )
# continue
# except Exception as e:
# logger.error(f"LLM Call failed: {e}")
# # [新增] 记录失败
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=False,
# coverage_delta=0.0,
# energy_cost=1.0
# )
# break
# injector = TBInjector(self.best_tb)
# enhanced_tb = injector.inject(new_task_code, iter_idx=i)
# iter_dir = os.path.join(work_dir, f"iter_{i}")
# os.makedirs(iter_dir, exist_ok=True)
# self._prepare_dut(iter_dir)
# ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
# success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
# # === [新增] 记录生成结果到能量分配器 ===
# coverage_delta = new_score - self.best_score if success else 0.0
# generation_success = success and new_score > self.best_score
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=generation_success,
# coverage_delta=coverage_delta,
# energy_cost=1.0
# )
# # =========================================
# # === [新增] 记录测试用例到多样性历史 ===
# if self.diversity_injector:
# # 提取已知信号
# known_signals = []
# if self.semantic_result:
# known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
# self.diversity_injector.record_test(
# code=new_task_code,
# target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
# coverage_score=new_score,
# success=generation_success,
# iteration=i,
# known_signals=known_signals
# )
# # =======================================
# # === [新增] Layer 3: 质量评估 ===
# if self.quality_evaluator:
# # 评估测试用例质量
# eval_result = self.quality_evaluator.evaluate_test_case(
# code=new_task_code,
# covered_lines=set(), # 如果有具体覆盖行信息可传入
# covered_functions=[], # 如果有覆盖功能点信息可传入
# test_id=f"iter_{i}",
# iteration=i
# )
# # 记录多样性得分
# diversity_score = eval_result.get('diversity', {}).get('overall_score', 0)
# logger.info(f" Quality Evaluation: diversity={diversity_score:.2f}")
# # 检查是否应该接受该测试用例
# should_accept, reason = self.quality_evaluator.should_accept(eval_result)
# if not should_accept:
# logger.warning(f" Quality check failed: {reason}")
# # =====================================
# if success and new_score > self.best_score:
# improvement = new_score - self.best_score
# logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
# self.best_score = new_score
# self.best_tb = enhanced_tb
# last_annotated_file = new_annotated_path
# elif success and new_score == self.best_score:
# logger.info(f"Coverage unchanged. Keeping previous.")
# else:
# logger.warning(f"Regression or Failure. Discarding changes.")
# if self.best_score >= self.target_coverage:
# logger.success("Target coverage reached!")
# break
# logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
# # === [新增] 生成能量分配报告 ===
# if self.energy_allocator:
# energy_report = self.energy_allocator.generate_report()
# ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
# logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
# # =================================
# # === [新增] 生成多样性报告并保存历史 ===
# if self.diversity_injector:
# diversity_report = self.diversity_injector.generate_diversity_report()
# ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
# logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
# # 保存测试历史
# self.diversity_injector.history.save()
# # ======================================
# # === [新增] Layer 3: 生成质量评估报告 ===
# if self.quality_evaluator:
# quality_report = self.quality_evaluator.generate_report()
# ls.save_code(quality_report, os.path.join(work_dir, "quality_evaluation_report.txt"))
# logger.info(f"Quality evaluation report saved to {work_dir}/quality_evaluation_report.txt")
# # 输出语义覆盖率摘要
# coverage_result = self.quality_evaluator.semantic_coverage.calculate_coverage()
# logger.info(f"Semantic Coverage: {coverage_result.semantic_coverage:.2%}")
# # ===========================================
# # [修改] 返回元组 (代码, 分数)
# return self.best_tb, self.best_score
"""
Description : Coverage-Guided Agent (CGA) Main Controller
- Integrated with Layer 0: Semantic Analysis
- Integrated with Layer 1: Diversity Constraint Injection
- Integrated with Layer 3: Quality Evaluation
- Integrated with Layer 4: Energy Allocation
Author : CorrectBench Integration
"""
import os
import re
import sys
import shutil
import LLM_call as llm
import loader_saver as ls
from loader_saver import autologger as logger
from utils.verilator_call import verilator_run_coverage
from autoline.cga_utils import CoverageParser, TBInjector
# [新增] 导入语义分析层
from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# [新增] 导入能量分配层
from autoline.energy_allocator import EnergyAllocator, EnergyState
# [新增] 导入多样性约束注入器
from autoline.diversity_injector import DiversityInjector
# [新增] 导入测试历史管理器
from autoline.test_history import TestHistoryManager
# [新增] 导入质量评估层
from autoline.quality_evaluator import QualityEvaluator, DiversityScore, SemanticCoverageResult
class TaskTBCGA:
    def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config, work_subdir="CGA", max_iter=None):
        """
        Coverage-Guided Agent controller for a single task.

        Args:
            task_dir: Directory holding this task's artifacts (DUT.v lives here).
            task_id: Identifier used in log messages.
            header: Testbench header text (kept for downstream use).
            DUT_code: In-memory copy of the DUT source, used as a fallback
                when DUT.v is missing on disk (see _prepare_dut).
            TB_code: Initial testbench source; also seeds the "best" testbench.
            config: Global config object; must expose gpt.model and
                autoline.cga.max_iter / autoline.cga.target_coverage.
            work_subdir: Name of the working sub-directory created under task_dir.
            max_iter: Optional override for config.autoline.cga.max_iter.
        """
        self.task_dir = task_dir
        self.task_id = task_id
        self.header = header
        self.DUT_code = DUT_code
        self.TB_code = TB_code
        self.config = config
        self.work_subdir = work_subdir
        # An explicit max_iter argument wins over the configured default.
        self.max_iter = config.autoline.cga.max_iter if max_iter is None else max_iter
        self.target_coverage = config.autoline.cga.target_coverage
        self.model = config.gpt.model
        # Best-so-far testbench and its coverage score (baseline set in run()).
        self.best_tb = TB_code
        self.best_score = 0.0
        # Coverage snapshots of the best run (lines / function points hit).
        self.best_covered_lines = set()
        self.best_covered_functions = set()
        # Energy allocator (Layer 4); created in run() when function points exist.
        self.energy_allocator: EnergyAllocator = None
        # Diversity constraint injector (Layer 1); created in run().
        self.diversity_injector: DiversityInjector = None
        # Quality evaluator (Layer 3); created in run() when function points exist.
        self.quality_evaluator: QualityEvaluator = None
# [新增辅助函数] 从父目录拷贝 DUT
def _prepare_dut(self, target_dir):
source_dut = os.path.join(self.task_dir, "DUT.v")
target_dut = os.path.join(target_dir, "DUT.v")
# 优先拷贝现有的文件
if os.path.exists(source_dut):
shutil.copy(source_dut, target_dut)
else:
# 只有当文件由于某种原因被删除了,才降级使用内存中的 code
ls.save_code(self.DUT_code, target_dut)
def _extract_coverage_snapshot(self, annotated_path):
"""
从 Verilator annotated DUT 中提取当前已覆盖行和已覆盖功能点。
"""
snapshot = {
"covered_lines": set(),
"covered_functions": set(),
"coverable_lines": set(),
}
if not annotated_path or not os.path.exists(annotated_path):
return snapshot
pct_pattern = re.compile(r"^%(\d+)\s+(.*)$")
tilde_pattern = re.compile(r"^~(\d+)\s+(.*)$")
caret_pattern = re.compile(r"^\^(\d+)\s+(.*)$")
plain_pattern = re.compile(r"^\s*(\d+)\s+(.*)$")
decl_pattern = re.compile(r"^\s*(input|output|inout|wire|reg|logic|parameter|localparam|assign)\b")
with open(annotated_path, "r", encoding="utf-8", errors="ignore") as f:
for line_no, raw_line in enumerate(f, start=1):
stripped = raw_line.strip()
if not stripped:
continue
count = None
code_part = None
is_caret = False
match = pct_pattern.match(stripped)
if match:
count = int(match.group(1))
code_part = match.group(2).strip()
else:
match = tilde_pattern.match(stripped)
if match:
count = int(match.group(1))
code_part = match.group(2).strip()
else:
match = caret_pattern.match(stripped)
if match:
is_caret = True
code_part = match.group(2).strip()
else:
match = plain_pattern.match(stripped)
if match:
count = int(match.group(1))
code_part = match.group(2).strip()
if code_part is None:
continue
if "//" in code_part:
code_part = code_part.split("//", 1)[0].strip()
if not code_part:
continue
if decl_pattern.match(code_part):
continue
if code_part in {"begin", "end", "else", "endmodule", "endcase", ");", "default:"}:
continue
if not any(ch.isalnum() for ch in code_part):
continue
snapshot["coverable_lines"].add(line_no)
if (count is not None) and (count > 0) and not is_caret:
snapshot["covered_lines"].add(line_no)
snapshot["covered_functions"] = self._map_lines_to_function_points(snapshot["covered_lines"])
return snapshot
def _map_lines_to_function_points(self, covered_lines):
"""
用功能点 location 与已覆盖行做交集,推断当前已命中的功能点。
"""
matched = set()
if not self.semantic_result:
return matched
for fp in self.semantic_result.get("function_points", []):
location = fp.get("location", {})
start_line = location.get("start_line", 0)
end_line = location.get("end_line", 0)
if (start_line <= 0) or (end_line <= 0):
continue
if any(start_line <= line_no <= end_line for line_no in covered_lines):
matched.add(fp.get("name", ""))
matched.discard("")
return matched
def _generate_exploration_prompt(self, iteration: int) -> str:
"""
生成探索性测试 Prompt
当找不到明确的 missing blocks 但覆盖率仍未达标时,
生成一个探索性 Prompt 来尝试发现新的测试路径。
Args:
iteration: 当前迭代次数
Returns:
探索性测试 Prompt如果无法生成则返回 None
"""
# 从语义分析结果获取 FSM 和功能点信息
fsm_info = ""
if self.semantic_result:
fsm_data = self.semantic_result.get('fsm', {})
if fsm_data:
states = fsm_data.get('states', [])
state_var = fsm_data.get('state_variable', 'state')
fsm_info = f"""
[FSM INFORMATION]
- State variable: {state_var}
- Known states: {', '.join(states) if states else 'unknown'}
The DUT appears to be a Finite State Machine. To improve coverage:
1. Try to visit each state by driving inputs that trigger state transitions
2. For each state, try different input combinations
3. Consider edge cases: reset transitions, timeout conditions, error states
"""
# 从能量分配器获取目标功能点
target_info = ""
if self.energy_allocator and self.energy_allocator.current_target:
target = self.energy_allocator.current_target
target_info = f"""
[CURRENT TARGET]
Focus on: {target.function_point}
Remaining energy: {target.remaining}
"""
# 从多样性注入器获取已尝试的测试
diversity_hints = ""
if self.diversity_injector:
history = self.diversity_injector.history
# [修复] TestHistoryManager 使用 records 属性,不是 history
if history and hasattr(history, 'records') and len(history.records) > 0:
recent_tests = history.records[-5:] if len(history.records) > 5 else history.records
diversity_hints = f"""
[RECENTLY TRIED APPROACHES - AVOID REPETITION]
Recent test patterns tried:
"""
for i, test in enumerate(recent_tests):
# TestRecord 是 dataclass使用属性访问
target = getattr(test, 'target_function', 'unknown') if hasattr(test, 'target_function') else test.get('target_function', 'unknown') if isinstance(test, dict) else 'unknown'
iteration = getattr(test, 'iteration', i) if hasattr(test, 'iteration') else test.get('iteration', i) if isinstance(test, dict) else i
diversity_hints += f"- Iter {iteration}: target={target}\n"
prompt = f"""
[EXPLORATION MODE - ITERATION {iteration}]
Current coverage is {self.best_score:.2f}%, but no specific uncovered code blocks were identified.
This may happen when:
1. Coverage data is incomplete or filtered
2. Branch/condition coverage needs improvement (not just line coverage)
3. State transitions in FSM are not fully exercised
{fsm_info}
{target_info}
{diversity_hints}
[YOUR TASK]
Write an EXPLORATORY test scenario that:
1. Covers different input combinations than previous tests
2. Explores different FSM state transitions
3. Tests edge cases and boundary conditions
4. Varies timing and sequence of inputs
[OUTPUT FORMAT]
Return ONLY Verilog test scenario code (no task wrapper).
Use the signal names from the testbench.
```verilog
// Your exploratory test code here
```
"""
return prompt
def _generate_syntax_fix_prompt(self, original_code: str, syntax_issues: dict, original_prompt: str) -> str:
"""
生成语法修正 Prompt让 LLM 修复检测到的语法问题
Args:
original_code: 原始生成的代码
syntax_issues: 语法检查结果
original_prompt: 原始 Prompt
Returns:
修正 Prompt
"""
issues_text = []
for issue in syntax_issues.get('width_mismatch', []):
issues_text.append(f"- {issue['message']}")
if 'suggestion' in issue:
issues_text.append(f" Suggestion: {issue['suggestion']}")
for issue in syntax_issues.get('logic_issues', []):
issues_text.append(f"- {issue['message']}")
if 'suggestion' in issue:
issues_text.append(f" Suggestion: {issue['suggestion']}")
for issue in syntax_issues.get('syntax_warnings', []):
if issue['severity'] == 'error':
issues_text.append(f"- ERROR: {issue['message']}")
prompt = f"""
[SYNTAX FIX REQUEST]
The previously generated Verilog test code has the following issues:
{chr(10).join(issues_text)}
[ORIGINAL CODE]
```verilog
{original_code}
```
[YOUR TASK]
Fix the above code to address these issues. Pay special attention to:
1. **Width Mismatch**: When you want to input a bit sequence (e.g., 01111100) to a single-bit signal:
- WRONG: `{{in}} = 8'b01111100;` (truncates to single bit)
- CORRECT: Use a shift register
```verilog
reg [7:0] shift_reg;
shift_reg = 8'b01111100;
for (i = 0; i < 8; i = i + 1) begin
in = shift_reg[7];
shift_reg = shift_reg << 1;
@(posedge clk);
end
```
2. **Single-bit Shift**: Shifting a 1-bit signal has no effect:
- WRONG: `in = in >> 1;` (always results in 0)
- CORRECT: Use a multi-bit shift register as shown above
[OUTPUT FORMAT]
Return ONLY the corrected Verilog test scenario code:
```verilog
// Your corrected test code here
```
"""
return prompt
def _get_compile_error(self, iter_dir: str) -> str:
"""
获取 Verilator 编译错误日志
Args:
iter_dir: 迭代目录
Returns:
错误日志字符串
"""
error_parts = []
# 检查 obj_dir 是否存在
obj_dir = os.path.join(iter_dir, "obj_dir")
if not os.path.exists(obj_dir):
error_parts.append("obj_dir not created - compilation failed early")
# 检查可能的日志文件
log_files = [
os.path.join(iter_dir, "verilator.log"),
os.path.join(iter_dir, "compile.log"),
os.path.join(obj_dir, "Vtestbench.log"),
]
for log_file in log_files:
if os.path.exists(log_file):
try:
with open(log_file, 'r', errors='ignore') as f:
content = f.read()
if content.strip():
error_parts.append(f"=== {os.path.basename(log_file)} ===")
error_parts.append(content[-2000:]) # 最后 2000 字符
except Exception:
pass
# 如果没有找到日志文件,检查目录内容
if not error_parts:
error_parts.append(f"Directory contents of {iter_dir}:")
try:
for item in os.listdir(iter_dir):
error_parts.append(f" {item}")
except Exception:
pass
return '\n'.join(error_parts) if error_parts else "Unknown compilation error"
def _generate_compile_fix_prompt(self, compile_error: str, original_code: str) -> str:
"""
生成编译错误修正 Prompt
Args:
compile_error: 编译错误日志
original_code: 原始代码
Returns:
修正 Prompt
"""
# 截取关键错误信息
error_lines = compile_error.split('\n')
key_errors = []
for line in error_lines:
line = line.strip()
if any(kw in line.lower() for kw in ['error', 'syntax', 'fatal', 'undefined', 'illegal']):
key_errors.append(line)
if len(key_errors) > 10: # 最多 10 条关键错误
break
prompt = f"""
[COMPILATION ERROR FIX REQUEST]
The Verilog test code failed to compile with Verilator. Here are the key errors:
```
{chr(10).join(key_errors) if key_errors else compile_error[:1000]}
```
[ORIGINAL CODE]
```verilog
{original_code[:2000]} // Truncated if too long
```
[COMMON VERILOG ISSUES TO CHECK]
1. **Width mismatch**: Assigning wide values to narrow signals
- Problem: `{{in}} = 8'b01111100;` where `in` is 1-bit
- Fix: Use shift register to input bits one at a time
2. **Undefined signals**: Using signals that are not declared
- Check spelling of signal names against the testbench
3. **Syntax errors**: Missing semicolons, mismatched begin/end
- Check all statements end with semicolon
- Ensure all `begin` have matching `end`
4. **Timescale issues**: Missing timescale directive
- The testbench should have `timescale 1ns / 1ps`
[YOUR TASK]
Generate a CORRECTED version of the test code that will compile successfully.
Focus on fixing the specific errors shown above.
[OUTPUT FORMAT]
Return ONLY the corrected Verilog test scenario code:
```verilog
// Your corrected test code here
```
"""
return prompt
def run(self):
    """
    Execute the coverage-guided testbench enhancement loop.

    Flow: semantic analysis (Layer 0) -> baseline coverage run ->
    iterative LLM-driven scenario generation guided by energy allocation,
    diversity history and quality evaluation -> final reports.

    Returns:
        Tuple of (best_tb_code, best_coverage_score).
    """
    logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
    # 1. Recreate a clean working directory (saves/<task>/<work_subdir>).
    # NOTE(review): self.work_subdir is assumed to be set in __init__ — confirm.
    work_dir = os.path.join(self.task_dir, self.work_subdir)
    if os.path.exists(work_dir):
        shutil.rmtree(work_dir)
    os.makedirs(work_dir, exist_ok=True)

    # === Step 0: semantic analysis (Layer 0) ===
    logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
    self.semantic_result = None
    # FIX: initialize unconditionally. Previously quality_evaluator was only
    # assigned inside the try block below (and only when function_points were
    # found), so the later `if self.quality_evaluator:` checks raised
    # AttributeError whenever semantic analysis failed or found no points.
    self.quality_evaluator = None
    try:
        semantic_analyzer = SemanticAnalyzer(self.DUT_code)
        self.semantic_result = semantic_analyzer.analyze()
        # Log a short summary of the analysis result.
        fp_count = len(self.semantic_result.get('function_points', []))
        fsm_info = semantic_analyzer.get_fsm_info()
        if fsm_info:
            logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
                        f"({len(fsm_info.get('states', []))} states)")
        logger.info(f" Total function points identified: {fp_count}")
        # Persist the semantic analysis report for later inspection.
        semantic_report = semantic_analyzer.generate_prompt_context()
        ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
        # === Step 0.1: energy allocator (Layer 4) ===
        if self.semantic_result.get('function_points'):
            self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
            energy_init_result = self.energy_allocator.initialize(
                self.semantic_result['function_points']
            )
            logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
        # === Step 0.2: diversity constraint injector (Layer 1) ===
        history_file = os.path.join(work_dir, "test_history.json")
        history_manager = TestHistoryManager(history_file=history_file)
        self.diversity_injector = DiversityInjector(history_manager=history_manager)
        logger.info(f" Diversity injector initialized with history file: {history_file}")
        # === Step 0.3: quality evaluator (Layer 3) ===
        if self.semantic_result.get('function_points'):
            self.quality_evaluator = QualityEvaluator(
                function_points=self.semantic_result['function_points']
            )
            logger.info(f" Quality evaluator initialized")
    except Exception as e:
        # Semantic guidance is best-effort: fall back to plain CGA.
        logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")

    current_tb = self.TB_code
    last_annotated_file = None

    # --- Baseline run (iteration 0) ---
    logger.info(f"--- CGA Iter 0 (Baseline) ---")
    iter0_dir = os.path.join(work_dir, "iter_0")
    os.makedirs(iter0_dir, exist_ok=True)
    self._prepare_dut(iter0_dir)
    ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
    success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
    self.best_score = score
    self.best_tb = current_tb
    last_annotated_file = annotated_path
    baseline_snapshot = self._extract_coverage_snapshot(annotated_path)
    self.best_covered_lines = set(baseline_snapshot["covered_lines"])
    self.best_covered_functions = set(baseline_snapshot["covered_functions"])
    # Credit baseline-covered function points immediately.
    if self.energy_allocator and self.best_covered_functions:
        self.energy_allocator.mark_targets_completed(sorted(self.best_covered_functions))
    if self.quality_evaluator and self.best_covered_functions:
        self.quality_evaluator.semantic_coverage.update_coverage(
            covered_lines=self.best_covered_lines,
            covered_functions=sorted(self.best_covered_functions),
            test_id="iter_0",
            iteration=0
        )
    logger.info(f"Baseline Coverage: {score:.2f}%")
    if score >= self.target_coverage:
        logger.success(f"Target reached at baseline!")
        # Return tuple (code, score).
        return self.best_tb, self.best_score

    # --- Improvement loop ---
    for i in range(1, self.max_iter + 1):
        logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
        iter_dir = os.path.join(work_dir, f"iter_{i}")
        os.makedirs(iter_dir, exist_ok=True)
        # Energy check: stop when no active target has budget left.
        if self.energy_allocator:
            current_target = self.energy_allocator.select_next_target()
            if not current_target:
                logger.info("No more active targets with remaining energy. Stopping.")
                break
            logger.info(f"Target: {current_target}")
        if not last_annotated_file:
            break
        # Hand all guidance layers to the coverage parser.
        parser = CoverageParser(
            last_annotated_file,
            tb_code=self.best_tb,
            semantic_result=self.semantic_result,
            energy_allocator=self.energy_allocator,
            diversity_injector=self.diversity_injector,
            dut_code=self.DUT_code  # DUT source for signal-name extraction
        )
        prompt = parser.generate_prompt(self.best_score)
        # Improved stop condition: keep going below target even without
        # explicit missing blocks, by trying random exploration.
        if not prompt:
            if self.best_score >= self.target_coverage:
                logger.success(f"Target coverage reached: {self.best_score:.2f}%")
                break
            else:
                logger.warning(f"No reachable missing blocks found, but coverage ({self.best_score:.2f}%) < target ({self.target_coverage}%).")
                logger.info(f"Attempting random exploration to discover uncovered paths...")
                prompt = self._generate_exploration_prompt(i)
                if not prompt:
                    logger.info("Could not generate exploration prompt. Stopping.")
                    break
        ls.save_code(prompt, os.path.join(iter_dir, "prompt.txt"))
        logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
        messages = [{"role": "user", "content": prompt}]
        try:
            response, _ = llm.llm_call(messages, self.model)
            ls.save_code(response, os.path.join(iter_dir, "llm_response.txt"))
            codes = llm.extract_code(response, "verilog")
            new_task_code = codes[0] if codes else ""
            if not new_task_code:
                # No usable code: charge the energy budget and move on.
                if self.energy_allocator:
                    self.energy_allocator.record_generation(
                        success=False,
                        coverage_delta=0.0,
                        energy_cost=1.0
                    )
                continue
        except Exception as e:
            logger.error(f"LLM Call failed: {e}")
            # Record the failed attempt before aborting the loop.
            if self.energy_allocator:
                self.energy_allocator.record_generation(
                    success=False,
                    coverage_delta=0.0,
                    energy_cost=1.0
                )
            break
        ls.save_code(new_task_code, os.path.join(iter_dir, "generated_scenario.v"))
        injector = TBInjector(self.best_tb)
        enhanced_tb = injector.inject(new_task_code, iter_idx=i)
        # Syntax pre-check on the injected scenario; one LLM retry if needed.
        validation_result = injector.last_validation_result
        syntax_issues = validation_result.get('syntax_check', {}) if validation_result else {}
        if syntax_issues.get('should_retry', False):
            logger.warning(f"[CGA-{i}] Syntax issues detected in generated code. Attempting retry...")
            retry_prompt = self._generate_syntax_fix_prompt(new_task_code, syntax_issues, prompt)
            if retry_prompt:
                try:
                    retry_response, _ = llm.llm_call([{"role": "user", "content": retry_prompt}], self.model)
                    retry_codes = llm.extract_code(retry_response, "verilog")
                    if retry_codes:
                        new_task_code = retry_codes[0]
                        ls.save_code(retry_prompt, os.path.join(iter_dir, "retry_prompt.txt"))
                        ls.save_code(retry_response, os.path.join(iter_dir, "retry_response.txt"))
                        ls.save_code(new_task_code, os.path.join(iter_dir, "generated_scenario_retry.v"))
                        enhanced_tb = injector.inject(new_task_code, iter_idx=i)
                        logger.info(f"[CGA-{i}] Retry code generated successfully")
                except Exception as e:
                    logger.warning(f"[CGA-{i}] Retry failed: {e}")
        self._prepare_dut(iter_dir)
        ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
        success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
        # Compile-failure feedback: at most one LLM fix attempt per iteration.
        if not success:
            compile_error = self._get_compile_error(iter_dir)
            if compile_error:
                logger.error(f"[CGA-{i}] Verilator compilation failed:")
                logger.error(compile_error[:500])  # truncate long error text
                if not hasattr(self, '_compile_retry_count'):
                    self._compile_retry_count = {}
                self._compile_retry_count[i] = self._compile_retry_count.get(i, 0)
                if self._compile_retry_count[i] < 1:
                    logger.info(f"[CGA-{i}] Asking LLM to fix compilation errors...")
                    fix_prompt = self._generate_compile_fix_prompt(compile_error, new_task_code)
                    try:
                        fix_response, _ = llm.llm_call([{"role": "user", "content": fix_prompt}], self.model)
                        fix_codes = llm.extract_code(fix_response, "verilog")
                        if fix_codes:
                            fixed_code = fix_codes[0]
                            ls.save_code(fix_prompt, os.path.join(iter_dir, "compile_fix_prompt.txt"))
                            ls.save_code(fix_response, os.path.join(iter_dir, "compile_fix_response.txt"))
                            ls.save_code(fixed_code, os.path.join(iter_dir, "generated_scenario_compile_fix.v"))
                            enhanced_tb = injector.inject(fixed_code, iter_idx=i)
                            ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
                            # Re-run the compile + coverage pass.
                            success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
                            if success:
                                logger.info(f"[CGA-{i}] Compilation fixed! Score: {new_score:.2f}%")
                                new_task_code = fixed_code
                    except Exception as e:
                        logger.warning(f"[CGA-{i}] Compile fix attempt failed: {e}")
                    self._compile_retry_count[i] += 1
        coverage_snapshot = self._extract_coverage_snapshot(new_annotated_path) if success else {
            "covered_lines": set(),
            "covered_functions": set(),
            "coverable_lines": set(),
        }
        current_covered_lines = set(coverage_snapshot["covered_lines"])
        current_covered_functions = set(coverage_snapshot["covered_functions"])
        newly_covered_lines = current_covered_lines - self.best_covered_lines
        newly_covered_functions = current_covered_functions - self.best_covered_functions
        # === Report the attempt to the energy allocator ===
        coverage_delta = new_score - self.best_score if success else 0.0
        current_target_name = None
        target_hit = False
        if self.energy_allocator and self.energy_allocator.current_target:
            current_target_name = self.energy_allocator.current_target.function_point
            if current_covered_functions:
                target_hit = current_target_name in current_covered_functions
            else:
                # No per-function data: fall back to raw score improvement.
                target_hit = success and new_score > self.best_score
        generation_success = success and target_hit
        if self.energy_allocator:
            self.energy_allocator.record_generation(
                success=generation_success,
                coverage_delta=coverage_delta,
                energy_cost=1.0
            )
            # Credit any additional function points hit besides the target.
            extra_completed_functions = set(newly_covered_functions)
            if generation_success and current_target_name:
                extra_completed_functions.discard(current_target_name)
            if extra_completed_functions:
                self.energy_allocator.mark_targets_completed(sorted(extra_completed_functions))
        # === Record the test case into the diversity history ===
        if self.diversity_injector:
            known_signals = []
            if self.semantic_result:
                known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
            self.diversity_injector.record_test(
                code=new_task_code,
                target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
                coverage_score=new_score,
                success=generation_success,
                iteration=i,
                known_signals=known_signals
            )
        # === Layer 3: quality evaluation ===
        if self.quality_evaluator:
            eval_result = self.quality_evaluator.evaluate_test_case(
                code=new_task_code,
                covered_lines=newly_covered_lines,
                covered_functions=sorted(newly_covered_functions),
                test_id=f"iter_{i}",
                iteration=i
            )
            diversity_score = eval_result.get('diversity', {}).get('overall_score', 0)
            logger.info(f" Quality Evaluation: diversity={diversity_score:.2f}")
            should_accept, reason = self.quality_evaluator.should_accept(eval_result)
            if not should_accept:
                logger.warning(f" Quality check failed: {reason}")
        # Keep the enhanced TB only when it strictly improves coverage.
        if success and new_score > self.best_score:
            improvement = new_score - self.best_score
            logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
            self.best_score = new_score
            self.best_tb = enhanced_tb
            last_annotated_file = new_annotated_path
            self.best_covered_lines = current_covered_lines
            self.best_covered_functions = current_covered_functions
        elif success and new_score == self.best_score:
            logger.info(f"Coverage unchanged. Keeping previous.")
        else:
            logger.warning(f"Regression or Failure. Discarding changes.")
        if self.best_score >= self.target_coverage:
            logger.success("Target coverage reached!")
            break

    logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
    # === Final energy allocation report ===
    if self.energy_allocator:
        energy_report = self.energy_allocator.generate_report()
        ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
        logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
    # === Final diversity report + persist history ===
    if self.diversity_injector:
        diversity_report = self.diversity_injector.generate_diversity_report()
        ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
        logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
        self.diversity_injector.history.save()
    # === Layer 3: final quality evaluation report ===
    if self.quality_evaluator:
        quality_report = self.quality_evaluator.generate_report()
        ls.save_code(quality_report, os.path.join(work_dir, "quality_evaluation_report.txt"))
        logger.info(f"Quality evaluation report saved to {work_dir}/quality_evaluation_report.txt")
        coverage_result = self.quality_evaluator.semantic_coverage.calculate_coverage()
        logger.info(f"Semantic Coverage: {coverage_result.semantic_coverage:.2%}")
    # Return tuple (code, score).
    return self.best_tb, self.best_score