# Source: CGA-bench/autoline/TB_cga.py
# Captured from the repository web view on 2026-05-22 10:02:42 +08:00 (1750 lines, 78 KiB, Python).
# Note: the hosting page warned that this file contains ambiguous Unicode characters
# that might be confused with other characters.
# # # Fourth version
# # """
# # Description : Coverage-Guided Agent (CGA) Main Controller
# # - Integrated with Layer 0: Semantic Analysis
# # - Integrated with Layer 1: Diversity Constraint Injection
# # - Integrated with Layer 4: Energy Allocation
# # Author : CorrectBench Integration
# # """
# # import os
# # import sys
# # import shutil
# # import LLM_call as llm
# # import loader_saver as ls
# # from loader_saver import autologger as logger
# # from utils.verilator_call import verilator_run_coverage
# # from autoline.cga_utils import CoverageParser, TBInjector
# # # [新增] 导入语义分析层
# # from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# # # [新增] 导入能量分配层
# # from autoline.energy_allocator import EnergyAllocator, EnergyState
# # # [新增] 导入多样性约束注入器
# # from autoline.diversity_injector import DiversityInjector
# # # [新增] 导入测试历史管理器
# # from autoline.test_history import TestHistoryManager
# # class TaskTBCGA:
# # def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config):
# # self.task_dir = task_dir
# # self.task_id = task_id
# # self.header = header
# # self.DUT_code = DUT_code
# # self.TB_code = TB_code
# # self.config = config
# # self.max_iter = 5
# # self.target_coverage = 95.0
# # self.model = config.gpt.model
# # self.best_tb = TB_code
# # self.best_score = 0.0
# # # [新增] 能量分配器
# # self.energy_allocator: EnergyAllocator = None
# # # [新增] 多样性约束注入器
# # self.diversity_injector: DiversityInjector = None
# # # [新增辅助函数] 从父目录拷贝 DUT
# # def _prepare_dut(self, target_dir):
# # source_dut = os.path.join(self.task_dir, "DUT.v")
# # target_dut = os.path.join(target_dir, "DUT.v")
# # # 优先拷贝现有的文件
# # if os.path.exists(source_dut):
# # shutil.copy(source_dut, target_dut)
# # else:
# # # 只有当文件由于某种原因被删除了,才降级使用内存中的 code
# # ls.save_code(self.DUT_code, target_dut)
# # def run(self):
# # logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
# # # 1. 确保工作目录存在 (saves/任务名/5_CGA)
# # work_dir = os.path.join(self.task_dir, "5_CGA")
# # if os.path.exists(work_dir):
# # shutil.rmtree(work_dir)
# # os.makedirs(work_dir, exist_ok=True)
# # # === [新增] Step 0: 语义分析 ===
# # logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
# # self.semantic_result = None
# # try:
# # semantic_analyzer = SemanticAnalyzer(self.DUT_code)
# # self.semantic_result = semantic_analyzer.analyze()
# # # 记录分析结果摘要
# # fp_count = len(self.semantic_result.get('function_points', []))
# # fsm_info = semantic_analyzer.get_fsm_info()
# # if fsm_info:
# # logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
# # f"({len(fsm_info.get('states', []))} states)")
# # logger.info(f" Total function points identified: {fp_count}")
# # # 保存语义分析报告
# # semantic_report = semantic_analyzer.generate_prompt_context()
# # ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
# # # === [新增] Step 0.1: 初始化能量分配器 ===
# # if self.semantic_result.get('function_points'):
# # self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
# # energy_init_result = self.energy_allocator.initialize(
# # self.semantic_result['function_points']
# # )
# # logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
# # # === [新增] Step 0.2: 初始化多样性约束注入器 ===
# # history_file = os.path.join(work_dir, "test_history.json")
# # # 创建 TestHistoryManager 并传递 history_file
# # history_manager = TestHistoryManager(history_file=history_file)
# # self.diversity_injector = DiversityInjector(history_manager=history_manager)
# # logger.info(f" Diversity injector initialized with history file: {history_file}")
# # except Exception as e:
# # logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
# # # ================================
# # current_tb = self.TB_code
# # last_annotated_file = None
# # # --- Baseline ---
# # logger.info(f"--- CGA Iter 0 (Baseline) ---")
# # iter0_dir = os.path.join(work_dir, "iter_0")
# # os.makedirs(iter0_dir, exist_ok=True)
# # self._prepare_dut(iter0_dir)
# # ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
# # success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
# # self.best_score = score
# # self.best_tb = current_tb
# # last_annotated_file = annotated_path
# # logger.info(f"Baseline Coverage: {score:.2f}%")
# # if score >= self.target_coverage:
# # logger.success(f"Target reached at baseline!")
# # # [修改] 返回元组 (代码, 分数)
# # return self.best_tb, self.best_score
# # # --- Loop ---
# # for i in range(1, self.max_iter + 1):
# # logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
# # # === [新增] 能量检查:是否还有活跃目标 ===
# # if self.energy_allocator:
# # current_target = self.energy_allocator.select_next_target()
# # if not current_target:
# # logger.info("No more active targets with remaining energy. Stopping.")
# # break
# # logger.info(f"Target: {current_target}")
# # # =========================================
# # if not last_annotated_file: break
# # # [修改] 传递语义分析结果、能量分配器、多样性注入器给 CoverageParser
# # parser = CoverageParser(
# # last_annotated_file,
# # tb_code=self.best_tb,
# # semantic_result=self.semantic_result,
# # energy_allocator=self.energy_allocator,
# # diversity_injector=self.diversity_injector # [新增]
# # )
# # prompt = parser.generate_prompt(self.best_score)
# # if not prompt:
# # logger.info("No reachable missing blocks found. Stopping.")
# # break
# # logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
# # messages = [{"role": "user", "content": prompt}]
# # try:
# # response, _ = llm.llm_call(messages, self.model)
# # codes = llm.extract_code(response, "verilog")
# # new_task_code = codes[0] if codes else ""
# # if not new_task_code:
# # # [新增] 记录失败
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=False,
# # coverage_delta=0.0,
# # energy_cost=1.0
# # )
# # continue
# # except Exception as e:
# # logger.error(f"LLM Call failed: {e}")
# # # [新增] 记录失败
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=False,
# # coverage_delta=0.0,
# # energy_cost=1.0
# # )
# # break
# # injector = TBInjector(self.best_tb)
# # enhanced_tb = injector.inject(new_task_code, iter_idx=i)
# # iter_dir = os.path.join(work_dir, f"iter_{i}")
# # os.makedirs(iter_dir, exist_ok=True)
# # self._prepare_dut(iter_dir)
# # ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
# # success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
# # # === [新增] 记录生成结果到能量分配器 ===
# # coverage_delta = new_score - self.best_score if success else 0.0
# # generation_success = success and new_score > self.best_score
# # if self.energy_allocator:
# # self.energy_allocator.record_generation(
# # success=generation_success,
# # coverage_delta=coverage_delta,
# # energy_cost=1.0
# # )
# # # =========================================
# # # === [新增] 记录测试用例到多样性历史 ===
# # if self.diversity_injector:
# # # 提取已知信号
# # known_signals = []
# # if self.semantic_result:
# # known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
# # self.diversity_injector.record_test(
# # code=new_task_code,
# # target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
# # coverage_score=new_score,
# # success=generation_success,
# # iteration=i,
# # known_signals=known_signals
# # )
# # # =======================================
# # if success and new_score > self.best_score:
# # improvement = new_score - self.best_score
# # logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
# # self.best_score = new_score
# # self.best_tb = enhanced_tb
# # last_annotated_file = new_annotated_path
# # elif success and new_score == self.best_score:
# # logger.info(f"Coverage unchanged. Keeping previous.")
# # else:
# # logger.warning(f"Regression or Failure. Discarding changes.")
# # if self.best_score >= self.target_coverage:
# # logger.success("Target coverage reached!")
# # break
# # logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
# # # === [新增] 生成能量分配报告 ===
# # if self.energy_allocator:
# # energy_report = self.energy_allocator.generate_report()
# # ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
# # logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
# # # =================================
# # # === [新增] 生成多样性报告并保存历史 ===
# # if self.diversity_injector:
# # diversity_report = self.diversity_injector.generate_diversity_report()
# # ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
# # logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
# # # 保存测试历史
# # self.diversity_injector.history.save()
# # # ======================================
# # # [修改] 返回元组 (代码, 分数)
# # return self.best_tb, self.best_score
# # Final version
# """
# Description : Coverage-Guided Agent (CGA) Main Controller
# - Integrated with Layer 0: Semantic Analysis
# - Integrated with Layer 1: Diversity Constraint Injection
# - Integrated with Layer 3: Quality Evaluation
# - Integrated with Layer 4: Energy Allocation
# Author : CorrectBench Integration
# """
# import os
# import sys
# import shutil
# import LLM_call as llm
# import loader_saver as ls
# from loader_saver import autologger as logger
# from utils.verilator_call import verilator_run_coverage
# from autoline.cga_utils import CoverageParser, TBInjector
# # [新增] 导入语义分析层
# from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# # [新增] 导入能量分配层
# from autoline.energy_allocator import EnergyAllocator, EnergyState
# # [新增] 导入多样性约束注入器
# from autoline.diversity_injector import DiversityInjector
# # [新增] 导入测试历史管理器
# from autoline.test_history import TestHistoryManager
# # [新增] 导入质量评估层
# from autoline.quality_evaluator import QualityEvaluator, DiversityScore, SemanticCoverageResult
# class TaskTBCGA:
# def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config):
# self.task_dir = task_dir
# self.task_id = task_id
# self.header = header
# self.DUT_code = DUT_code
# self.TB_code = TB_code
# self.config = config
# self.max_iter = 5
# self.target_coverage = 95.0
# self.model = config.gpt.model
# self.best_tb = TB_code
# self.best_score = 0.0
# # [新增] 能量分配器
# self.energy_allocator: EnergyAllocator = None
# # [新增] 多样性约束注入器
# self.diversity_injector: DiversityInjector = None
# # [新增] 质量评估器
# self.quality_evaluator: QualityEvaluator = None
# # [新增辅助函数] 从父目录拷贝 DUT
# def _prepare_dut(self, target_dir):
# source_dut = os.path.join(self.task_dir, "DUT.v")
# target_dut = os.path.join(target_dir, "DUT.v")
# # 优先拷贝现有的文件
# if os.path.exists(source_dut):
# shutil.copy(source_dut, target_dut)
# else:
# # 只有当文件由于某种原因被删除了,才降级使用内存中的 code
# ls.save_code(self.DUT_code, target_dut)
# def _generate_exploration_prompt(self, iteration: int) -> str:
# """
# 生成探索性测试 Prompt
# 当找不到明确的 missing blocks 但覆盖率仍未达标时,
# 生成一个探索性 Prompt 来尝试发现新的测试路径。
# Args:
# iteration: 当前迭代次数
# Returns:
# 探索性测试 Prompt如果无法生成则返回 None
# """
# # 从语义分析结果获取 FSM 和功能点信息
# fsm_info = ""
# if self.semantic_result:
# fsm_data = self.semantic_result.get('fsm', {})
# if fsm_data:
# states = fsm_data.get('states', [])
# state_var = fsm_data.get('state_variable', 'state')
# fsm_info = f"""
# [FSM INFORMATION]
# - State variable: {state_var}
# - Known states: {', '.join(states) if states else 'unknown'}
# The DUT appears to be a Finite State Machine. To improve coverage:
# 1. Try to visit each state by driving inputs that trigger state transitions
# 2. For each state, try different input combinations
# 3. Consider edge cases: reset transitions, timeout conditions, error states
# """
# # 从能量分配器获取目标功能点
# target_info = ""
# if self.energy_allocator and self.energy_allocator.current_target:
# target = self.energy_allocator.current_target
# target_info = f"""
# [CURRENT TARGET]
# Focus on: {target.function_point}
# Remaining energy: {target.remaining}
# """
# # 从多样性注入器获取已尝试的测试
# diversity_hints = ""
# if self.diversity_injector:
# history = self.diversity_injector.history
# # if history and len(history.history) > 0:
# # recent_tests = history.history[-5:] if len(history.history) > 5 else history.history
# if history and hasattr(history, 'records') and len(history.records) > 0:
# recent_tests = history.records[-5:] if len(history.records) > 5 else history.records
# diversity_hints = f"""
# [RECENTLY TRIED APPROACHES - AVOID REPETITION]
# Recent test patterns tried:
# """
# # for i, test in enumerate(recent_tests):
# # diversity_hints += f"- Iter {test.get('iteration', i)}: target={test.get('target_function', 'unknown')}\n"
# for i, test in enumerate(recent_tests):
# # TestRecord 是 dataclass使用属性访问
# target = getattr(test, 'target_function', 'unknown') if hasattr(test, 'target_function') else 'unknown'
# iteration = getattr(test, 'iteration', i) if hasattr(test, 'iteration') else i
# diversity_hints += f"- Iter {iteration}: target={target}\n"
# prompt = f"""
# [EXPLORATION MODE - ITERATION {iteration}]
# Current coverage is {self.best_score:.2f}%, but no specific uncovered code blocks were identified.
# This may happen when:
# 1. Coverage data is incomplete or filtered
# 2. Branch/condition coverage needs improvement (not just line coverage)
# 3. State transitions in FSM are not fully exercised
# {fsm_info}
# {target_info}
# {diversity_hints}
# [YOUR TASK]
# Write an EXPLORATORY test scenario that:
# 1. Covers different input combinations than previous tests
# 2. Explores different FSM state transitions
# 3. Tests edge cases and boundary conditions
# 4. Varies timing and sequence of inputs
# [OUTPUT FORMAT]
# Return ONLY Verilog test scenario code (no task wrapper).
# Use the signal names from the testbench.
# ```verilog
# // Your exploratory test code here
# ```
# """
# return prompt
# def run(self):
# logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
# # 1. 确保工作目录存在 (saves/任务名/5_CGA)
# work_dir = os.path.join(self.task_dir, "5_CGA")
# if os.path.exists(work_dir):
# shutil.rmtree(work_dir)
# os.makedirs(work_dir, exist_ok=True)
# # === [新增] Step 0: 语义分析 ===
# logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
# self.semantic_result = None
# try:
# semantic_analyzer = SemanticAnalyzer(self.DUT_code)
# self.semantic_result = semantic_analyzer.analyze()
# # 记录分析结果摘要
# fp_count = len(self.semantic_result.get('function_points', []))
# fsm_info = semantic_analyzer.get_fsm_info()
# if fsm_info:
# logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
# f"({len(fsm_info.get('states', []))} states)")
# logger.info(f" Total function points identified: {fp_count}")
# # 保存语义分析报告
# semantic_report = semantic_analyzer.generate_prompt_context()
# ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
# # === [新增] Step 0.1: 初始化能量分配器 ===
# if self.semantic_result.get('function_points'):
# self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
# energy_init_result = self.energy_allocator.initialize(
# self.semantic_result['function_points']
# )
# logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
# # === [新增] Step 0.2: 初始化多样性约束注入器 ===
# history_file = os.path.join(work_dir, "test_history.json")
# # 创建 TestHistoryManager 并传递 history_file
# history_manager = TestHistoryManager(history_file=history_file)
# self.diversity_injector = DiversityInjector(history_manager=history_manager)
# logger.info(f" Diversity injector initialized with history file: {history_file}")
# # === [新增] Step 0.3: 初始化质量评估器 ===
# if self.semantic_result.get('function_points'):
# self.quality_evaluator = QualityEvaluator(
# function_points=self.semantic_result['function_points']
# )
# logger.info(f" Quality evaluator initialized")
# except Exception as e:
# logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
# # ================================
# current_tb = self.TB_code
# last_annotated_file = None
# # --- Baseline ---
# logger.info(f"--- CGA Iter 0 (Baseline) ---")
# iter0_dir = os.path.join(work_dir, "iter_0")
# os.makedirs(iter0_dir, exist_ok=True)
# self._prepare_dut(iter0_dir)
# ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
# success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
# self.best_score = score
# self.best_tb = current_tb
# last_annotated_file = annotated_path
# logger.info(f"Baseline Coverage: {score:.2f}%")
# if score >= self.target_coverage:
# logger.success(f"Target reached at baseline!")
# # [修改] 返回元组 (代码, 分数)
# return self.best_tb, self.best_score
# # --- Loop ---
# for i in range(1, self.max_iter + 1):
# logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
# # === [新增] 能量检查:是否还有活跃目标 ===
# if self.energy_allocator:
# current_target = self.energy_allocator.select_next_target()
# if not current_target:
# logger.info("No more active targets with remaining energy. Stopping.")
# break
# logger.info(f"Target: {current_target}")
# # =========================================
# if not last_annotated_file: break
# # [修改] 传递语义分析结果、能量分配器、多样性注入器给 CoverageParser
# parser = CoverageParser(
# last_annotated_file,
# tb_code=self.best_tb,
# semantic_result=self.semantic_result,
# energy_allocator=self.energy_allocator,
# diversity_injector=self.diversity_injector # [新增]
# )
# prompt = parser.generate_prompt(self.best_score)
# # if not prompt:
# # logger.info("No reachable missing blocks found. Stopping.")
# # break
# if not prompt:
# if self.best_score >= self.target_coverage:
# break # 达标才停止
# else:
# # 未达标,尝试探索性测试
# prompt = self._generate_exploration_prompt(i)
# logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
# messages = [{"role": "user", "content": prompt}]
# try:
# response, _ = llm.llm_call(messages, self.model)
# codes = llm.extract_code(response, "verilog")
# new_task_code = codes[0] if codes else ""
# if not new_task_code:
# # [新增] 记录失败
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=False,
# coverage_delta=0.0,
# energy_cost=1.0
# )
# continue
# except Exception as e:
# logger.error(f"LLM Call failed: {e}")
# # [新增] 记录失败
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=False,
# coverage_delta=0.0,
# energy_cost=1.0
# )
# break
# injector = TBInjector(self.best_tb)
# enhanced_tb = injector.inject(new_task_code, iter_idx=i)
# iter_dir = os.path.join(work_dir, f"iter_{i}")
# os.makedirs(iter_dir, exist_ok=True)
# self._prepare_dut(iter_dir)
# ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
# success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
# # === [新增] 记录生成结果到能量分配器 ===
# coverage_delta = new_score - self.best_score if success else 0.0
# generation_success = success and new_score > self.best_score
# if self.energy_allocator:
# self.energy_allocator.record_generation(
# success=generation_success,
# coverage_delta=coverage_delta,
# energy_cost=1.0
# )
# # =========================================
# # === [新增] 记录测试用例到多样性历史 ===
# if self.diversity_injector:
# # 提取已知信号
# known_signals = []
# if self.semantic_result:
# known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
# self.diversity_injector.record_test(
# code=new_task_code,
# target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
# coverage_score=new_score,
# success=generation_success,
# iteration=i,
# known_signals=known_signals
# )
# # =======================================
# # === [新增] Layer 3: 质量评估 ===
# if self.quality_evaluator:
# # 评估测试用例质量
# eval_result = self.quality_evaluator.evaluate_test_case(
# code=new_task_code,
# covered_lines=set(), # 如果有具体覆盖行信息可传入
# covered_functions=[], # 如果有覆盖功能点信息可传入
# test_id=f"iter_{i}",
# iteration=i
# )
# # 记录多样性得分
# diversity_score = eval_result.get('diversity', {}).get('overall_score', 0)
# logger.info(f" Quality Evaluation: diversity={diversity_score:.2f}")
# # 检查是否应该接受该测试用例
# should_accept, reason = self.quality_evaluator.should_accept(eval_result)
# if not should_accept:
# logger.warning(f" Quality check failed: {reason}")
# # =====================================
# if success and new_score > self.best_score:
# improvement = new_score - self.best_score
# logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
# self.best_score = new_score
# self.best_tb = enhanced_tb
# last_annotated_file = new_annotated_path
# elif success and new_score == self.best_score:
# logger.info(f"Coverage unchanged. Keeping previous.")
# else:
# logger.warning(f"Regression or Failure. Discarding changes.")
# if self.best_score >= self.target_coverage:
# logger.success("Target coverage reached!")
# break
# logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
# # === [新增] 生成能量分配报告 ===
# if self.energy_allocator:
# energy_report = self.energy_allocator.generate_report()
# ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
# logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
# # =================================
# # === [新增] 生成多样性报告并保存历史 ===
# if self.diversity_injector:
# diversity_report = self.diversity_injector.generate_diversity_report()
# ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
# logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
# # 保存测试历史
# self.diversity_injector.history.save()
# # ======================================
# # === [新增] Layer 3: 生成质量评估报告 ===
# if self.quality_evaluator:
# quality_report = self.quality_evaluator.generate_report()
# ls.save_code(quality_report, os.path.join(work_dir, "quality_evaluation_report.txt"))
# logger.info(f"Quality evaluation report saved to {work_dir}/quality_evaluation_report.txt")
# # 输出语义覆盖率摘要
# coverage_result = self.quality_evaluator.semantic_coverage.calculate_coverage()
# logger.info(f"Semantic Coverage: {coverage_result.semantic_coverage:.2%}")
# # ===========================================
# # [修改] 返回元组 (代码, 分数)
# return self.best_tb, self.best_score
"""
Description : Coverage-Guided Agent (CGA) Main Controller
- Integrated with Layer 0: Semantic Analysis
- Integrated with Layer 1: Diversity Constraint Injection
- Integrated with Layer 3: Quality Evaluation
- Integrated with Layer 4: Energy Allocation
Author : CorrectBench Integration
"""
import os
import re
import sys
import shutil
import LLM_call as llm
import loader_saver as ls
from loader_saver import autologger as logger
from utils.verilator_call import verilator_run_coverage
from autoline.cga_utils import CoverageParser, TBInjector
# [新增] 导入语义分析层
from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
# [新增] 导入能量分配层
from autoline.energy_allocator import EnergyAllocator, EnergyState
# [新增] 导入多样性约束注入器
from autoline.diversity_injector import DiversityInjector
# [新增] 导入测试历史管理器
from autoline.test_history import TestHistoryManager
# [新增] 导入质量评估层
from autoline.quality_evaluator import QualityEvaluator, DiversityScore, SemanticCoverageResult
class TaskTBCGA:
def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config, work_subdir="CGA", max_iter=None):
    """Initialise the state for one coverage-guided run.

    Args:
        task_dir: Task directory; ``_prepare_dut`` looks for ``DUT.v`` in it.
        task_id: Task identifier, stored unchanged.
        header: Stored unchanged on the instance.
        DUT_code: DUT source text; ``_prepare_dut`` falls back to it when
            ``DUT.v`` is missing from ``task_dir``.
        TB_code: Initial testbench source; also the initial ``best_tb``.
        config: Configuration object. ``config.autoline.cga.target_coverage``
            and ``config.gpt.model`` are always read;
            ``config.autoline.cga.max_iter`` is read only when ``max_iter``
            is ``None``.
        work_subdir: Stored as ``self.work_subdir``.
            NOTE(review): presumably the working sub-directory name under
            ``task_dir`` -- confirm against ``run()``.
        max_iter: Optional override of the configured iteration limit.
            Only ``None`` selects the configured value, so ``0`` is honoured.
    """
    self.task_dir = task_dir
    self.task_id = task_id
    self.header = header
    self.DUT_code = DUT_code
    self.TB_code = TB_code
    self.config = config
    self.work_subdir = work_subdir
    self.max_iter = config.autoline.cga.max_iter if max_iter is None else max_iter
    self.target_coverage = config.autoline.cga.target_coverage
    self.model = config.gpt.model

    # Best result seen so far; starts from the unmodified testbench.
    self.best_tb = TB_code
    self.best_score = 0.0
    self.best_covered_lines = set()
    self.best_covered_functions = set()

    # Read by _map_lines_to_function_points and _generate_exploration_prompt.
    # Initialised here so those helpers do not raise AttributeError when they
    # are called before a semantic analysis result has been stored.
    self.semantic_result = None

    # Energy allocator (None until one is created).
    self.energy_allocator: EnergyAllocator = None
    # Diversity constraint injector (None until one is created).
    self.diversity_injector: DiversityInjector = None
    # Quality evaluator (None until one is created).
    self.quality_evaluator: QualityEvaluator = None
# [Added helper] Copy the DUT from the parent task directory
def _prepare_dut(self, target_dir):
    """Place a ``DUT.v`` file inside ``target_dir``.

    The file already present in ``self.task_dir`` is copied when it exists;
    otherwise the in-memory ``self.DUT_code`` is written out instead.
    """
    dut_name = "DUT.v"
    on_disk = os.path.join(self.task_dir, dut_name)
    destination = os.path.join(target_dir, dut_name)

    if not os.path.exists(on_disk):
        # Fallback: the file on disk is gone, so use the in-memory source.
        ls.save_code(self.DUT_code, destination)
        return

    # Preferred path: reuse the existing file.
    shutil.copy(on_disk, destination)
def _extract_coverage_snapshot(self, annotated_path):
    """Collect covered and coverable line numbers from an annotated DUT file.

    Each non-empty line is expected to start with an optional marker
    (``%``, ``~`` or ``^``), a decimal hit count, whitespace, and then the
    source text. Lines without that prefix are ignored.

    A line is *coverable* when, after dropping any trailing ``//`` comment,
    its source text is not empty, is not a declaration/``assign`` line, is
    not a purely structural token (``begin``, ``end``, ...), and contains at
    least one alphanumeric character. A coverable line is *covered* when its
    count is greater than zero and it does not carry the ``^`` marker.

    Line numbers are 1-based. When the first line of the file is the
    ``// verilator_coverage annotation`` header, the reported numbers are
    shifted down by one so they refer to the original DUT lines rather than
    to lines of the annotated file.
    NOTE(review): extra lines inserted elsewhere by the annotator (if any)
    are not compensated -- confirm against the annotate options in use.

    Args:
        annotated_path: Path of the annotated file; may be falsy or missing.

    Returns:
        dict with the keys ``covered_lines``, ``coverable_lines`` (sets of
        int) and ``covered_functions`` (the result of
        ``_map_lines_to_function_points`` on the covered lines). All three
        are empty sets when the file is unavailable.
    """
    snapshot = {
        "covered_lines": set(),
        "covered_functions": set(),
        "coverable_lines": set(),
    }
    if not annotated_path or not os.path.exists(annotated_path):
        return snapshot

    # The input is stripped before matching, so a single pattern covers the
    # "%N", "~N", "^N" and plain "N" prefixes.
    annotation_pattern = re.compile(r"^([%~^]?)(\d+)\s+(.*)$")
    decl_pattern = re.compile(r"^\s*(input|output|inout|wire|reg|logic|parameter|localparam|assign)\b")
    structural_tokens = {"begin", "end", "else", "endmodule", "endcase", ");", "default:"}

    covered = snapshot["covered_lines"]
    coverable = snapshot["coverable_lines"]
    line_offset = 0  # becomes 1 when the annotation header occupies line 1

    with open(annotated_path, "r", encoding="utf-8", errors="ignore") as f:
        for raw_no, raw_line in enumerate(f, start=1):
            stripped = raw_line.strip()
            if not stripped:
                continue
            if raw_no == 1 and "// verilator_coverage annotation" in stripped:
                # The header is not part of the DUT source; every following
                # line sits one line lower than in the original file.
                line_offset = 1
                continue

            match = annotation_pattern.match(stripped)
            if match is None:
                continue
            marker, digits, code_part = match.groups()
            # The count on "^" lines is deliberately not used: such lines
            # are coverable but never reported as covered.
            count = None if marker == "^" else int(digits)

            code_part = code_part.strip()
            if "//" in code_part:
                code_part = code_part.split("//", 1)[0].strip()
            if not code_part:
                continue
            if decl_pattern.match(code_part):
                continue
            if code_part in structural_tokens:
                continue
            if not any(ch.isalnum() for ch in code_part):
                continue

            line_no = raw_no - line_offset
            coverable.add(line_no)
            if count is not None and count > 0:
                covered.add(line_no)

    snapshot["covered_functions"] = self._map_lines_to_function_points(covered)
    return snapshot
def _map_lines_to_function_points(self, covered_lines):
    """Infer which function points were hit from the covered line numbers.

    A function point whose ``location`` has positive ``start_line`` and
    ``end_line`` is hit when at least one covered line falls inside that
    inclusive range. A function point without a valid location is hit only
    by heuristic: its ``type`` must be ``"exception"`` or ``"protocol"`` and
    ``covered_lines`` must be non-empty.

    Returns:
        Set of function-point names (the empty name is never included).
        Empty when ``self.semantic_result`` is falsy.
    """
    hit_names = set()
    semantic = self.semantic_result
    if not semantic:
        return hit_names

    # Any coverage at all enables the heuristic for location-less points.
    module_touched = len(covered_lines) > 0

    for point in semantic.get("function_points", []):
        span = point.get("location", {})
        first = span.get("start_line", 0)
        last = span.get("end_line", 0)
        name = point.get("name", "")
        kind = point.get("type", "")

        if first > 0 and last > 0:
            # Valid location: look for any covered line inside [first, last].
            for covered in covered_lines:
                if first <= covered <= last:
                    hit_names.add(name)
                    break
        elif module_touched and kind in ("exception", "protocol"):
            # No usable location: assume the point may have been exercised.
            hit_names.add(name)

    hit_names.discard("")
    return hit_names
def _generate_exploration_prompt(self, iteration: int) -> str:
    """Build an exploratory-test prompt for the LLM.

    Intended for the case where no specific missing blocks were identified
    but the coverage target has not been reached. The prompt is assembled
    from three optional sections followed by a fixed task description:

    * FSM information taken from ``self.semantic_result['fsm']``;
    * the current target of ``self.energy_allocator``;
    * up to the five most recent records of
      ``self.diversity_injector.history.records``.

    Args:
        iteration: Current iteration number, shown in the prompt header.

    Returns:
        The prompt text. A string is always returned; sections whose data
        is unavailable are simply left empty.
    """

    def _read_field(record, name, default):
        # History records may be attribute-style objects or plain dicts.
        if hasattr(record, name):
            return getattr(record, name, default)
        if isinstance(record, dict):
            return record.get(name, default)
        return default

    # FSM hints from the semantic analysis result.
    fsm_info = ""
    if self.semantic_result:
        fsm_data = self.semantic_result.get('fsm', {})
        if fsm_data:
            states = fsm_data.get('states', [])
            state_var = fsm_data.get('state_variable', 'state')
            fsm_info = f"""
[FSM INFORMATION]
- State variable: {state_var}
- Known states: {', '.join(states) if states else 'unknown'}
The DUT appears to be a Finite State Machine. To improve coverage:
1. Try to visit each state by driving inputs that trigger state transitions
2. For each state, try different input combinations
3. Consider edge cases: reset transitions, timeout conditions, error states
"""

    # Target function point from the energy allocator.
    target_info = ""
    if self.energy_allocator and self.energy_allocator.current_target:
        current_target = self.energy_allocator.current_target
        target_info = f"""
[CURRENT TARGET]
Focus on: {current_target.function_point}
Remaining energy: {current_target.remaining}
"""

    # Previously tried tests from the diversity injector's history.
    diversity_hints = ""
    if self.diversity_injector:
        history = self.diversity_injector.history
        # The history manager exposes its entries as ``records``.
        if history and hasattr(history, 'records') and len(history.records) > 0:
            recent_tests = history.records[-5:] if len(history.records) > 5 else history.records
            diversity_hints = """
[RECENTLY TRIED APPROACHES - AVOID REPETITION]
Recent test patterns tried:
"""
            for idx, record in enumerate(recent_tests):
                # Use dedicated names here: reusing ``iteration`` would
                # overwrite the parameter and put a stale iteration number
                # into the prompt header below.
                past_target = _read_field(record, 'target_function', 'unknown')
                past_iteration = _read_field(record, 'iteration', idx)
                diversity_hints += f"- Iter {past_iteration}: target={past_target}\n"

    prompt = f"""
[EXPLORATION MODE - ITERATION {iteration}]
Current coverage is {self.best_score:.2f}%, but no specific uncovered code blocks were identified.
This may happen when:
1. Coverage data is incomplete or filtered
2. Branch/condition coverage needs improvement (not just line coverage)
3. State transitions in FSM are not fully exercised
{fsm_info}
{target_info}
{diversity_hints}
[YOUR TASK]
Write an EXPLORATORY test scenario that:
1. Covers different input combinations than previous tests
2. Explores different FSM state transitions
3. Tests edge cases and boundary conditions
4. Varies timing and sequence of inputs
[OUTPUT FORMAT]
Return ONLY Verilog test scenario code (no task wrapper).
Use the signal names from the testbench.
```verilog
// Your exploratory test code here
```
"""
    return prompt
def _analyze_unreachable_branches(self, annotated_file):
"""
分析当前覆盖率情况下是否还存在不可达分支。
当 CGA 达到目标覆盖率时,调用此方法判断是否应该停止:
- 如果存在不可达分支(由于 RTL 结构设计),停止并报告
- 如果所有未覆盖都是可覆盖但未测试的,继续优化
Args:
annotated_file: str, annotated coverage 文件路径
Returns:
dict: {
"has_unreachable": bool, # 是否存在不可达分支
"count": int, # 不可达分支数量
"details": list, # 详细信息列表
"reason": str # 原因说明
}
"""
if not annotated_file or not os.path.exists(annotated_file):
return {"has_unreachable": False, "count": 0, "details": [], "reason": "No annotated file found"}
try:
with open(annotated_file, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
except Exception as e:
logger.warning(f"Failed to read annotated file: {e}")
return {"has_unreachable": False, "count": 0, "details": [], "reason": f"Read error: {e}"}
# 分析未覆盖行
unreachable_details = []
zero_count_lines = []
# patterns from CoverageParser
line_pattern = re.compile(r'^%(\d+)\s+(.*)$')
tilde_pattern = re.compile(r'^~(\d+)\s+(.*)$')
caret_pattern = re.compile(r'^\^(\d+)\s+(.*)$')
plain_pattern = re.compile(r'^\s*(\d+)\s+(.*)$')
decl_pattern = re.compile(r'^\s*(input|output|inout|wire|reg|logic|parameter|localparam|assign)\b')
for i, line in enumerate(lines):
line = line.strip()
if not line:
continue
count = -1
clean_code = line
is_tilde = False
is_caret = False
match_line = line_pattern.match(line)
match_tilde = tilde_pattern.match(line)
match_caret = caret_pattern.match(line)
match_plain = plain_pattern.match(line)
if match_line:
count = int(match_line.group(1))
clean_code = match_line.group(2).strip()
elif match_tilde:
count = int(match_tilde.group(1))
clean_code = match_tilde.group(2).strip()
is_tilde = True
elif match_caret:
count = int(match_caret.group(1))
clean_code = match_caret.group(2).strip()
is_caret = True
elif match_plain:
count = int(match_plain.group(1))
clean_code = match_plain.group(2).strip()
if "//" in clean_code:
clean_code = clean_code.split("//")[0].strip()
is_hard_noise = (decl_pattern.match(clean_code) or clean_code == "endmodule")
# soft_noise 不包含 "default:" 和 "else",因为这些可能是不可达分支,需要进一步分析
is_soft_noise = (len(clean_code) < 2 or clean_code in ["end", "begin", ");", "endcase"] or
clean_code.startswith("module ") or not any(c.isalnum() for c in clean_code))
# 判断是否零覆盖
is_zero = (count == 0)
# 零覆盖行需要进一步分析,不做 soft_noise 过滤
if is_zero and not is_hard_noise:
zero_count_lines.append({
"line_num": i + 1,
"code": clean_code[:80],
"is_tilde": is_tilde,
"is_caret": is_caret
})
# 分析每个零覆盖行,判断是否不可达
truly_unreachable = []
can_be_reached = []
for item in zero_count_lines:
code = item["code"]
line_num = item["line_num"]
# 不可达的常见模式:
# 1. default: 在 case 语句中,如果所有其他情况都已覆盖
# 2. 死代码(如不可能的条件)
# 3. 异常处理分支
# 可覆盖但未覆盖的常见模式:
# 1. 特定的状态转换分支
# 2. 边界条件检查
# 3. 错误处理路径
# 简单启发式判断
is_unreachable = False
reason = ""
# 检查是否是 default: 分支
if "default:" in code:
# default 分支在 case 完全覆盖时确实不可达
is_unreachable = True
reason = "default branch in fully-covered case statement"
# 检查是否是 endcase 后面的代码
elif "endcase" in code:
is_unreachable = True
reason = "code after endcase"
# 检查是否是 endmodule 附近
elif "endmodule" in code:
is_unreachable = True
reason = "near endmodule"
# 检查是否是纯声明
elif any(kw in code for kw in ["input", "output", "wire", "reg", "parameter"]):
is_unreachable = True
reason = "declaration statement"
# else 分支在 if 链完全覆盖时可能不可达
elif code.strip() == "else":
is_unreachable = True
reason = "else branch in fully-covered if statement"
else:
# 无法确定,归类为"可覆盖但未覆盖"
reason = "potentially coverable but not tested"
if is_unreachable:
truly_unreachable.append({
"line_num": line_num,
"code": code,
"reason": reason
})
else:
can_be_reached.append({
"line_num": line_num,
"code": code,
"reason": reason
})
# 汇总结果
has_unreachable = len(truly_unreachable) > 0
details = []
if truly_unreachable:
details.append(f"Truly unreachable by RTL design ({len(truly_unreachable)} lines):")
for item in truly_unreachable[:10]: # 最多显示10个
details.append(f" Line {item['line_num']}: {item['code']} - {item['reason']}")
if len(truly_unreachable) > 10:
details.append(f" ... and {len(truly_unreachable) - 10} more")
if can_be_reached:
details.append(f"Potentially coverable but not tested ({len(can_be_reached)} lines):")
for item in can_be_reached[:5]: # 最多显示5个
details.append(f" Line {item['line_num']}: {item['code']} - {item['reason']}")
if len(can_be_reached) > 5:
details.append(f" ... and {len(can_be_reached) - 5} more")
logger.info(f"Unreachable analysis: {len(truly_unreachable)} truly unreachable, {len(can_be_reached)} potentially coverable")
return {
"has_unreachable": has_unreachable,
"count": len(truly_unreachable),
"details": details,
"reason": "Found truly unreachable branches" if has_unreachable else "All uncovered are potentially coverable",
"truly_unreachable": truly_unreachable,
"can_be_reached": can_be_reached
}
def _generate_syntax_fix_prompt(self, original_code: str, syntax_issues: dict, original_prompt: str) -> str:
"""
生成语法修正 Prompt让 LLM 修复检测到的语法问题
Args:
original_code: 原始生成的代码
syntax_issues: 语法检查结果
original_prompt: 原始 Prompt
Returns:
修正 Prompt
"""
issues_text = []
for issue in syntax_issues.get('width_mismatch', []):
issues_text.append(f"- {issue['message']}")
if 'suggestion' in issue:
issues_text.append(f" Suggestion: {issue['suggestion']}")
for issue in syntax_issues.get('logic_issues', []):
issues_text.append(f"- {issue['message']}")
if 'suggestion' in issue:
issues_text.append(f" Suggestion: {issue['suggestion']}")
for issue in syntax_issues.get('syntax_warnings', []):
if issue['severity'] == 'error':
issues_text.append(f"- ERROR: {issue['message']}")
prompt = f"""
[SYNTAX FIX REQUEST]
The previously generated Verilog test code has the following issues:
{chr(10).join(issues_text)}
[ORIGINAL CODE]
```verilog
{original_code}
```
[YOUR TASK]
Fix the above code to address these issues. Pay special attention to:
1. **Width Mismatch**: When you want to input a bit sequence (e.g., 01111100) to a single-bit signal:
- WRONG: `{{in}} = 8'b01111100;` (truncates to single bit)
- CORRECT: Use a shift register
```verilog
reg [7:0] shift_reg;
shift_reg = 8'b01111100;
for (i = 0; i < 8; i = i + 1) begin
in = shift_reg[7];
shift_reg = shift_reg << 1;
@(posedge clk);
end
```
2. **Single-bit Shift**: Shifting a 1-bit signal has no effect:
- WRONG: `in = in >> 1;` (always results in 0)
- CORRECT: Use a multi-bit shift register as shown above
[OUTPUT FORMAT]
Return ONLY the corrected Verilog test scenario code:
```verilog
// Your corrected test code here
```
"""
return prompt
def _get_compile_error(self, iter_dir: str) -> str:
"""
获取 Verilator 编译错误日志
Args:
iter_dir: 迭代目录
Returns:
错误日志字符串
"""
error_parts = []
# 检查 obj_dir 是否存在
obj_dir = os.path.join(iter_dir, "obj_dir")
if not os.path.exists(obj_dir):
error_parts.append("obj_dir not created - compilation failed early")
# 检查可能的日志文件
log_files = [
os.path.join(iter_dir, "verilator.log"),
os.path.join(iter_dir, "compile.log"),
os.path.join(obj_dir, "Vtestbench.log"),
]
for log_file in log_files:
if os.path.exists(log_file):
try:
with open(log_file, 'r', errors='ignore') as f:
content = f.read()
if content.strip():
error_parts.append(f"=== {os.path.basename(log_file)} ===")
error_parts.append(content[-2000:]) # 最后 2000 字符
except Exception:
pass
# 如果没有找到日志文件,检查目录内容
if not error_parts:
error_parts.append(f"Directory contents of {iter_dir}:")
try:
for item in os.listdir(iter_dir):
error_parts.append(f" {item}")
except Exception:
pass
return '\n'.join(error_parts) if error_parts else "Unknown compilation error"
def _generate_compile_fix_prompt(self, compile_error: str, original_code: str) -> str:
"""
生成编译错误修正 Prompt
Args:
compile_error: 编译错误日志
original_code: 原始代码
Returns:
修正 Prompt
"""
# 截取关键错误信息
error_lines = compile_error.split('\n')
key_errors = []
for line in error_lines:
line = line.strip()
if any(kw in line.lower() for kw in ['error', 'syntax', 'fatal', 'undefined', 'illegal']):
key_errors.append(line)
if len(key_errors) > 10: # 最多 10 条关键错误
break
prompt = f"""
[COMPILATION ERROR FIX REQUEST]
The Verilog test code failed to compile with Verilator. Here are the key errors:
```
{chr(10).join(key_errors) if key_errors else compile_error[:1000]}
```
[ORIGINAL CODE]
```verilog
{original_code[:2000]} // Truncated if too long
```
[COMMON VERILOG ISSUES TO CHECK]
1. **Width mismatch**: Assigning wide values to narrow signals
- Problem: `{{in}} = 8'b01111100;` where `in` is 1-bit
- Fix: Use shift register to input bits one at a time
2. **Undefined signals**: Using signals that are not declared
- Check spelling of signal names against the testbench
3. **Syntax errors**: Missing semicolons, mismatched begin/end
- Check all statements end with semicolon
- Ensure all `begin` have matching `end`
4. **Timescale issues**: Missing timescale directive
- The testbench should have `timescale 1ns / 1ps`
[YOUR TASK]
Generate a CORRECTED version of the test code that will compile successfully.
Focus on fixing the specific errors shown above.
[OUTPUT FORMAT]
Return ONLY the corrected Verilog test scenario code:
```verilog
// Your corrected test code here
```
"""
return prompt
def run(self):
"""
Run the coverage-guided loop and return the best testbench found.

Steps, in order:
0. Recreate the working directory, run semantic analysis on the DUT and,
from its result, set up the energy allocator, the diversity injector and
the quality evaluator. A failure here is logged and the run continues
without semantic guidance.
1. Baseline: simulate the initial testbench (iter_0) and record its
coverage. If the target is already met, report and return.
2. Up to self.max_iter iterations: build a prompt (coverage-driven, or an
exploration prompt as fallback), ask the LLM for a new test scenario,
inject it into the best testbench, optionally retry on syntax or compile
problems, measure coverage, update the helper components, and keep the
result only if coverage improved.
3. Write the unreachable-branch, energy, diversity and quality reports.

Returns:
tuple: (self.best_tb, self.best_score)

NOTE(review): indentation is not visible in this view of the file, so the
exact nesting of some statements below could not be confirmed.
"""
logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
# 1. Recreate the working directory from scratch (original note: saves/<task name>/5_CGA)
work_dir = os.path.join(self.task_dir, self.work_subdir)
if os.path.exists(work_dir):
shutil.rmtree(work_dir)
os.makedirs(work_dir, exist_ok=True)
# === [added] Step 0: semantic analysis ===
logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
self.semantic_result = None
try:
semantic_analyzer = SemanticAnalyzer(self.DUT_code)
self.semantic_result = semantic_analyzer.analyze()
# Log a summary of the analysis result
fp_count = len(self.semantic_result.get('function_points', []))
fsm_info = semantic_analyzer.get_fsm_info()
if fsm_info:
logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
f"({len(fsm_info.get('states', []))} states)")
logger.info(f" Total function points identified: {fp_count}")
# Save the semantic analysis report
semantic_report = semantic_analyzer.generate_prompt_context()
ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
# === [added] Step 0.1: initialise the energy allocator ===
if self.semantic_result.get('function_points'):
self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
energy_init_result = self.energy_allocator.initialize(
self.semantic_result['function_points']
)
logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
# === [added] Step 0.2: initialise the diversity constraint injector ===
history_file = os.path.join(work_dir, "test_history.json")
# Create the TestHistoryManager and hand it history_file
history_manager = TestHistoryManager(history_file=history_file)
self.diversity_injector = DiversityInjector(history_manager=history_manager)
logger.info(f" Diversity injector initialized with history file: {history_file}")
# === [added] Step 0.3: initialise the quality evaluator ===
if self.semantic_result.get('function_points'):
self.quality_evaluator = QualityEvaluator(
function_points=self.semantic_result['function_points']
)
logger.info(f" Quality evaluator initialized")
except Exception as e:
logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
# ================================
current_tb = self.TB_code
last_annotated_file = None
# --- Baseline ---
logger.info(f"--- CGA Iter 0 (Baseline) ---")
iter0_dir = os.path.join(work_dir, "iter_0")
os.makedirs(iter0_dir, exist_ok=True)
self._prepare_dut(iter0_dir)
ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
# NOTE(review): the baseline `success` flag is not checked before `score` is used - confirm this is intended.
success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
self.best_score = score
self.best_tb = current_tb
last_annotated_file = annotated_path
baseline_snapshot = self._extract_coverage_snapshot(annotated_path)
self.best_covered_lines = set(baseline_snapshot["covered_lines"])
self.best_covered_functions = set(baseline_snapshot["covered_functions"])
# NOTE: Do NOT mark targets as completed here.
# This allows CGA to continue iterating even when semantic coverage is 100%,
# as long as statement coverage < target_coverage.
# The mark_targets_completed will be called after each successful improvement.
# NOTE(review): self.quality_evaluator is only assigned inside the try block above;
# presumably __init__ defaults it to None - verify, otherwise this raises AttributeError
# when semantic analysis fails or finds no function points.
if self.quality_evaluator and self.best_covered_functions:
self.quality_evaluator.semantic_coverage.update_coverage(
covered_lines=self.best_covered_lines,
covered_functions=sorted(self.best_covered_functions),
test_id="iter_0",
iteration=0
)
logger.info(f"Baseline Coverage: {score:.2f}%")
if score >= self.target_coverage:
# Baseline already meets the target; check whether unreachable branches exist
unreachable_info = self._analyze_unreachable_branches(last_annotated_file)
if unreachable_info["has_unreachable"]:
truly_unreachable = unreachable_info.get("truly_unreachable", [])
logger.warning(f"Baseline coverage reached target, but found {unreachable_info['count']} unreachable branches:")
for item in truly_unreachable:
logger.warning(f" Line {item['line_num']}: {item['code']} - {item['reason']}")
logger.info("Stopping: unreachable branches exist by RTL design")
else:
logger.success(f"Target reached at baseline! No unreachable branches found.")
# [changed] return a tuple (code, score)
return self.best_tb, self.best_score
# --- Loop ---
for i in range(1, self.max_iter + 1):
logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
iter_dir = os.path.join(work_dir, f"iter_{i}")
os.makedirs(iter_dir, exist_ok=True)
# === [changed] Energy check: even with no energy left, keep going while coverage is below target ===
current_target = None
if self.energy_allocator:
current_target = self.energy_allocator.select_next_target()
if not current_target:
# Even when no target has energy left, keep exploring if statement coverage is below target
if self.best_score < self.target_coverage:
logger.info("No more energy targets, but coverage < target. Continuing with exploration...")
# NOTE(review): `prompt` is assigned again from parser.generate_prompt() further down,
# so this exploration prompt looks like it is only used as an early-stop check - confirm.
prompt = self._generate_exploration_prompt(i)
if not prompt:
logger.warning("Cannot generate exploration prompt. Stopping.")
break
else:
logger.info("No more active targets with remaining energy. Stopping.")
break
else:
logger.info(f"Target: {current_target}")
# =========================================
if not last_annotated_file: break
# [changed] Pass the semantic analysis result, energy allocator, diversity injector and DUT code to CoverageParser
parser = CoverageParser(
last_annotated_file,
tb_code=self.best_tb,
semantic_result=self.semantic_result,
energy_allocator=self.energy_allocator,
diversity_injector=self.diversity_injector, # [added]
dut_code=self.DUT_code # [added] pass the DUT code so signal names can be extracted
)
prompt = parser.generate_prompt(self.best_score)
# [changed] Improved stop condition: even when no missing_blocks are found, continue while coverage is below target
if not prompt:
if self.best_score >= self.target_coverage:
# Coverage reached the target, but still check for unreachable branches
unreachable_info = self._analyze_unreachable_branches(last_annotated_file)
if unreachable_info["has_unreachable"]:
truly_unreachable = unreachable_info.get("truly_unreachable", [])
logger.warning(f"Coverage reached target, but found {unreachable_info['count']} unreachable branches:")
for item in truly_unreachable:
logger.warning(f" Line {item['line_num']}: {item['code']} - {item['reason']}")
logger.info("Stopping: unreachable branches exist by RTL design")
break
else:
# No unreachable branches and target coverage reached: optimisation is complete
logger.info(f"Coverage reached {self.best_score:.2f}%, no unreachable branches found. Optimization complete.")
break
else:
# Coverage is below target but no explicit missing_blocks were found
# Try generating a random-exploration prompt instead
logger.warning(f"No reachable missing blocks found, but coverage ({self.best_score:.2f}%) < target ({self.target_coverage}%).")
logger.info(f"Attempting random exploration to discover uncovered paths...")
prompt = self._generate_exploration_prompt(i)
if not prompt:
logger.info("Could not generate exploration prompt. Stopping.")
break
ls.save_code(prompt, os.path.join(iter_dir, "prompt.txt"))
logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
messages = [{"role": "user", "content": prompt}]
try:
response, _ = llm.llm_call(messages, self.model)
ls.save_code(response, os.path.join(iter_dir, "llm_response.txt"))
codes = llm.extract_code(response, "verilog")
new_task_code = codes[0] if codes else ""
if not new_task_code:
# [added] Record the failure (no Verilog code could be extracted from the response)
if self.energy_allocator:
self.energy_allocator.record_generation(
success=False,
coverage_delta=0.0,
energy_cost=1.0
)
continue
except Exception as e:
logger.error(f"LLM Call failed: {e}")
# [added] Record the failure
if self.energy_allocator:
self.energy_allocator.record_generation(
success=False,
coverage_delta=0.0,
energy_cost=1.0
)
break
ls.save_code(new_task_code, os.path.join(iter_dir, "generated_scenario.v"))
injector = TBInjector(self.best_tb)
enhanced_tb = injector.inject(new_task_code, iter_idx=i)
# [added] Check the result of the syntax pre-check
validation_result = injector.last_validation_result
syntax_issues = validation_result.get('syntax_check', {}) if validation_result else {}
if syntax_issues.get('should_retry', False):
logger.warning(f"[CGA-{i}] Syntax issues detected in generated code. Attempting retry...")
# Build a corrective prompt that includes the detected syntax issues
retry_prompt = self._generate_syntax_fix_prompt(new_task_code, syntax_issues, prompt)
if retry_prompt:
try:
retry_response, _ = llm.llm_call([{"role": "user", "content": retry_prompt}], self.model)
retry_codes = llm.extract_code(retry_response, "verilog")
if retry_codes:
new_task_code = retry_codes[0]
ls.save_code(retry_prompt, os.path.join(iter_dir, "retry_prompt.txt"))
ls.save_code(retry_response, os.path.join(iter_dir, "retry_response.txt"))
ls.save_code(new_task_code, os.path.join(iter_dir, "generated_scenario_retry.v"))
enhanced_tb = injector.inject(new_task_code, iter_idx=i)
logger.info(f"[CGA-{i}] Retry code generated successfully")
except Exception as e:
logger.warning(f"[CGA-{i}] Retry failed: {e}")
self._prepare_dut(iter_dir)
ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
# [added] Error-feedback mechanism for compilation failures
if not success:
compile_error = self._get_compile_error(iter_dir)
if compile_error:
logger.error(f"[CGA-{i}] Verilator compilation failed:")
logger.error(compile_error[:500]) # truncate overly long error output
# Ask the LLM to fix the compile errors (at most 1 retry)
if not hasattr(self, '_compile_retry_count'):
self._compile_retry_count = {}
self._compile_retry_count[i] = self._compile_retry_count.get(i, 0)
if self._compile_retry_count[i] < 1:
logger.info(f"[CGA-{i}] Asking LLM to fix compilation errors...")
fix_prompt = self._generate_compile_fix_prompt(compile_error, new_task_code)
try:
fix_response, _ = llm.llm_call([{"role": "user", "content": fix_prompt}], self.model)
fix_codes = llm.extract_code(fix_response, "verilog")
if fix_codes:
fixed_code = fix_codes[0]
ls.save_code(fix_prompt, os.path.join(iter_dir, "compile_fix_prompt.txt"))
ls.save_code(fix_response, os.path.join(iter_dir, "compile_fix_response.txt"))
ls.save_code(fixed_code, os.path.join(iter_dir, "generated_scenario_compile_fix.v"))
enhanced_tb = injector.inject(fixed_code, iter_idx=i)
ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
# Try compiling again
success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
if success:
logger.info(f"[CGA-{i}] Compilation fixed! Score: {new_score:.2f}%")
new_task_code = fixed_code
except Exception as e:
logger.warning(f"[CGA-{i}] Compile fix attempt failed: {e}")
self._compile_retry_count[i] += 1
# On failure an empty snapshot is used, so nothing counts as newly covered.
coverage_snapshot = self._extract_coverage_snapshot(new_annotated_path) if success else {
"covered_lines": set(),
"covered_functions": set(),
"coverable_lines": set(),
}
current_covered_lines = set(coverage_snapshot["covered_lines"])
current_covered_functions = set(coverage_snapshot["covered_functions"])
newly_covered_lines = current_covered_lines - self.best_covered_lines
newly_covered_functions = current_covered_functions - self.best_covered_functions
# === [added] Record the generation result in the energy allocator ===
coverage_delta = new_score - self.best_score if success else 0.0
current_target_name = None
target_hit = False
if self.energy_allocator and self.energy_allocator.current_target:
current_target_name = self.energy_allocator.current_target.function_point
# With function-level coverage data, a hit means the target function is covered;
# without it, fall back to "coverage score improved".
if current_covered_functions:
target_hit = current_target_name in current_covered_functions
else:
target_hit = success and new_score > self.best_score
generation_success = success and target_hit
if self.energy_allocator:
self.energy_allocator.record_generation(
success=generation_success,
coverage_delta=coverage_delta,
energy_cost=1.0
)
# Functions newly covered as a side effect (other than the current target on success)
# are reported to the allocator as completed.
extra_completed_functions = set(newly_covered_functions)
if generation_success and current_target_name:
extra_completed_functions.discard(current_target_name)
if extra_completed_functions:
self.energy_allocator.mark_targets_completed(sorted(extra_completed_functions))
# =========================================
# === [added] Record the test case in the diversity history ===
if self.diversity_injector:
# Extract the known signal names from the semantic result's 'ports' entries
known_signals = []
if self.semantic_result:
known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
self.diversity_injector.record_test(
code=new_task_code,
target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
coverage_score=new_score,
success=generation_success,
iteration=i,
known_signals=known_signals
)
# =======================================
# === [added] Layer 3: quality evaluation ===
if self.quality_evaluator:
# Evaluate the quality of the test case
eval_result = self.quality_evaluator.evaluate_test_case(
code=new_task_code,
covered_lines=newly_covered_lines,
covered_functions=sorted(newly_covered_functions),
test_id=f"iter_{i}",
iteration=i
)
# Log the diversity score
diversity_score = eval_result.get('diversity', {}).get('overall_score', 0)
logger.info(f" Quality Evaluation: diversity={diversity_score:.2f}")
# Check whether this test case should be accepted (the verdict is only logged here)
should_accept, reason = self.quality_evaluator.should_accept(eval_result)
if not should_accept:
logger.warning(f" Quality check failed: {reason}")
# =====================================
# Keep the new testbench only on a strict coverage improvement.
if success and new_score > self.best_score:
improvement = new_score - self.best_score
logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
self.best_score = new_score
self.best_tb = enhanced_tb
last_annotated_file = new_annotated_path
self.best_covered_lines = current_covered_lines
self.best_covered_functions = current_covered_functions
elif success and new_score == self.best_score:
logger.info(f"Coverage unchanged. Keeping previous.")
else:
logger.warning(f"Regression or Failure. Discarding changes.")
if self.best_score >= self.target_coverage:
# Target coverage reached; check for unreachable branches
unreachable_info = self._analyze_unreachable_branches(last_annotated_file)
if unreachable_info["has_unreachable"]:
truly_unreachable = unreachable_info.get("truly_unreachable", [])
logger.warning(f"Coverage reached target, but found {unreachable_info['count']} unreachable branches:")
for item in truly_unreachable:
logger.warning(f" Line {item['line_num']}: {item['code']} - {item['reason']}")
logger.info("Stopping: unreachable branches exist by RTL design")
else:
logger.success(f"Target coverage reached! No unreachable branches found.")
break
# === [added] Check for unreachable branches when CGA finishes ===
logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
unreachable_info = self._analyze_unreachable_branches(last_annotated_file)
if unreachable_info["has_unreachable"]:
report_content = "UNREACHABLE BRANCHES REPORT\n"
report_content += "=" * 60 + "\n\n"
# Truly unreachable branches
truly_unreachable = unreachable_info.get("truly_unreachable", [])
if truly_unreachable:
report_content += f"[TRULY UNREACHABLE] ({len(truly_unreachable)} branches)\n"
report_content += "-" * 60 + "\n"
report_content += "These branches are unreachable due to RTL design structure:\n\n"
for item in truly_unreachable:
line_num = item["line_num"]
code = item["code"]
reason = item["reason"]
# Map the reason string to its translated description (falls back to the reason itself)
reason_desc = {
"default branch in fully-covered case statement": "Case语句所有分支已覆盖default分支永远不会执行",
"code after endcase": "endcase之后的代码不可达",
"near endmodule": "endmodule附近的代码不可达",
"declaration statement": "声明语句不是可执行代码",
"else branch in fully-covered if statement": "if语句所有条件已覆盖else分支永远不会执行"
}.get(reason, reason)
report_content += f" Line {line_num}:\n"
report_content += f" Code: {code}\n"
report_content += f" Reason: {reason_desc}\n"
report_content += f" Analysis: {reason}\n\n"
# Branches that are coverable but not yet tested
can_be_reached = unreachable_info.get("can_be_reached", [])
if can_be_reached:
report_content += f"\n[POTENTIALLY COVERABLE] ({len(can_be_reached)} branches)\n"
report_content += "-" * 60 + "\n"
report_content += "These branches are theoretically reachable but not exercised by testbench:\n\n"
for item in can_be_reached:
line_num = item["line_num"]
code = item["code"]
reason = item["reason"]
reason_desc = {
"default branch in fully-covered case statement": "Case语句所有分支已覆盖default分支理论不可达",
"potentially coverable but not tested": "理论上可通过添加特定测试向量覆盖"
}.get(reason, reason)
report_content += f" Line {line_num}:\n"
report_content += f" Code: {code}\n"
report_content += f" Reason: {reason_desc}\n"
report_content += f" Analysis: {reason}\n\n"
report_content += "=" * 60 + "\n"
report_content += f"Summary: {len(truly_unreachable)} truly unreachable, {len(can_be_reached)} potentially coverable\n"
report_content += "=" * 60 + "\n"
report_path = os.path.join(work_dir, "unreachable_branches_report.txt")
ls.save_code(report_content, report_path)
logger.warning(f"Unreachable branches found: {unreachable_info['count']} lines")
logger.info(f"Unreachable branches report saved to: {report_path}")
else:
logger.info("No unreachable branches found in DUT.")
# =========================================
# === [added] Generate the energy allocation report ===
if self.energy_allocator:
energy_report = self.energy_allocator.generate_report()
ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
# =================================
# === [added] Generate the diversity report and save the history ===
if self.diversity_injector:
diversity_report = self.diversity_injector.generate_diversity_report()
ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
# Save the test history
self.diversity_injector.history.save()
# ======================================
# === [added] Layer 3: generate the quality evaluation report ===
if self.quality_evaluator:
quality_report = self.quality_evaluator.generate_report()
ls.save_code(quality_report, os.path.join(work_dir, "quality_evaluation_report.txt"))
logger.info(f"Quality evaluation report saved to {work_dir}/quality_evaluation_report.txt")
# Log the semantic coverage summary
coverage_result = self.quality_evaluator.semantic_coverage.calculate_coverage()
logger.info(f"Semantic Coverage: {coverage_result.semantic_coverage:.2%}")
# ===========================================
# [changed] return a tuple (code, score)
return self.best_tb, self.best_score