Files

1750 lines
78 KiB
Python
Raw Permalink Normal View History

2026-05-22 10:02:42 +08:00
# # #第四版
# # """
# # Description : Coverage-Guided Agent (CGA) Main Controller
# # - Integrated with Layer 0: Semantic Analysis
# # - Integrated with Layer 1: Diversity Constraint Injection
# # - Integrated with Layer 4: Energy Allocation
# # Author : CorrectBench Integration
# # """
# # import os
# # import sys
# # import shutil
# # import LLM_call as llm
# # import loader_saver as ls
# # from loader_saver import autologger as logger
# # from utils.verilator_call import verilator_run_coverage
# # from autoline.cga_utils import CoverageParser, TBInjector
# # # [新增] 导入语义分析层
# # from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# # # [新增] 导入能量分配层
# # from autoline.energy_allocator import EnergyAllocator, EnergyState
# # # [新增] 导入多样性约束注入器
# # from autoline.diversity_injector import DiversityInjector
# # # [新增] 导入测试历史管理器
# # from autoline.test_history import TestHistoryManager
# # class TaskTBCGA:
# # def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config):
# # self.task_dir = task_dir
# # self.task_id = task_id
# # self.header = header
# # self.DUT_code = DUT_code
# # self.TB_code = TB_code
# # self.config = config
# # self.max_iter = 5
# # self.target_coverage = 95.0
# # self.model = config.gpt.model
# # self.best_tb = TB_code
# # self.best_score = 0.0
# # # [新增] 能量分配器
# # self.energy_allocator: EnergyAllocator = None
# # # [新增] 多样性约束注入器
# # self.diversity_injector: DiversityInjector = None
# # # [新增辅助函数] 从父目录拷贝 DUT
# # def _prepare_dut(self, target_dir):
# # source_dut = os.path.join(self.task_dir, "DUT.v")
# # target_dut = os.path.join(target_dir, "DUT.v")
# # # 优先拷贝现有的文件
# # if os.path.exists(source_dut):
# # shutil.copy(source_dut, target_dut)
# # else:
# # # 只有当文件由于某种原因被删除了,才降级使用内存中的 code
# # ls.save_code(self.DUT_code, target_dut)
# # def run(self):
# # logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
# # # 1. 确保工作目录存在 (saves/任务名/5_CGA)
# # work_dir = os.path.join(self.task_dir, "5_CGA")
# # if os.path.exists(work_dir):
# # shutil.rmtree(work_dir)
# # os.makedirs(work_dir, exist_ok=True)
# # # === [新增] Step 0: 语义分析 ===
# # logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
# # self.semantic_result = None
# # try:
# # semantic_analyzer = SemanticAnalyzer(self.DUT_code)
# # self.semantic_result = semantic_analyzer.analyze()
# # # 记录分析结果摘要
# # fp_count = len(self.semantic_result.get('function_points', []))
# # fsm_info = semantic_analyzer.get_fsm_info()
# # if fsm_info:
# # logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
# # f"({len(fsm_info.get('states', []))} states)")
# # logger.info(f" Total function points identified: {fp_count}")
# # # 保存语义分析报告
# # semantic_report = semantic_analyzer.generate_prompt_context()
# # ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
# # # === [新增] Step 0.1: 初始化能量分配器 ===
# # if self.semantic_result.get('function_points'):
# # self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
# # energy_init_result = self.energy_allocator.initialize(
# # self.semantic_result['function_points']
# # )
# # logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
# # # === [新增] Step 0.2: 初始化多样性约束注入器 ===
# # history_file = os.path.join(work_dir, "test_history.json")
# # # 创建 TestHistoryManager 并传递 history_file
# # history_manager = TestHistoryManager(history_file=history_file)
# # self.diversity_injector = DiversityInjector(history_manager=history_manager)
# # logger.info(f" Diversity injector initialized with history file: {history_file}")
# # except Exception as e:
# # logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
# # # ================================
# # current_tb = self.TB_code
# # last_annotated_file = None
# # # --- Baseline ---
# # logger.info(f"--- CGA Iter 0 (Baseline) ---")
# # iter0_dir = os.path.join(work_dir, "iter_0")
# # os.makedirs(iter0_dir, exist_ok=True)
# # self._prepare_dut(iter0_dir)
# # ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
# # success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
# # self.best_score = score
# # self.best_tb = current_tb
# # last_annotated_file = annotated_path
# # logger.info(f"Baseline Coverage: {score:.2f}%")
# # if score >= self.target_coverage:
# # logger.success(f"Target reached at baseline!")
# # # [修改] 返回元组 (代码, 分数)
# # return self.best_tb, self.best_score
# # # --- Loop ---
# # for i in range(1, self.max_iter + 1):
# # logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
# # # === [新增] 能量检查:是否还有活跃目标 ===
# # if self.energy_allocator:
# # current_target = self.energy_allocator.select_next_target()
# # if not current_target:
# # logger.info("No more active targets with remaining energy. Stopping.")
# # break
# # logger.info(f"Target: {current_target}")
# # # =========================================
# # if not last_annotated_file: break
# # # [修改] 传递语义分析结果、能量分配器、多样性注入器给 CoverageParser
# # parser = CoverageParser(
# # last_annotated_file,
# # tb_code=self.best_tb,
# # semantic_result=self.semantic_result,
# # energy_allocator=self.energy_allocator,
# # diversity_injector=self.diversity_injector # [新增]
# # )
# # prompt = parser.generate_prompt(self.best_score)
# # if not prompt:
# # logger.info("No reachable missing blocks found. Stopping.")
# # break
# # logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
# # messages = [{"role": "user", "content": prompt}]
# # try:
# # response, _ = llm.llm_call(messages, self.model)
# # codes = llm.extract_code(response, "verilog")
# # new_task_code = codes[0] if codes else ""
# # if not new_task_code:
# # # [新增] 记录失败
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=False,
# # coverage_delta=0.0,
# # energy_cost=1.0
# # )
# # continue
# # except Exception as e:
# # logger.error(f"LLM Call failed: {e}")
# # # [新增] 记录失败
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=False,
# # coverage_delta=0.0,
# # energy_cost=1.0
# # )
# # break
# # injector = TBInjector(self.best_tb)
# # enhanced_tb = injector.inject(new_task_code, iter_idx=i)
# # iter_dir = os.path.join(work_dir, f"iter_{i}")
# # os.makedirs(iter_dir, exist_ok=True)
# # self._prepare_dut(iter_dir)
# # ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
# # success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
# # # === [新增] 记录生成结果到能量分配器 ===
# # coverage_delta = new_score - self.best_score if success else 0.0
# # generation_success = success and new_score > self.best_score
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=generation_success,
# # coverage_delta=coverage_delta,
# # energy_cost=1.0
# # )
# # # =========================================
# # # === [新增] 记录测试用例到多样性历史 ===
# # if self.diversity_injector:
# # # 提取已知信号
# # known_signals = []
# # if self.semantic_result:
# # known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
# # self.diversity_injector.record_test(
# # code=new_task_code,
# # target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
# # coverage_score=new_score,
# # success=generation_success,
# # iteration=i,
# # known_signals=known_signals
# # )
# # # =======================================
# # if success and new_score > self.best_score:
# # improvement = new_score - self.best_score
# # logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
# # self.best_score = new_score
# # self.best_tb = enhanced_tb
# # last_annotated_file = new_annotated_path
# # elif success and new_score == self.best_score:
# # logger.info(f"Coverage unchanged. Keeping previous.")
# # else:
# # logger.warning(f"Regression or Failure. Discarding changes.")
# # if self.best_score >= self.target_coverage:
# # logger.success("Target coverage reached!")
# # break
# # logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
# # # === [新增] 生成能量分配报告 ===
# # if self.energy_allocator:
# # energy_report = self.energy_allocator.generate_report()
# # ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
# # logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
# # # =================================
# # # === [新增] 生成多样性报告并保存历史 ===
# # if self.diversity_injector:
# # diversity_report = self.diversity_injector.generate_diversity_report()
# # ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
# # logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
# # # 保存测试历史
# # self.diversity_injector.history.save()
# # # ======================================
# # # [修改] 返回元组 (代码, 分数)
# # return self.best_tb, self.best_score
# #终版
# """
# Description : Coverage-Guided Agent (CGA) Main Controller
# - Integrated with Layer 0: Semantic Analysis
# - Integrated with Layer 1: Diversity Constraint Injection
# - Integrated with Layer 3: Quality Evaluation
# - Integrated with Layer 4: Energy Allocation
# Author : CorrectBench Integration
# """
# import os
# import sys
# import shutil
# import LLM_call as llm
# import loader_saver as ls
# from loader_saver import autologger as logger
# from utils.verilator_call import verilator_run_coverage
# from autoline.cga_utils import CoverageParser, TBInjector
# # [新增] 导入语义分析层
# from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# # [新增] 导入能量分配层
# from autoline.energy_allocator import EnergyAllocator, EnergyState
# # [新增] 导入多样性约束注入器
# from autoline.diversity_injector import DiversityInjector
# # [新增] 导入测试历史管理器
# from autoline.test_history import TestHistoryManager
# # [新增] 导入质量评估层
# from autoline.quality_evaluator import QualityEvaluator, DiversityScore, SemanticCoverageResult
# class TaskTBCGA:
# def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config):
# self.task_dir = task_dir
# self.task_id = task_id
# self.header = header
# self.DUT_code = DUT_code
# self.TB_code = TB_code
# self.config = config
# self.max_iter = 5
# self.target_coverage = 95.0
# self.model = config.gpt.model
# self.best_tb = TB_code
# self.best_score = 0.0
# # [新增] 能量分配器
# self.energy_allocator: EnergyAllocator = None
# # [新增] 多样性约束注入器
# self.diversity_injector: DiversityInjector = None
# # [新增] 质量评估器
# self.quality_evaluator: QualityEvaluator = None
# # [新增辅助函数] 从父目录拷贝 DUT
# def _prepare_dut(self, target_dir):
# source_dut = os.path.join(self.task_dir, "DUT.v")
# target_dut = os.path.join(target_dir, "DUT.v")
# # 优先拷贝现有的文件
# if os.path.exists(source_dut):
# shutil.copy(source_dut, target_dut)
# else:
# # 只有当文件由于某种原因被删除了,才降级使用内存中的 code
# ls.save_code(self.DUT_code, target_dut)
# def _generate_exploration_prompt(self, iteration: int) -> str:
# """
# 生成探索性测试 Prompt
# 当找不到明确的 missing blocks 但覆盖率仍未达标时,
# 生成一个探索性 Prompt 来尝试发现新的测试路径。
# Args:
# iteration: 当前迭代次数
# Returns:
# 探索性测试 Prompt如果无法生成则返回 None
# """
# # 从语义分析结果获取 FSM 和功能点信息
# fsm_info = ""
# if self.semantic_result:
# fsm_data = self.semantic_result.get('fsm', {})
# if fsm_data:
# states = fsm_data.get('states', [])
# state_var = fsm_data.get('state_variable', 'state')
# fsm_info = f"""
# [FSM INFORMATION]
# - State variable: {state_var}
# - Known states: {', '.join(states) if states else 'unknown'}
# The DUT appears to be a Finite State Machine. To improve coverage:
# 1. Try to visit each state by driving inputs that trigger state transitions
# 2. For each state, try different input combinations
# 3. Consider edge cases: reset transitions, timeout conditions, error states
# """
# # 从能量分配器获取目标功能点
# target_info = ""
# if self.energy_allocator and self.energy_allocator.current_target:
# target = self.energy_allocator.current_target
# target_info = f"""
# [CURRENT TARGET]
# Focus on: {target.function_point}
# Remaining energy: {target.remaining}
# """
# # 从多样性注入器获取已尝试的测试
# diversity_hints = ""
# if self.diversity_injector:
# history = self.diversity_injector.history
# # if history and len(history.history) > 0:
# # recent_tests = history.history[-5:] if len(history.history) > 5 else history.history
# if history and hasattr(history, 'records') and len(history.records) > 0:
# recent_tests = history.records[-5:] if len(history.records) > 5 else history.records
# diversity_hints = f"""
# [RECENTLY TRIED APPROACHES - AVOID REPETITION]
# Recent test patterns tried:
# """
# # for i, test in enumerate(recent_tests):
# # diversity_hints += f"- Iter {test.get('iteration', i)}: target={test.get('target_function', 'unknown')}\n"
# for i, test in enumerate(recent_tests):
# # TestRecord 是 dataclass使用属性访问
# target = getattr(test, 'target_function', 'unknown') if hasattr(test, 'target_function') else 'unknown'
# iteration = getattr(test, 'iteration', i) if hasattr(test, 'iteration') else i
# diversity_hints += f"- Iter {iteration}: target={target}\n"
# prompt = f"""
# [EXPLORATION MODE - ITERATION {iteration}]
# Current coverage is {self.best_score:.2f}%, but no specific uncovered code blocks were identified.
# This may happen when:
# 1. Coverage data is incomplete or filtered
# 2. Branch/condition coverage needs improvement (not just line coverage)
# 3. State transitions in FSM are not fully exercised
# {fsm_info}
# {target_info}
# {diversity_hints}
# [YOUR TASK]
# Write an EXPLORATORY test scenario that:
# 1. Covers different input combinations than previous tests
# 2. Explores different FSM state transitions
# 3. Tests edge cases and boundary conditions
# 4. Varies timing and sequence of inputs
# [OUTPUT FORMAT]
# Return ONLY Verilog test scenario code (no task wrapper).
# Use the signal names from the testbench.
# ```verilog
# // Your exploratory test code here
# ```
# """
# return prompt
# def run(self):
# logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
# # 1. 确保工作目录存在 (saves/任务名/5_CGA)
# work_dir = os.path.join(self.task_dir, "5_CGA")
# if os.path.exists(work_dir):
# shutil.rmtree(work_dir)
# os.makedirs(work_dir, exist_ok=True)
# # === [新增] Step 0: 语义分析 ===
# logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
# self.semantic_result = None
# try:
# semantic_analyzer = SemanticAnalyzer(self.DUT_code)
# self.semantic_result = semantic_analyzer.analyze()
# # 记录分析结果摘要
# fp_count = len(self.semantic_result.get('function_points', []))
# fsm_info = semantic_analyzer.get_fsm_info()
# if fsm_info:
# logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
# f"({len(fsm_info.get('states', []))} states)")
# logger.info(f" Total function points identified: {fp_count}")
# # 保存语义分析报告
# semantic_report = semantic_analyzer.generate_prompt_context()
# ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
# # === [新增] Step 0.1: 初始化能量分配器 ===
# if self.semantic_result.get('function_points'):
# self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
# energy_init_result = self.energy_allocator.initialize(
# self.semantic_result['function_points']
# )
# logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
# # === [新增] Step 0.2: 初始化多样性约束注入器 ===
# history_file = os.path.join(work_dir, "test_history.json")
# # 创建 TestHistoryManager 并传递 history_file
# history_manager = TestHistoryManager(history_file=history_file)
# self.diversity_injector = DiversityInjector(history_manager=history_manager)
# logger.info(f" Diversity injector initialized with history file: {history_file}")
# # === [新增] Step 0.3: 初始化质量评估器 ===
# if self.semantic_result.get('function_points'):
# self.quality_evaluator = QualityEvaluator(
# function_points=self.semantic_result['function_points']
# )
# logger.info(f" Quality evaluator initialized")
# except Exception as e:
# logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
# # ================================
# current_tb = self.TB_code
# last_annotated_file = None
# # --- Baseline ---
# logger.info(f"--- CGA Iter 0 (Baseline) ---")
# iter0_dir = os.path.join(work_dir, "iter_0")
# os.makedirs(iter0_dir, exist_ok=True)
# self._prepare_dut(iter0_dir)
# ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
# success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
# self.best_score = score
# self.best_tb = current_tb
# last_annotated_file = annotated_path
# logger.info(f"Baseline Coverage: {score:.2f}%")
# if score >= self.target_coverage:
# logger.success(f"Target reached at baseline!")
# # [修改] 返回元组 (代码, 分数)
# return self.best_tb, self.best_score
# # --- Loop ---
# for i in range(1, self.max_iter + 1):
# logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
# # === [新增] 能量检查:是否还有活跃目标 ===
# if self.energy_allocator:
# current_target = self.energy_allocator.select_next_target()
# if not current_target:
# logger.info("No more active targets with remaining energy. Stopping.")
# break
# logger.info(f"Target: {current_target}")
# # =========================================
# if not last_annotated_file: break
# # [修改] 传递语义分析结果、能量分配器、多样性注入器给 CoverageParser
# parser = CoverageParser(
# last_annotated_file,
# tb_code=self.best_tb,
# semantic_result=self.semantic_result,
# energy_allocator=self.energy_allocator,
# diversity_injector=self.diversity_injector # [新增]
# )
# prompt = parser.generate_prompt(self.best_score)
# # if not prompt:
# # logger.info("No reachable missing blocks found. Stopping.")
# # break
# if not prompt:
# if self.best_score >= self.target_coverage:
# break # 达标才停止
# else:
# # 未达标,尝试探索性测试
# prompt = self._generate_exploration_prompt(i)
# logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
# messages = [{"role": "user", "content": prompt}]
# try:
# response, _ = llm.llm_call(messages, self.model)
# codes = llm.extract_code(response, "verilog")
# new_task_code = codes[0] if codes else ""
# if not new_task_code:
# # [新增] 记录失败
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=False,
# coverage_delta=0.0,
# energy_cost=1.0
# )
# continue
# except Exception as e:
# logger.error(f"LLM Call failed: {e}")
# # [新增] 记录失败
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=False,
# coverage_delta=0.0,
# energy_cost=1.0
# )
# break
# injector = TBInjector(self.best_tb)
# enhanced_tb = injector.inject(new_task_code, iter_idx=i)
# iter_dir = os.path.join(work_dir, f"iter_{i}")
# os.makedirs(iter_dir, exist_ok=True)
# self._prepare_dut(iter_dir)
# ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
# success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
# # === [新增] 记录生成结果到能量分配器 ===
# coverage_delta = new_score - self.best_score if success else 0.0
# generation_success = success and new_score > self.best_score
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=generation_success,
# coverage_delta=coverage_delta,
# energy_cost=1.0
# )
# # =========================================
# # === [新增] 记录测试用例到多样性历史 ===
# if self.diversity_injector:
# # 提取已知信号
# known_signals = []
# if self.semantic_result:
# known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
# self.diversity_injector.record_test(
# code=new_task_code,
# target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
# coverage_score=new_score,
# success=generation_success,
# iteration=i,
# known_signals=known_signals
# )
# # =======================================
# # === [新增] Layer 3: 质量评估 ===
# if self.quality_evaluator:
# # 评估测试用例质量
# eval_result = self.quality_evaluator.evaluate_test_case(
# code=new_task_code,
# covered_lines=set(), # 如果有具体覆盖行信息可传入
# covered_functions=[], # 如果有覆盖功能点信息可传入
# test_id=f"iter_{i}",
# iteration=i
# )
# # 记录多样性得分
# diversity_score = eval_result.get('diversity', {}).get('overall_score', 0)
# logger.info(f" Quality Evaluation: diversity={diversity_score:.2f}")
# # 检查是否应该接受该测试用例
# should_accept, reason = self.quality_evaluator.should_accept(eval_result)
# if not should_accept:
# logger.warning(f" Quality check failed: {reason}")
# # =====================================
# if success and new_score > self.best_score:
# improvement = new_score - self.best_score
# logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
# self.best_score = new_score
# self.best_tb = enhanced_tb
# last_annotated_file = new_annotated_path
# elif success and new_score == self.best_score:
# logger.info(f"Coverage unchanged. Keeping previous.")
# else:
# logger.warning(f"Regression or Failure. Discarding changes.")
# if self.best_score >= self.target_coverage:
# logger.success("Target coverage reached!")
# break
# logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
# # === [新增] 生成能量分配报告 ===
# if self.energy_allocator:
# energy_report = self.energy_allocator.generate_report()
# ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
# logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
# # =================================
# # === [新增] 生成多样性报告并保存历史 ===
# if self.diversity_injector:
# diversity_report = self.diversity_injector.generate_diversity_report()
# ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
# logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
# # 保存测试历史
# self.diversity_injector.history.save()
# # ======================================
# # === [新增] Layer 3: 生成质量评估报告 ===
# if self.quality_evaluator:
# quality_report = self.quality_evaluator.generate_report()
# ls.save_code(quality_report, os.path.join(work_dir, "quality_evaluation_report.txt"))
# logger.info(f"Quality evaluation report saved to {work_dir}/quality_evaluation_report.txt")
# # 输出语义覆盖率摘要
# coverage_result = self.quality_evaluator.semantic_coverage.calculate_coverage()
# logger.info(f"Semantic Coverage: {coverage_result.semantic_coverage:.2%}")
# # ===========================================
# # [修改] 返回元组 (代码, 分数)
# return self.best_tb, self.best_score
"""
Description : Coverage-Guided Agent (CGA) Main Controller
- Integrated with Layer 0: Semantic Analysis
- Integrated with Layer 1: Diversity Constraint Injection
- Integrated with Layer 3: Quality Evaluation
- Integrated with Layer 4: Energy Allocation
Author : CorrectBench Integration
"""
import os
import re
import sys
import shutil
import LLM_call as llm
import loader_saver as ls
from loader_saver import autologger as logger
from utils.verilator_call import verilator_run_coverage
from autoline.cga_utils import CoverageParser, TBInjector
# [新增] 导入语义分析层
from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# [新增] 导入能量分配层
from autoline.energy_allocator import EnergyAllocator, EnergyState
# [新增] 导入多样性约束注入器
from autoline.diversity_injector import DiversityInjector
# [新增] 导入测试历史管理器
from autoline.test_history import TestHistoryManager
# [新增] 导入质量评估层
from autoline.quality_evaluator import QualityEvaluator, DiversityScore, SemanticCoverageResult
class TaskTBCGA:
def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config, work_subdir="CGA", max_iter=None):
self.task_dir = task_dir
self.task_id = task_id
self.header = header
self.DUT_code = DUT_code
self.TB_code = TB_code
self.config = config
self.work_subdir = work_subdir
self.max_iter = config.autoline.cga.max_iter if max_iter is None else max_iter
self.target_coverage = config.autoline.cga.target_coverage
self.model = config.gpt.model
self.best_tb = TB_code
self.best_score = 0.0
self.best_covered_lines = set()
self.best_covered_functions = set()
# [新增] 能量分配器
self.energy_allocator: EnergyAllocator = None
# [新增] 多样性约束注入器
self.diversity_injector: DiversityInjector = None
# [新增] 质量评估器
self.quality_evaluator: QualityEvaluator = None
# [新增辅助函数] 从父目录拷贝 DUT
def _prepare_dut(self, target_dir):
source_dut = os.path.join(self.task_dir, "DUT.v")
target_dut = os.path.join(target_dir, "DUT.v")
# 优先拷贝现有的文件
if os.path.exists(source_dut):
shutil.copy(source_dut, target_dut)
else:
# 只有当文件由于某种原因被删除了,才降级使用内存中的 code
ls.save_code(self.DUT_code, target_dut)
def _extract_coverage_snapshot(self, annotated_path):
"""
Verilator annotated DUT 中提取当前已覆盖行和已覆盖功能点
"""
snapshot = {
"covered_lines": set(),
"covered_functions": set(),
"coverable_lines": set(),
}
if not annotated_path or not os.path.exists(annotated_path):
return snapshot
pct_pattern = re.compile(r"^%(\d+)\s+(.*)$")
tilde_pattern = re.compile(r"^~(\d+)\s+(.*)$")
caret_pattern = re.compile(r"^\^(\d+)\s+(.*)$")
plain_pattern = re.compile(r"^\s*(\d+)\s+(.*)$")
decl_pattern = re.compile(r"^\s*(input|output|inout|wire|reg|logic|parameter|localparam|assign)\b")
with open(annotated_path, "r", encoding="utf-8", errors="ignore") as f:
for line_no, raw_line in enumerate(f, start=1):
stripped = raw_line.strip()
if not stripped:
continue
# Skip the verilator coverage comment line at the start of annotated files
# This line causes offset between original DUT line numbers and annotated line numbers
if line_no == 1 and "// verilator_coverage annotation" in stripped:
continue
count = None
code_part = None
is_caret = False
match = pct_pattern.match(stripped)
if match:
count = int(match.group(1))
code_part = match.group(2).strip()
else:
match = tilde_pattern.match(stripped)
if match:
count = int(match.group(1))
code_part = match.group(2).strip()
else:
match = caret_pattern.match(stripped)
if match:
is_caret = True
code_part = match.group(2).strip()
else:
match = plain_pattern.match(stripped)
if match:
count = int(match.group(1))
code_part = match.group(2).strip()
if code_part is None:
continue
if "//" in code_part:
code_part = code_part.split("//", 1)[0].strip()
if not code_part:
continue
if decl_pattern.match(code_part):
continue
if code_part in {"begin", "end", "else", "endmodule", "endcase", ");", "default:"}:
continue
if not any(ch.isalnum() for ch in code_part):
continue
snapshot["coverable_lines"].add(line_no)
if (count is not None) and (count > 0) and not is_caret:
snapshot["covered_lines"].add(line_no)
snapshot["covered_functions"] = self._map_lines_to_function_points(snapshot["covered_lines"])
return snapshot
def _map_lines_to_function_points(self, covered_lines):
"""
用功能点 location 与已覆盖行做交集推断当前已命中的功能点
对于 location=(0,0) FPException/Protocol使用启发式方法
如果模块有任何覆盖率认为这些FP可能被执行
"""
matched = set()
if not self.semantic_result:
return matched
# Check if we have any coverage at all (for heuristic matching of location=(0,0) FPs)
has_any_coverage = len(covered_lines) > 0
for fp in self.semantic_result.get("function_points", []):
location = fp.get("location", {})
start_line = location.get("start_line", 0)
end_line = location.get("end_line", 0)
fp_name = fp.get("name", "")
fp_type = fp.get("type", "")
# Handle FPs with invalid location (Exception/Protocol types)
if (start_line <= 0) or (end_line <= 0):
# Heuristic: if the module has any coverage, assume Exception/Protocol FPs might be exercised
if has_any_coverage and fp_type in ("exception", "protocol"):
matched.add(fp_name)
continue
if any(start_line <= line_no <= end_line for line_no in covered_lines):
matched.add(fp_name)
matched.discard("")
return matched
def _generate_exploration_prompt(self, iteration: int) -> str:
"""
生成探索性测试 Prompt
当找不到明确的 missing blocks 但覆盖率仍未达标时
生成一个探索性 Prompt 来尝试发现新的测试路径
Args:
iteration: 当前迭代次数
Returns:
探索性测试 Prompt如果无法生成则返回 None
"""
# 从语义分析结果获取 FSM 和功能点信息
fsm_info = ""
if self.semantic_result:
fsm_data = self.semantic_result.get('fsm', {})
if fsm_data:
states = fsm_data.get('states', [])
state_var = fsm_data.get('state_variable', 'state')
fsm_info = f"""
[FSM INFORMATION]
- State variable: {state_var}
- Known states: {', '.join(states) if states else 'unknown'}
The DUT appears to be a Finite State Machine. To improve coverage:
1. Try to visit each state by driving inputs that trigger state transitions
2. For each state, try different input combinations
3. Consider edge cases: reset transitions, timeout conditions, error states
"""
# 从能量分配器获取目标功能点
target_info = ""
if self.energy_allocator and self.energy_allocator.current_target:
target = self.energy_allocator.current_target
target_info = f"""
[CURRENT TARGET]
Focus on: {target.function_point}
Remaining energy: {target.remaining}
"""
# 从多样性注入器获取已尝试的测试
diversity_hints = ""
if self.diversity_injector:
history = self.diversity_injector.history
# [修复] TestHistoryManager 使用 records 属性,不是 history
if history and hasattr(history, 'records') and len(history.records) > 0:
recent_tests = history.records[-5:] if len(history.records) > 5 else history.records
diversity_hints = f"""
[RECENTLY TRIED APPROACHES - AVOID REPETITION]
Recent test patterns tried:
"""
for i, test in enumerate(recent_tests):
# TestRecord 是 dataclass使用属性访问
target = getattr(test, 'target_function', 'unknown') if hasattr(test, 'target_function') else test.get('target_function', 'unknown') if isinstance(test, dict) else 'unknown'
iteration = getattr(test, 'iteration', i) if hasattr(test, 'iteration') else test.get('iteration', i) if isinstance(test, dict) else i
diversity_hints += f"- Iter {iteration}: target={target}\n"
prompt = f"""
[EXPLORATION MODE - ITERATION {iteration}]
Current coverage is {self.best_score:.2f}%, but no specific uncovered code blocks were identified.
This may happen when:
1. Coverage data is incomplete or filtered
2. Branch/condition coverage needs improvement (not just line coverage)
3. State transitions in FSM are not fully exercised
{fsm_info}
{target_info}
{diversity_hints}
[YOUR TASK]
Write an EXPLORATORY test scenario that:
1. Covers different input combinations than previous tests
2. Explores different FSM state transitions
3. Tests edge cases and boundary conditions
4. Varies timing and sequence of inputs
[OUTPUT FORMAT]
Return ONLY Verilog test scenario code (no task wrapper).
Use the signal names from the testbench.
```verilog
// Your exploratory test code here
```
"""
return prompt
def _analyze_unreachable_branches(self, annotated_file):
"""
分析当前覆盖率情况下是否还存在不可达分支
CGA 达到目标覆盖率时调用此方法判断是否应该停止
- 如果存在不可达分支由于 RTL 结构设计停止并报告
- 如果所有未覆盖都是可覆盖但未测试的继续优化
Args:
annotated_file: str, annotated coverage 文件路径
Returns:
dict: {
"has_unreachable": bool, # 是否存在不可达分支
"count": int, # 不可达分支数量
"details": list, # 详细信息列表
"reason": str # 原因说明
}
"""
if not annotated_file or not os.path.exists(annotated_file):
return {"has_unreachable": False, "count": 0, "details": [], "reason": "No annotated file found"}
try:
with open(annotated_file, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
except Exception as e:
logger.warning(f"Failed to read annotated file: {e}")
return {"has_unreachable": False, "count": 0, "details": [], "reason": f"Read error: {e}"}
# 分析未覆盖行
unreachable_details = []
zero_count_lines = []
# patterns from CoverageParser
line_pattern = re.compile(r'^%(\d+)\s+(.*)$')
tilde_pattern = re.compile(r'^~(\d+)\s+(.*)$')
caret_pattern = re.compile(r'^\^(\d+)\s+(.*)$')
plain_pattern = re.compile(r'^\s*(\d+)\s+(.*)$')
decl_pattern = re.compile(r'^\s*(input|output|inout|wire|reg|logic|parameter|localparam|assign)\b')
for i, line in enumerate(lines):
line = line.strip()
if not line:
continue
count = -1
clean_code = line
is_tilde = False
is_caret = False
match_line = line_pattern.match(line)
match_tilde = tilde_pattern.match(line)
match_caret = caret_pattern.match(line)
match_plain = plain_pattern.match(line)
if match_line:
count = int(match_line.group(1))
clean_code = match_line.group(2).strip()
elif match_tilde:
count = int(match_tilde.group(1))
clean_code = match_tilde.group(2).strip()
is_tilde = True
elif match_caret:
count = int(match_caret.group(1))
clean_code = match_caret.group(2).strip()
is_caret = True
elif match_plain:
count = int(match_plain.group(1))
clean_code = match_plain.group(2).strip()
if "//" in clean_code:
clean_code = clean_code.split("//")[0].strip()
is_hard_noise = (decl_pattern.match(clean_code) or clean_code == "endmodule")
# soft_noise 不包含 "default:" 和 "else",因为这些可能是不可达分支,需要进一步分析
is_soft_noise = (len(clean_code) < 2 or clean_code in ["end", "begin", ");", "endcase"] or
clean_code.startswith("module ") or not any(c.isalnum() for c in clean_code))
# 判断是否零覆盖
is_zero = (count == 0)
# 零覆盖行需要进一步分析,不做 soft_noise 过滤
if is_zero and not is_hard_noise:
zero_count_lines.append({
"line_num": i + 1,
"code": clean_code[:80],
"is_tilde": is_tilde,
"is_caret": is_caret
})
# 分析每个零覆盖行,判断是否不可达
truly_unreachable = []
can_be_reached = []
for item in zero_count_lines:
code = item["code"]
line_num = item["line_num"]
# 不可达的常见模式:
# 1. default: 在 case 语句中,如果所有其他情况都已覆盖
# 2. 死代码(如不可能的条件)
# 3. 异常处理分支
# 可覆盖但未覆盖的常见模式:
# 1. 特定的状态转换分支
# 2. 边界条件检查
# 3. 错误处理路径
# 简单启发式判断
is_unreachable = False
reason = ""
# 检查是否是 default: 分支
if "default:" in code:
# default 分支在 case 完全覆盖时确实不可达
is_unreachable = True
reason = "default branch in fully-covered case statement"
# 检查是否是 endcase 后面的代码
elif "endcase" in code:
is_unreachable = True
reason = "code after endcase"
# 检查是否是 endmodule 附近
elif "endmodule" in code:
is_unreachable = True
reason = "near endmodule"
# 检查是否是纯声明
elif any(kw in code for kw in ["input", "output", "wire", "reg", "parameter"]):
is_unreachable = True
reason = "declaration statement"
# else 分支在 if 链完全覆盖时可能不可达
elif code.strip() == "else":
is_unreachable = True
reason = "else branch in fully-covered if statement"
else:
# 无法确定,归类为"可覆盖但未覆盖"
reason = "potentially coverable but not tested"
if is_unreachable:
truly_unreachable.append({
"line_num": line_num,
"code": code,
"reason": reason
})
else:
can_be_reached.append({
"line_num": line_num,
"code": code,
"reason": reason
})
# 汇总结果
has_unreachable = len(truly_unreachable) > 0
details = []
if truly_unreachable:
details.append(f"Truly unreachable by RTL design ({len(truly_unreachable)} lines):")
for item in truly_unreachable[:10]: # 最多显示10个
details.append(f" Line {item['line_num']}: {item['code']} - {item['reason']}")
if len(truly_unreachable) > 10:
details.append(f" ... and {len(truly_unreachable) - 10} more")
if can_be_reached:
details.append(f"Potentially coverable but not tested ({len(can_be_reached)} lines):")
for item in can_be_reached[:5]: # 最多显示5个
details.append(f" Line {item['line_num']}: {item['code']} - {item['reason']}")
if len(can_be_reached) > 5:
details.append(f" ... and {len(can_be_reached) - 5} more")
logger.info(f"Unreachable analysis: {len(truly_unreachable)} truly unreachable, {len(can_be_reached)} potentially coverable")
return {
"has_unreachable": has_unreachable,
"count": len(truly_unreachable),
"details": details,
"reason": "Found truly unreachable branches" if has_unreachable else "All uncovered are potentially coverable",
"truly_unreachable": truly_unreachable,
"can_be_reached": can_be_reached
}
def _generate_syntax_fix_prompt(self, original_code: str, syntax_issues: dict, original_prompt: str) -> str:
"""
生成语法修正 Prompt LLM 修复检测到的语法问题
Args:
original_code: 原始生成的代码
syntax_issues: 语法检查结果
original_prompt: 原始 Prompt
Returns:
修正 Prompt
"""
issues_text = []
for issue in syntax_issues.get('width_mismatch', []):
issues_text.append(f"- {issue['message']}")
if 'suggestion' in issue:
issues_text.append(f" Suggestion: {issue['suggestion']}")
for issue in syntax_issues.get('logic_issues', []):
issues_text.append(f"- {issue['message']}")
if 'suggestion' in issue:
issues_text.append(f" Suggestion: {issue['suggestion']}")
for issue in syntax_issues.get('syntax_warnings', []):
if issue['severity'] == 'error':
issues_text.append(f"- ERROR: {issue['message']}")
prompt = f"""
[SYNTAX FIX REQUEST]
The previously generated Verilog test code has the following issues:
{chr(10).join(issues_text)}
[ORIGINAL CODE]
```verilog
{original_code}
```
[YOUR TASK]
Fix the above code to address these issues. Pay special attention to:
1. **Width Mismatch**: When you want to input a bit sequence (e.g., 01111100) to a single-bit signal:
- WRONG: `{{in}} = 8'b01111100;` (truncates to single bit)
- CORRECT: Use a shift register
```verilog
reg [7:0] shift_reg;
shift_reg = 8'b01111100;
for (i = 0; i < 8; i = i + 1) begin
in = shift_reg[7];
shift_reg = shift_reg << 1;
@(posedge clk);
end
```
2. **Single-bit Shift**: Shifting a 1-bit signal has no effect:
- WRONG: `in = in >> 1;` (always results in 0)
- CORRECT: Use a multi-bit shift register as shown above
[OUTPUT FORMAT]
Return ONLY the corrected Verilog test scenario code:
```verilog
// Your corrected test code here
```
"""
return prompt
def _get_compile_error(self, iter_dir: str) -> str:
"""
获取 Verilator 编译错误日志
Args:
iter_dir: 迭代目录
Returns:
错误日志字符串
"""
error_parts = []
# 检查 obj_dir 是否存在
obj_dir = os.path.join(iter_dir, "obj_dir")
if not os.path.exists(obj_dir):
error_parts.append("obj_dir not created - compilation failed early")
# 检查可能的日志文件
log_files = [
os.path.join(iter_dir, "verilator.log"),
os.path.join(iter_dir, "compile.log"),
os.path.join(obj_dir, "Vtestbench.log"),
]
for log_file in log_files:
if os.path.exists(log_file):
try:
with open(log_file, 'r', errors='ignore') as f:
content = f.read()
if content.strip():
error_parts.append(f"=== {os.path.basename(log_file)} ===")
error_parts.append(content[-2000:]) # 最后 2000 字符
except Exception:
pass
# 如果没有找到日志文件,检查目录内容
if not error_parts:
error_parts.append(f"Directory contents of {iter_dir}:")
try:
for item in os.listdir(iter_dir):
error_parts.append(f" {item}")
except Exception:
pass
return '\n'.join(error_parts) if error_parts else "Unknown compilation error"
def _generate_compile_fix_prompt(self, compile_error: str, original_code: str) -> str:
"""
生成编译错误修正 Prompt
Args:
compile_error: 编译错误日志
original_code: 原始代码
Returns:
修正 Prompt
"""
# 截取关键错误信息
error_lines = compile_error.split('\n')
key_errors = []
for line in error_lines:
line = line.strip()
if any(kw in line.lower() for kw in ['error', 'syntax', 'fatal', 'undefined', 'illegal']):
key_errors.append(line)
if len(key_errors) > 10: # 最多 10 条关键错误
break
prompt = f"""
[COMPILATION ERROR FIX REQUEST]
The Verilog test code failed to compile with Verilator. Here are the key errors:
```
{chr(10).join(key_errors) if key_errors else compile_error[:1000]}
```
[ORIGINAL CODE]
```verilog
{original_code[:2000]} // Truncated if too long
```
[COMMON VERILOG ISSUES TO CHECK]
1. **Width mismatch**: Assigning wide values to narrow signals
- Problem: `{{in}} = 8'b01111100;` where `in` is 1-bit
- Fix: Use shift register to input bits one at a time
2. **Undefined signals**: Using signals that are not declared
- Check spelling of signal names against the testbench
3. **Syntax errors**: Missing semicolons, mismatched begin/end
- Check all statements end with semicolon
- Ensure all `begin` have matching `end`
4. **Timescale issues**: Missing timescale directive
- The testbench should have `timescale 1ns / 1ps`
[YOUR TASK]
Generate a CORRECTED version of the test code that will compile successfully.
Focus on fixing the specific errors shown above.
[OUTPUT FORMAT]
Return ONLY the corrected Verilog test scenario code:
```verilog
// Your corrected test code here
```
"""
return prompt
def run(self):
logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
# 1. 确保工作目录存在 (saves/任务名/5_CGA)
work_dir = os.path.join(self.task_dir, self.work_subdir)
if os.path.exists(work_dir):
shutil.rmtree(work_dir)
os.makedirs(work_dir, exist_ok=True)
# === [新增] Step 0: 语义分析 ===
logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
self.semantic_result = None
try:
semantic_analyzer = SemanticAnalyzer(self.DUT_code)
self.semantic_result = semantic_analyzer.analyze()
# 记录分析结果摘要
fp_count = len(self.semantic_result.get('function_points', []))
fsm_info = semantic_analyzer.get_fsm_info()
if fsm_info:
logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
f"({len(fsm_info.get('states', []))} states)")
logger.info(f" Total function points identified: {fp_count}")
# 保存语义分析报告
semantic_report = semantic_analyzer.generate_prompt_context()
ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
# === [新增] Step 0.1: 初始化能量分配器 ===
if self.semantic_result.get('function_points'):
self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
energy_init_result = self.energy_allocator.initialize(
self.semantic_result['function_points']
)
logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
# === [新增] Step 0.2: 初始化多样性约束注入器 ===
history_file = os.path.join(work_dir, "test_history.json")
# 创建 TestHistoryManager 并传递 history_file
history_manager = TestHistoryManager(history_file=history_file)
self.diversity_injector = DiversityInjector(history_manager=history_manager)
logger.info(f" Diversity injector initialized with history file: {history_file}")
# === [新增] Step 0.3: 初始化质量评估器 ===
if self.semantic_result.get('function_points'):
self.quality_evaluator = QualityEvaluator(
function_points=self.semantic_result['function_points']
)
logger.info(f" Quality evaluator initialized")
except Exception as e:
logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
# ================================
current_tb = self.TB_code
last_annotated_file = None
# --- Baseline ---
logger.info(f"--- CGA Iter 0 (Baseline) ---")
iter0_dir = os.path.join(work_dir, "iter_0")
os.makedirs(iter0_dir, exist_ok=True)
self._prepare_dut(iter0_dir)
ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
self.best_score = score
self.best_tb = current_tb
last_annotated_file = annotated_path
baseline_snapshot = self._extract_coverage_snapshot(annotated_path)
self.best_covered_lines = set(baseline_snapshot["covered_lines"])
self.best_covered_functions = set(baseline_snapshot["covered_functions"])
# NOTE: Do NOT mark targets as completed here.
# This allows CGA to continue iterating even when semantic coverage is 100%,
# as long as statement coverage < target_coverage.
# The mark_targets_completed will be called after each successful improvement.
if self.quality_evaluator and self.best_covered_functions:
self.quality_evaluator.semantic_coverage.update_coverage(
covered_lines=self.best_covered_lines,
covered_functions=sorted(self.best_covered_functions),
test_id="iter_0",
iteration=0
)
logger.info(f"Baseline Coverage: {score:.2f}%")
if score >= self.target_coverage:
# Baseline 达到目标,需要检查是否有不可达分支
unreachable_info = self._analyze_unreachable_branches(last_annotated_file)
if unreachable_info["has_unreachable"]:
truly_unreachable = unreachable_info.get("truly_unreachable", [])
logger.warning(f"Baseline coverage reached target, but found {unreachable_info['count']} unreachable branches:")
for item in truly_unreachable:
logger.warning(f" Line {item['line_num']}: {item['code']} - {item['reason']}")
logger.info("Stopping: unreachable branches exist by RTL design")
else:
logger.success(f"Target reached at baseline! No unreachable branches found.")
# [修改] 返回元组 (代码, 分数)
return self.best_tb, self.best_score
# --- Loop ---
for i in range(1, self.max_iter + 1):
logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
iter_dir = os.path.join(work_dir, f"iter_{i}")
os.makedirs(iter_dir, exist_ok=True)
# === [修改] 能量检查:即使没有能量,只要覆盖率未达标就继续 ===
current_target = None
if self.energy_allocator:
current_target = self.energy_allocator.select_next_target()
if not current_target:
# 即使没有目标能量了,如果语句覆盖率未达标,仍继续探索
if self.best_score < self.target_coverage:
logger.info("No more energy targets, but coverage < target. Continuing with exploration...")
prompt = self._generate_exploration_prompt(i)
if not prompt:
logger.warning("Cannot generate exploration prompt. Stopping.")
break
else:
logger.info("No more active targets with remaining energy. Stopping.")
break
else:
logger.info(f"Target: {current_target}")
# =========================================
if not last_annotated_file: break
# [修改] 传递语义分析结果、能量分配器、多样性注入器、DUT代码给 CoverageParser
parser = CoverageParser(
last_annotated_file,
tb_code=self.best_tb,
semantic_result=self.semantic_result,
energy_allocator=self.energy_allocator,
diversity_injector=self.diversity_injector, # [新增]
dut_code=self.DUT_code # [新增] 传递 DUT 代码以提取信号名
)
prompt = parser.generate_prompt(self.best_score)
# [修改] 改进停止条件:即使找不到 missing_blocks只要覆盖率未达标就继续
if not prompt:
if self.best_score >= self.target_coverage:
# 覆盖率到达目标,但需要检查是否有不可达分支
unreachable_info = self._analyze_unreachable_branches(last_annotated_file)
if unreachable_info["has_unreachable"]:
truly_unreachable = unreachable_info.get("truly_unreachable", [])
logger.warning(f"Coverage reached target, but found {unreachable_info['count']} unreachable branches:")
for item in truly_unreachable:
logger.warning(f" Line {item['line_num']}: {item['code']} - {item['reason']}")
logger.info("Stopping: unreachable branches exist by RTL design")
break
else:
# 没有不可达分支,且已达到目标覆盖率,优化完成
logger.info(f"Coverage reached {self.best_score:.2f}%, no unreachable branches found. Optimization complete.")
break
else:
# 覆盖率未达标但找不到明确的 missing_blocks
# 尝试生成随机探索 Prompt
logger.warning(f"No reachable missing blocks found, but coverage ({self.best_score:.2f}%) < target ({self.target_coverage}%).")
logger.info(f"Attempting random exploration to discover uncovered paths...")
prompt = self._generate_exploration_prompt(i)
if not prompt:
logger.info("Could not generate exploration prompt. Stopping.")
break
ls.save_code(prompt, os.path.join(iter_dir, "prompt.txt"))
logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
messages = [{"role": "user", "content": prompt}]
try:
response, _ = llm.llm_call(messages, self.model)
ls.save_code(response, os.path.join(iter_dir, "llm_response.txt"))
codes = llm.extract_code(response, "verilog")
new_task_code = codes[0] if codes else ""
if not new_task_code:
# [新增] 记录失败
if self.energy_allocator:
self.energy_allocator.record_generation(
success=False,
coverage_delta=0.0,
energy_cost=1.0
)
continue
except Exception as e:
logger.error(f"LLM Call failed: {e}")
# [新增] 记录失败
if self.energy_allocator:
self.energy_allocator.record_generation(
success=False,
coverage_delta=0.0,
energy_cost=1.0
)
break
ls.save_code(new_task_code, os.path.join(iter_dir, "generated_scenario.v"))
injector = TBInjector(self.best_tb)
enhanced_tb = injector.inject(new_task_code, iter_idx=i)
# [新增] 检查语法预检查结果
validation_result = injector.last_validation_result
syntax_issues = validation_result.get('syntax_check', {}) if validation_result else {}
if syntax_issues.get('should_retry', False):
logger.warning(f"[CGA-{i}] Syntax issues detected in generated code. Attempting retry...")
# 生成修正后的 Prompt包含语法问题提示
retry_prompt = self._generate_syntax_fix_prompt(new_task_code, syntax_issues, prompt)
if retry_prompt:
try:
retry_response, _ = llm.llm_call([{"role": "user", "content": retry_prompt}], self.model)
retry_codes = llm.extract_code(retry_response, "verilog")
if retry_codes:
new_task_code = retry_codes[0]
ls.save_code(retry_prompt, os.path.join(iter_dir, "retry_prompt.txt"))
ls.save_code(retry_response, os.path.join(iter_dir, "retry_response.txt"))
ls.save_code(new_task_code, os.path.join(iter_dir, "generated_scenario_retry.v"))
enhanced_tb = injector.inject(new_task_code, iter_idx=i)
logger.info(f"[CGA-{i}] Retry code generated successfully")
except Exception as e:
logger.warning(f"[CGA-{i}] Retry failed: {e}")
self._prepare_dut(iter_dir)
ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
# [新增] 编译失败时的错误反馈机制
if not success:
compile_error = self._get_compile_error(iter_dir)
if compile_error:
logger.error(f"[CGA-{i}] Verilator compilation failed:")
logger.error(compile_error[:500]) # 截断过长的错误信息
# 尝试让 LLM 修正编译错误(最多 1 次重试)
if not hasattr(self, '_compile_retry_count'):
self._compile_retry_count = {}
self._compile_retry_count[i] = self._compile_retry_count.get(i, 0)
if self._compile_retry_count[i] < 1:
logger.info(f"[CGA-{i}] Asking LLM to fix compilation errors...")
fix_prompt = self._generate_compile_fix_prompt(compile_error, new_task_code)
try:
fix_response, _ = llm.llm_call([{"role": "user", "content": fix_prompt}], self.model)
fix_codes = llm.extract_code(fix_response, "verilog")
if fix_codes:
fixed_code = fix_codes[0]
ls.save_code(fix_prompt, os.path.join(iter_dir, "compile_fix_prompt.txt"))
ls.save_code(fix_response, os.path.join(iter_dir, "compile_fix_response.txt"))
ls.save_code(fixed_code, os.path.join(iter_dir, "generated_scenario_compile_fix.v"))
enhanced_tb = injector.inject(fixed_code, iter_idx=i)
ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
# 再次尝试编译
success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
if success:
logger.info(f"[CGA-{i}] Compilation fixed! Score: {new_score:.2f}%")
new_task_code = fixed_code
except Exception as e:
logger.warning(f"[CGA-{i}] Compile fix attempt failed: {e}")
self._compile_retry_count[i] += 1
coverage_snapshot = self._extract_coverage_snapshot(new_annotated_path) if success else {
"covered_lines": set(),
"covered_functions": set(),
"coverable_lines": set(),
}
current_covered_lines = set(coverage_snapshot["covered_lines"])
current_covered_functions = set(coverage_snapshot["covered_functions"])
newly_covered_lines = current_covered_lines - self.best_covered_lines
newly_covered_functions = current_covered_functions - self.best_covered_functions
# === [新增] 记录生成结果到能量分配器 ===
coverage_delta = new_score - self.best_score if success else 0.0
current_target_name = None
target_hit = False
if self.energy_allocator and self.energy_allocator.current_target:
current_target_name = self.energy_allocator.current_target.function_point
if current_covered_functions:
target_hit = current_target_name in current_covered_functions
else:
target_hit = success and new_score > self.best_score
generation_success = success and target_hit
if self.energy_allocator:
self.energy_allocator.record_generation(
success=generation_success,
coverage_delta=coverage_delta,
energy_cost=1.0
)
extra_completed_functions = set(newly_covered_functions)
if generation_success and current_target_name:
extra_completed_functions.discard(current_target_name)
if extra_completed_functions:
self.energy_allocator.mark_targets_completed(sorted(extra_completed_functions))
# =========================================
# === [新增] 记录测试用例到多样性历史 ===
if self.diversity_injector:
# 提取已知信号
known_signals = []
if self.semantic_result:
known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
self.diversity_injector.record_test(
code=new_task_code,
target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
coverage_score=new_score,
success=generation_success,
iteration=i,
known_signals=known_signals
)
# =======================================
# === [新增] Layer 3: 质量评估 ===
if self.quality_evaluator:
# 评估测试用例质量
eval_result = self.quality_evaluator.evaluate_test_case(
code=new_task_code,
covered_lines=newly_covered_lines,
covered_functions=sorted(newly_covered_functions),
test_id=f"iter_{i}",
iteration=i
)
# 记录多样性得分
diversity_score = eval_result.get('diversity', {}).get('overall_score', 0)
logger.info(f" Quality Evaluation: diversity={diversity_score:.2f}")
# 检查是否应该接受该测试用例
should_accept, reason = self.quality_evaluator.should_accept(eval_result)
if not should_accept:
logger.warning(f" Quality check failed: {reason}")
# =====================================
if success and new_score > self.best_score:
improvement = new_score - self.best_score
logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
self.best_score = new_score
self.best_tb = enhanced_tb
last_annotated_file = new_annotated_path
self.best_covered_lines = current_covered_lines
self.best_covered_functions = current_covered_functions
elif success and new_score == self.best_score:
logger.info(f"Coverage unchanged. Keeping previous.")
else:
logger.warning(f"Regression or Failure. Discarding changes.")
if self.best_score >= self.target_coverage:
# 达到目标覆盖率,检查是否有不可达分支
unreachable_info = self._analyze_unreachable_branches(last_annotated_file)
if unreachable_info["has_unreachable"]:
truly_unreachable = unreachable_info.get("truly_unreachable", [])
logger.warning(f"Coverage reached target, but found {unreachable_info['count']} unreachable branches:")
for item in truly_unreachable:
logger.warning(f" Line {item['line_num']}: {item['code']} - {item['reason']}")
logger.info("Stopping: unreachable branches exist by RTL design")
else:
logger.success(f"Target coverage reached! No unreachable branches found.")
break
# === [新增] CGA 结束时检查不可达分支 ===
logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
unreachable_info = self._analyze_unreachable_branches(last_annotated_file)
if unreachable_info["has_unreachable"]:
report_content = "UNREACHABLE BRANCHES REPORT\n"
report_content += "=" * 60 + "\n\n"
# 真正不可达的分支
truly_unreachable = unreachable_info.get("truly_unreachable", [])
if truly_unreachable:
report_content += f"[TRULY UNREACHABLE] ({len(truly_unreachable)} branches)\n"
report_content += "-" * 60 + "\n"
report_content += "These branches are unreachable due to RTL design structure:\n\n"
for item in truly_unreachable:
line_num = item["line_num"]
code = item["code"]
reason = item["reason"]
# 翻译原因说明
reason_desc = {
"default branch in fully-covered case statement": "Case语句所有分支已覆盖default分支永远不会执行",
"code after endcase": "endcase之后的代码不可达",
"near endmodule": "endmodule附近的代码不可达",
"declaration statement": "声明语句不是可执行代码",
"else branch in fully-covered if statement": "if语句所有条件已覆盖else分支永远不会执行"
}.get(reason, reason)
report_content += f" Line {line_num}:\n"
report_content += f" Code: {code}\n"
report_content += f" Reason: {reason_desc}\n"
report_content += f" Analysis: {reason}\n\n"
# 可覆盖但未测试的分支
can_be_reached = unreachable_info.get("can_be_reached", [])
if can_be_reached:
report_content += f"\n[POTENTIALLY COVERABLE] ({len(can_be_reached)} branches)\n"
report_content += "-" * 60 + "\n"
report_content += "These branches are theoretically reachable but not exercised by testbench:\n\n"
for item in can_be_reached:
line_num = item["line_num"]
code = item["code"]
reason = item["reason"]
reason_desc = {
"default branch in fully-covered case statement": "Case语句所有分支已覆盖default分支理论不可达",
"potentially coverable but not tested": "理论上可通过添加特定测试向量覆盖"
}.get(reason, reason)
report_content += f" Line {line_num}:\n"
report_content += f" Code: {code}\n"
report_content += f" Reason: {reason_desc}\n"
report_content += f" Analysis: {reason}\n\n"
report_content += "=" * 60 + "\n"
report_content += f"Summary: {len(truly_unreachable)} truly unreachable, {len(can_be_reached)} potentially coverable\n"
report_content += "=" * 60 + "\n"
report_path = os.path.join(work_dir, "unreachable_branches_report.txt")
ls.save_code(report_content, report_path)
logger.warning(f"Unreachable branches found: {unreachable_info['count']} lines")
logger.info(f"Unreachable branches report saved to: {report_path}")
else:
logger.info("No unreachable branches found in DUT.")
# =========================================
# === [新增] 生成能量分配报告 ===
if self.energy_allocator:
energy_report = self.energy_allocator.generate_report()
ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
# =================================
# === [新增] 生成多样性报告并保存历史 ===
if self.diversity_injector:
diversity_report = self.diversity_injector.generate_diversity_report()
ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
# 保存测试历史
self.diversity_injector.history.save()
# ======================================
# === [新增] Layer 3: 生成质量评估报告 ===
if self.quality_evaluator:
quality_report = self.quality_evaluator.generate_report()
ls.save_code(quality_report, os.path.join(work_dir, "quality_evaluation_report.txt"))
logger.info(f"Quality evaluation report saved to {work_dir}/quality_evaluation_report.txt")
# 输出语义覆盖率摘要
coverage_result = self.quality_evaluator.semantic_coverage.calculate_coverage()
logger.info(f"Semantic Coverage: {coverage_result.semantic_coverage:.2%}")
# ===========================================
# [修改] 返回元组 (代码, 分数)
return self.best_tb, self.best_score