1454 lines
63 KiB
Python
1454 lines
63 KiB
Python
# # #第四版
|
||
|
||
# # """
|
||
# # Description : Coverage-Guided Agent (CGA) Main Controller
|
||
# # - Integrated with Layer 0: Semantic Analysis
|
||
# # - Integrated with Layer 1: Diversity Constraint Injection
|
||
# # - Integrated with Layer 4: Energy Allocation
|
||
# # Author : CorrectBench Integration
|
||
# # """
|
||
# # import os
|
||
# # import sys
|
||
# # import shutil
|
||
|
||
# # import LLM_call as llm
|
||
# # import loader_saver as ls
|
||
# # from loader_saver import autologger as logger
|
||
# # from utils.verilator_call import verilator_run_coverage
|
||
# # from autoline.cga_utils import CoverageParser, TBInjector
|
||
# # # [新增] 导入语义分析层
|
||
# # from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
|
||
# # # [新增] 导入能量分配层
|
||
# # from autoline.energy_allocator import EnergyAllocator, EnergyState
|
||
# # # [新增] 导入多样性约束注入器
|
||
# # from autoline.diversity_injector import DiversityInjector
|
||
# # # [新增] 导入测试历史管理器
|
||
# # from autoline.test_history import TestHistoryManager
|
||
|
||
# # class TaskTBCGA:
|
||
# # def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config):
|
||
# # self.task_dir = task_dir
|
||
# # self.task_id = task_id
|
||
# # self.header = header
|
||
# # self.DUT_code = DUT_code
|
||
# # self.TB_code = TB_code
|
||
# # self.config = config
|
||
|
||
# # self.max_iter = 5
|
||
# # self.target_coverage = 95.0
|
||
# # self.model = config.gpt.model
|
||
|
||
# # self.best_tb = TB_code
|
||
# # self.best_score = 0.0
|
||
|
||
# # # [新增] 能量分配器
|
||
# # self.energy_allocator: EnergyAllocator = None
|
||
# # # [新增] 多样性约束注入器
|
||
# # self.diversity_injector: DiversityInjector = None
|
||
|
||
# # # [新增辅助函数] 从父目录拷贝 DUT
|
||
# # def _prepare_dut(self, target_dir):
|
||
# # source_dut = os.path.join(self.task_dir, "DUT.v")
|
||
# # target_dut = os.path.join(target_dir, "DUT.v")
|
||
|
||
# # # 优先拷贝现有的文件
|
||
# # if os.path.exists(source_dut):
|
||
# # shutil.copy(source_dut, target_dut)
|
||
# # else:
|
||
# # # 只有当文件由于某种原因被删除了,才降级使用内存中的 code
|
||
# # ls.save_code(self.DUT_code, target_dut)
|
||
|
||
|
||
|
||
# # def run(self):
|
||
# # logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
|
||
|
||
# # # 1. 确保工作目录存在 (saves/任务名/5_CGA)
|
||
# # work_dir = os.path.join(self.task_dir, "5_CGA")
|
||
# # if os.path.exists(work_dir):
|
||
# # shutil.rmtree(work_dir)
|
||
# # os.makedirs(work_dir, exist_ok=True)
|
||
|
||
# # # === [新增] Step 0: 语义分析 ===
|
||
# # logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
|
||
# # self.semantic_result = None
|
||
# # try:
|
||
# # semantic_analyzer = SemanticAnalyzer(self.DUT_code)
|
||
# # self.semantic_result = semantic_analyzer.analyze()
|
||
|
||
# # # 记录分析结果摘要
|
||
# # fp_count = len(self.semantic_result.get('function_points', []))
|
||
# # fsm_info = semantic_analyzer.get_fsm_info()
|
||
# # if fsm_info:
|
||
# # logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
|
||
# # f"({len(fsm_info.get('states', []))} states)")
|
||
# # logger.info(f" Total function points identified: {fp_count}")
|
||
|
||
# # # 保存语义分析报告
|
||
# # semantic_report = semantic_analyzer.generate_prompt_context()
|
||
# # ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
|
||
|
||
# # # === [新增] Step 0.1: 初始化能量分配器 ===
|
||
# # if self.semantic_result.get('function_points'):
|
||
# # self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
|
||
# # energy_init_result = self.energy_allocator.initialize(
|
||
# # self.semantic_result['function_points']
|
||
# # )
|
||
# # logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
|
||
|
||
# # # === [新增] Step 0.2: 初始化多样性约束注入器 ===
|
||
# # history_file = os.path.join(work_dir, "test_history.json")
|
||
# # # 创建 TestHistoryManager 并传递 history_file
|
||
# # history_manager = TestHistoryManager(history_file=history_file)
|
||
# # self.diversity_injector = DiversityInjector(history_manager=history_manager)
|
||
# # logger.info(f" Diversity injector initialized with history file: {history_file}")
|
||
|
||
# # except Exception as e:
|
||
# # logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
|
||
# # # ================================
|
||
|
||
# # current_tb = self.TB_code
|
||
# # last_annotated_file = None
|
||
|
||
# # # --- Baseline ---
|
||
# # logger.info(f"--- CGA Iter 0 (Baseline) ---")
|
||
# # iter0_dir = os.path.join(work_dir, "iter_0")
|
||
# # os.makedirs(iter0_dir, exist_ok=True)
|
||
|
||
# # self._prepare_dut(iter0_dir)
|
||
# # ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
|
||
|
||
# # success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
|
||
|
||
# # self.best_score = score
|
||
# # self.best_tb = current_tb
|
||
# # last_annotated_file = annotated_path
|
||
|
||
# # logger.info(f"Baseline Coverage: {score:.2f}%")
|
||
|
||
# # if score >= self.target_coverage:
|
||
# # logger.success(f"Target reached at baseline!")
|
||
# # # [修改] 返回元组 (代码, 分数)
|
||
# # return self.best_tb, self.best_score
|
||
|
||
# # # --- Loop ---
|
||
# # for i in range(1, self.max_iter + 1):
|
||
# # logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
|
||
|
||
# # # === [新增] 能量检查:是否还有活跃目标 ===
|
||
# # if self.energy_allocator:
|
||
# # current_target = self.energy_allocator.select_next_target()
|
||
# # if not current_target:
|
||
# # logger.info("No more active targets with remaining energy. Stopping.")
|
||
# # break
|
||
# # logger.info(f"Target: {current_target}")
|
||
# # # =========================================
|
||
|
||
# # if not last_annotated_file: break
|
||
|
||
# # # [修改] 传递语义分析结果、能量分配器、多样性注入器给 CoverageParser
|
||
# # parser = CoverageParser(
|
||
# # last_annotated_file,
|
||
# # tb_code=self.best_tb,
|
||
# # semantic_result=self.semantic_result,
|
||
# # energy_allocator=self.energy_allocator,
|
||
# # diversity_injector=self.diversity_injector # [新增]
|
||
# # )
|
||
# # prompt = parser.generate_prompt(self.best_score)
|
||
|
||
# # if not prompt:
|
||
# # logger.info("No reachable missing blocks found. Stopping.")
|
||
# # break
|
||
|
||
# # logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
|
||
# # messages = [{"role": "user", "content": prompt}]
|
||
|
||
# # try:
|
||
# # response, _ = llm.llm_call(messages, self.model)
|
||
# # codes = llm.extract_code(response, "verilog")
|
||
# # new_task_code = codes[0] if codes else ""
|
||
# # if not new_task_code:
|
||
# # # [新增] 记录失败
|
||
# # if self.energy_allocator:
|
||
# # self.energy_allocator.record_generation(
|
||
# # success=False,
|
||
# # coverage_delta=0.0,
|
||
# # energy_cost=1.0
|
||
# # )
|
||
# # continue
|
||
# # except Exception as e:
|
||
# # logger.error(f"LLM Call failed: {e}")
|
||
# # # [新增] 记录失败
|
||
# # if self.energy_allocator:
|
||
# # self.energy_allocator.record_generation(
|
||
# # success=False,
|
||
# # coverage_delta=0.0,
|
||
# # energy_cost=1.0
|
||
# # )
|
||
# # break
|
||
|
||
# # injector = TBInjector(self.best_tb)
|
||
# # enhanced_tb = injector.inject(new_task_code, iter_idx=i)
|
||
|
||
# # iter_dir = os.path.join(work_dir, f"iter_{i}")
|
||
# # os.makedirs(iter_dir, exist_ok=True)
|
||
|
||
# # self._prepare_dut(iter_dir)
|
||
# # ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
|
||
|
||
# # success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
|
||
|
||
# # # === [新增] 记录生成结果到能量分配器 ===
|
||
# # coverage_delta = new_score - self.best_score if success else 0.0
|
||
# # generation_success = success and new_score > self.best_score
|
||
|
||
# # if self.energy_allocator:
|
||
# # self.energy_allocator.record_generation(
|
||
# # success=generation_success,
|
||
# # coverage_delta=coverage_delta,
|
||
# # energy_cost=1.0
|
||
# # )
|
||
# # # =========================================
|
||
|
||
# # # === [新增] 记录测试用例到多样性历史 ===
|
||
# # if self.diversity_injector:
|
||
# # # 提取已知信号
|
||
# # known_signals = []
|
||
# # if self.semantic_result:
|
||
# # known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
|
||
|
||
# # self.diversity_injector.record_test(
|
||
# # code=new_task_code,
|
||
# # target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
|
||
# # coverage_score=new_score,
|
||
# # success=generation_success,
|
||
# # iteration=i,
|
||
# # known_signals=known_signals
|
||
# # )
|
||
# # # =======================================
|
||
|
||
# # if success and new_score > self.best_score:
|
||
# # improvement = new_score - self.best_score
|
||
# # logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
|
||
# # self.best_score = new_score
|
||
# # self.best_tb = enhanced_tb
|
||
# # last_annotated_file = new_annotated_path
|
||
# # elif success and new_score == self.best_score:
|
||
# # logger.info(f"Coverage unchanged. Keeping previous.")
|
||
# # else:
|
||
# # logger.warning(f"Regression or Failure. Discarding changes.")
|
||
|
||
# # if self.best_score >= self.target_coverage:
|
||
# # logger.success("Target coverage reached!")
|
||
# # break
|
||
|
||
# # logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
|
||
|
||
# # # === [新增] 生成能量分配报告 ===
|
||
# # if self.energy_allocator:
|
||
# # energy_report = self.energy_allocator.generate_report()
|
||
# # ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
|
||
# # logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
|
||
# # # =================================
|
||
|
||
# # # === [新增] 生成多样性报告并保存历史 ===
|
||
# # if self.diversity_injector:
|
||
# # diversity_report = self.diversity_injector.generate_diversity_report()
|
||
# # ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
|
||
# # logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
|
||
|
||
# # # 保存测试历史
|
||
# # self.diversity_injector.history.save()
|
||
# # # ======================================
|
||
|
||
# # # [修改] 返回元组 (代码, 分数)
|
||
# # return self.best_tb, self.best_score
|
||
|
||
# #终版
|
||
# """
|
||
# Description : Coverage-Guided Agent (CGA) Main Controller
|
||
# - Integrated with Layer 0: Semantic Analysis
|
||
# - Integrated with Layer 1: Diversity Constraint Injection
|
||
# - Integrated with Layer 3: Quality Evaluation
|
||
# - Integrated with Layer 4: Energy Allocation
|
||
# Author : CorrectBench Integration
|
||
# """
|
||
# import os
|
||
# import sys
|
||
# import shutil
|
||
|
||
# import LLM_call as llm
|
||
# import loader_saver as ls
|
||
# from loader_saver import autologger as logger
|
||
# from utils.verilator_call import verilator_run_coverage
|
||
# from autoline.cga_utils import CoverageParser, TBInjector
|
||
# # [新增] 导入语义分析层
|
||
# from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
|
||
# # [新增] 导入能量分配层
|
||
# from autoline.energy_allocator import EnergyAllocator, EnergyState
|
||
# # [新增] 导入多样性约束注入器
|
||
# from autoline.diversity_injector import DiversityInjector
|
||
# # [新增] 导入测试历史管理器
|
||
# from autoline.test_history import TestHistoryManager
|
||
# # [新增] 导入质量评估层
|
||
# from autoline.quality_evaluator import QualityEvaluator, DiversityScore, SemanticCoverageResult
|
||
|
||
# class TaskTBCGA:
|
||
# def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config):
|
||
# self.task_dir = task_dir
|
||
# self.task_id = task_id
|
||
# self.header = header
|
||
# self.DUT_code = DUT_code
|
||
# self.TB_code = TB_code
|
||
# self.config = config
|
||
|
||
# self.max_iter = 5
|
||
# self.target_coverage = 95.0
|
||
# self.model = config.gpt.model
|
||
|
||
# self.best_tb = TB_code
|
||
# self.best_score = 0.0
|
||
|
||
# # [新增] 能量分配器
|
||
# self.energy_allocator: EnergyAllocator = None
|
||
# # [新增] 多样性约束注入器
|
||
# self.diversity_injector: DiversityInjector = None
|
||
# # [新增] 质量评估器
|
||
# self.quality_evaluator: QualityEvaluator = None
|
||
|
||
# # [新增辅助函数] 从父目录拷贝 DUT
|
||
# def _prepare_dut(self, target_dir):
|
||
# source_dut = os.path.join(self.task_dir, "DUT.v")
|
||
# target_dut = os.path.join(target_dir, "DUT.v")
|
||
|
||
# # 优先拷贝现有的文件
|
||
# if os.path.exists(source_dut):
|
||
# shutil.copy(source_dut, target_dut)
|
||
# else:
|
||
# # 只有当文件由于某种原因被删除了,才降级使用内存中的 code
|
||
# ls.save_code(self.DUT_code, target_dut)
|
||
|
||
# def _generate_exploration_prompt(self, iteration: int) -> str:
|
||
# """
|
||
# 生成探索性测试 Prompt
|
||
|
||
# 当找不到明确的 missing blocks 但覆盖率仍未达标时,
|
||
# 生成一个探索性 Prompt 来尝试发现新的测试路径。
|
||
|
||
# Args:
|
||
# iteration: 当前迭代次数
|
||
|
||
# Returns:
|
||
# 探索性测试 Prompt,如果无法生成则返回 None
|
||
# """
|
||
# # 从语义分析结果获取 FSM 和功能点信息
|
||
# fsm_info = ""
|
||
# if self.semantic_result:
|
||
# fsm_data = self.semantic_result.get('fsm', {})
|
||
# if fsm_data:
|
||
# states = fsm_data.get('states', [])
|
||
# state_var = fsm_data.get('state_variable', 'state')
|
||
# fsm_info = f"""
|
||
# [FSM INFORMATION]
|
||
# - State variable: {state_var}
|
||
# - Known states: {', '.join(states) if states else 'unknown'}
|
||
|
||
# The DUT appears to be a Finite State Machine. To improve coverage:
|
||
# 1. Try to visit each state by driving inputs that trigger state transitions
|
||
# 2. For each state, try different input combinations
|
||
# 3. Consider edge cases: reset transitions, timeout conditions, error states
|
||
# """
|
||
|
||
# # 从能量分配器获取目标功能点
|
||
# target_info = ""
|
||
# if self.energy_allocator and self.energy_allocator.current_target:
|
||
# target = self.energy_allocator.current_target
|
||
# target_info = f"""
|
||
# [CURRENT TARGET]
|
||
# Focus on: {target.function_point}
|
||
# Remaining energy: {target.remaining}
|
||
# """
|
||
|
||
# # 从多样性注入器获取已尝试的测试
|
||
# diversity_hints = ""
|
||
# if self.diversity_injector:
|
||
# history = self.diversity_injector.history
|
||
# # if history and len(history.history) > 0:
|
||
# # recent_tests = history.history[-5:] if len(history.history) > 5 else history.history
|
||
# if history and hasattr(history, 'records') and len(history.records) > 0:
|
||
# recent_tests = history.records[-5:] if len(history.records) > 5 else history.records
|
||
# diversity_hints = f"""
|
||
# [RECENTLY TRIED APPROACHES - AVOID REPETITION]
|
||
# Recent test patterns tried:
|
||
# """
|
||
# # for i, test in enumerate(recent_tests):
|
||
# # diversity_hints += f"- Iter {test.get('iteration', i)}: target={test.get('target_function', 'unknown')}\n"
|
||
# for i, test in enumerate(recent_tests):
|
||
# # TestRecord 是 dataclass,使用属性访问
|
||
# target = getattr(test, 'target_function', 'unknown') if hasattr(test, 'target_function') else 'unknown'
|
||
# iteration = getattr(test, 'iteration', i) if hasattr(test, 'iteration') else i
|
||
# diversity_hints += f"- Iter {iteration}: target={target}\n"
|
||
|
||
# prompt = f"""
|
||
# [EXPLORATION MODE - ITERATION {iteration}]
|
||
|
||
# Current coverage is {self.best_score:.2f}%, but no specific uncovered code blocks were identified.
|
||
# This may happen when:
|
||
# 1. Coverage data is incomplete or filtered
|
||
# 2. Branch/condition coverage needs improvement (not just line coverage)
|
||
# 3. State transitions in FSM are not fully exercised
|
||
|
||
# {fsm_info}
|
||
# {target_info}
|
||
# {diversity_hints}
|
||
|
||
# [YOUR TASK]
|
||
# Write an EXPLORATORY test scenario that:
|
||
# 1. Covers different input combinations than previous tests
|
||
# 2. Explores different FSM state transitions
|
||
# 3. Tests edge cases and boundary conditions
|
||
# 4. Varies timing and sequence of inputs
|
||
|
||
# [OUTPUT FORMAT]
|
||
# Return ONLY Verilog test scenario code (no task wrapper).
|
||
# Use the signal names from the testbench.
|
||
|
||
# ```verilog
|
||
# // Your exploratory test code here
|
||
# ```
|
||
# """
|
||
# return prompt
|
||
|
||
|
||
|
||
|
||
# def run(self):
|
||
# logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
|
||
|
||
# # 1. 确保工作目录存在 (saves/任务名/5_CGA)
|
||
# work_dir = os.path.join(self.task_dir, "5_CGA")
|
||
# if os.path.exists(work_dir):
|
||
# shutil.rmtree(work_dir)
|
||
# os.makedirs(work_dir, exist_ok=True)
|
||
|
||
# # === [新增] Step 0: 语义分析 ===
|
||
# logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
|
||
# self.semantic_result = None
|
||
# try:
|
||
# semantic_analyzer = SemanticAnalyzer(self.DUT_code)
|
||
# self.semantic_result = semantic_analyzer.analyze()
|
||
|
||
# # 记录分析结果摘要
|
||
# fp_count = len(self.semantic_result.get('function_points', []))
|
||
# fsm_info = semantic_analyzer.get_fsm_info()
|
||
# if fsm_info:
|
||
# logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
|
||
# f"({len(fsm_info.get('states', []))} states)")
|
||
# logger.info(f" Total function points identified: {fp_count}")
|
||
|
||
# # 保存语义分析报告
|
||
# semantic_report = semantic_analyzer.generate_prompt_context()
|
||
# ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
|
||
|
||
# # === [新增] Step 0.1: 初始化能量分配器 ===
|
||
# if self.semantic_result.get('function_points'):
|
||
# self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
|
||
# energy_init_result = self.energy_allocator.initialize(
|
||
# self.semantic_result['function_points']
|
||
# )
|
||
# logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
|
||
|
||
# # === [新增] Step 0.2: 初始化多样性约束注入器 ===
|
||
# history_file = os.path.join(work_dir, "test_history.json")
|
||
# # 创建 TestHistoryManager 并传递 history_file
|
||
# history_manager = TestHistoryManager(history_file=history_file)
|
||
# self.diversity_injector = DiversityInjector(history_manager=history_manager)
|
||
# logger.info(f" Diversity injector initialized with history file: {history_file}")
|
||
|
||
# # === [新增] Step 0.3: 初始化质量评估器 ===
|
||
# if self.semantic_result.get('function_points'):
|
||
# self.quality_evaluator = QualityEvaluator(
|
||
# function_points=self.semantic_result['function_points']
|
||
# )
|
||
# logger.info(f" Quality evaluator initialized")
|
||
|
||
# except Exception as e:
|
||
# logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
|
||
# # ================================
|
||
|
||
# current_tb = self.TB_code
|
||
# last_annotated_file = None
|
||
|
||
# # --- Baseline ---
|
||
# logger.info(f"--- CGA Iter 0 (Baseline) ---")
|
||
# iter0_dir = os.path.join(work_dir, "iter_0")
|
||
# os.makedirs(iter0_dir, exist_ok=True)
|
||
|
||
# self._prepare_dut(iter0_dir)
|
||
# ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
|
||
|
||
# success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
|
||
|
||
# self.best_score = score
|
||
# self.best_tb = current_tb
|
||
# last_annotated_file = annotated_path
|
||
|
||
# logger.info(f"Baseline Coverage: {score:.2f}%")
|
||
|
||
# if score >= self.target_coverage:
|
||
# logger.success(f"Target reached at baseline!")
|
||
# # [修改] 返回元组 (代码, 分数)
|
||
# return self.best_tb, self.best_score
|
||
|
||
# # --- Loop ---
|
||
# for i in range(1, self.max_iter + 1):
|
||
# logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
|
||
|
||
# # === [新增] 能量检查:是否还有活跃目标 ===
|
||
# if self.energy_allocator:
|
||
# current_target = self.energy_allocator.select_next_target()
|
||
# if not current_target:
|
||
# logger.info("No more active targets with remaining energy. Stopping.")
|
||
# break
|
||
# logger.info(f"Target: {current_target}")
|
||
# # =========================================
|
||
|
||
# if not last_annotated_file: break
|
||
|
||
# # [修改] 传递语义分析结果、能量分配器、多样性注入器给 CoverageParser
|
||
# parser = CoverageParser(
|
||
# last_annotated_file,
|
||
# tb_code=self.best_tb,
|
||
# semantic_result=self.semantic_result,
|
||
# energy_allocator=self.energy_allocator,
|
||
# diversity_injector=self.diversity_injector # [新增]
|
||
# )
|
||
# prompt = parser.generate_prompt(self.best_score)
|
||
|
||
# # if not prompt:
|
||
# # logger.info("No reachable missing blocks found. Stopping.")
|
||
# # break
|
||
# if not prompt:
|
||
# if self.best_score >= self.target_coverage:
|
||
# break # 达标才停止
|
||
# else:
|
||
# # 未达标,尝试探索性测试
|
||
# prompt = self._generate_exploration_prompt(i)
|
||
|
||
# logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
|
||
# messages = [{"role": "user", "content": prompt}]
|
||
|
||
# try:
|
||
# response, _ = llm.llm_call(messages, self.model)
|
||
# codes = llm.extract_code(response, "verilog")
|
||
# new_task_code = codes[0] if codes else ""
|
||
# if not new_task_code:
|
||
# # [新增] 记录失败
|
||
# if self.energy_allocator:
|
||
# self.energy_allocator.record_generation(
|
||
# success=False,
|
||
# coverage_delta=0.0,
|
||
# energy_cost=1.0
|
||
# )
|
||
# continue
|
||
# except Exception as e:
|
||
# logger.error(f"LLM Call failed: {e}")
|
||
# # [新增] 记录失败
|
||
# if self.energy_allocator:
|
||
# self.energy_allocator.record_generation(
|
||
# success=False,
|
||
# coverage_delta=0.0,
|
||
# energy_cost=1.0
|
||
# )
|
||
# break
|
||
|
||
# injector = TBInjector(self.best_tb)
|
||
# enhanced_tb = injector.inject(new_task_code, iter_idx=i)
|
||
|
||
# iter_dir = os.path.join(work_dir, f"iter_{i}")
|
||
# os.makedirs(iter_dir, exist_ok=True)
|
||
|
||
# self._prepare_dut(iter_dir)
|
||
# ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
|
||
|
||
# success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
|
||
|
||
# # === [新增] 记录生成结果到能量分配器 ===
|
||
# coverage_delta = new_score - self.best_score if success else 0.0
|
||
# generation_success = success and new_score > self.best_score
|
||
|
||
# if self.energy_allocator:
|
||
# self.energy_allocator.record_generation(
|
||
# success=generation_success,
|
||
# coverage_delta=coverage_delta,
|
||
# energy_cost=1.0
|
||
# )
|
||
# # =========================================
|
||
|
||
# # === [新增] 记录测试用例到多样性历史 ===
|
||
# if self.diversity_injector:
|
||
# # 提取已知信号
|
||
# known_signals = []
|
||
# if self.semantic_result:
|
||
# known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
|
||
|
||
# self.diversity_injector.record_test(
|
||
# code=new_task_code,
|
||
# target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
|
||
# coverage_score=new_score,
|
||
# success=generation_success,
|
||
# iteration=i,
|
||
# known_signals=known_signals
|
||
# )
|
||
# # =======================================
|
||
|
||
# # === [新增] Layer 3: 质量评估 ===
|
||
# if self.quality_evaluator:
|
||
# # 评估测试用例质量
|
||
# eval_result = self.quality_evaluator.evaluate_test_case(
|
||
# code=new_task_code,
|
||
# covered_lines=set(), # 如果有具体覆盖行信息可传入
|
||
# covered_functions=[], # 如果有覆盖功能点信息可传入
|
||
# test_id=f"iter_{i}",
|
||
# iteration=i
|
||
# )
|
||
|
||
# # 记录多样性得分
|
||
# diversity_score = eval_result.get('diversity', {}).get('overall_score', 0)
|
||
# logger.info(f" Quality Evaluation: diversity={diversity_score:.2f}")
|
||
|
||
# # 检查是否应该接受该测试用例
|
||
# should_accept, reason = self.quality_evaluator.should_accept(eval_result)
|
||
# if not should_accept:
|
||
# logger.warning(f" Quality check failed: {reason}")
|
||
# # =====================================
|
||
|
||
# if success and new_score > self.best_score:
|
||
# improvement = new_score - self.best_score
|
||
# logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
|
||
# self.best_score = new_score
|
||
# self.best_tb = enhanced_tb
|
||
# last_annotated_file = new_annotated_path
|
||
# elif success and new_score == self.best_score:
|
||
# logger.info(f"Coverage unchanged. Keeping previous.")
|
||
# else:
|
||
# logger.warning(f"Regression or Failure. Discarding changes.")
|
||
|
||
# if self.best_score >= self.target_coverage:
|
||
# logger.success("Target coverage reached!")
|
||
# break
|
||
|
||
# logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
|
||
|
||
# # === [新增] 生成能量分配报告 ===
|
||
# if self.energy_allocator:
|
||
# energy_report = self.energy_allocator.generate_report()
|
||
# ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
|
||
# logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
|
||
# # =================================
|
||
|
||
# # === [新增] 生成多样性报告并保存历史 ===
|
||
# if self.diversity_injector:
|
||
# diversity_report = self.diversity_injector.generate_diversity_report()
|
||
# ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
|
||
# logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
|
||
|
||
# # 保存测试历史
|
||
# self.diversity_injector.history.save()
|
||
# # ======================================
|
||
|
||
# # === [新增] Layer 3: 生成质量评估报告 ===
|
||
# if self.quality_evaluator:
|
||
# quality_report = self.quality_evaluator.generate_report()
|
||
# ls.save_code(quality_report, os.path.join(work_dir, "quality_evaluation_report.txt"))
|
||
# logger.info(f"Quality evaluation report saved to {work_dir}/quality_evaluation_report.txt")
|
||
|
||
# # 输出语义覆盖率摘要
|
||
# coverage_result = self.quality_evaluator.semantic_coverage.calculate_coverage()
|
||
# logger.info(f"Semantic Coverage: {coverage_result.semantic_coverage:.2%}")
|
||
# # ===========================================
|
||
|
||
# # [修改] 返回元组 (代码, 分数)
|
||
# return self.best_tb, self.best_score
|
||
|
||
"""
|
||
Description : Coverage-Guided Agent (CGA) Main Controller
|
||
- Integrated with Layer 0: Semantic Analysis
|
||
- Integrated with Layer 1: Diversity Constraint Injection
|
||
- Integrated with Layer 3: Quality Evaluation
|
||
- Integrated with Layer 4: Energy Allocation
|
||
Author : CorrectBench Integration
|
||
"""
|
||
import os
|
||
import re
|
||
import sys
|
||
import shutil
|
||
|
||
import LLM_call as llm
|
||
import loader_saver as ls
|
||
from loader_saver import autologger as logger
|
||
from utils.verilator_call import verilator_run_coverage
|
||
from autoline.cga_utils import CoverageParser, TBInjector
|
||
# [新增] 导入语义分析层
|
||
from autoline.semantic_analyzer import SemanticAnalyzer, FunctionPointType
|
||
# [新增] 导入能量分配层
|
||
from autoline.energy_allocator import EnergyAllocator, EnergyState
|
||
# [新增] 导入多样性约束注入器
|
||
from autoline.diversity_injector import DiversityInjector
|
||
# [新增] 导入测试历史管理器
|
||
from autoline.test_history import TestHistoryManager
|
||
# [新增] 导入质量评估层
|
||
from autoline.quality_evaluator import QualityEvaluator, DiversityScore, SemanticCoverageResult
|
||
|
||
class TaskTBCGA:
|
||
def __init__(self, task_dir, task_id, header, DUT_code, TB_code, config, work_subdir="CGA", max_iter=None):
|
||
self.task_dir = task_dir
|
||
self.task_id = task_id
|
||
self.header = header
|
||
self.DUT_code = DUT_code
|
||
self.TB_code = TB_code
|
||
self.config = config
|
||
self.work_subdir = work_subdir
|
||
|
||
self.max_iter = config.autoline.cga.max_iter if max_iter is None else max_iter
|
||
self.target_coverage = config.autoline.cga.target_coverage
|
||
self.model = config.gpt.model
|
||
|
||
self.best_tb = TB_code
|
||
self.best_score = 0.0
|
||
self.best_covered_lines = set()
|
||
self.best_covered_functions = set()
|
||
|
||
# [新增] 能量分配器
|
||
self.energy_allocator: EnergyAllocator = None
|
||
# [新增] 多样性约束注入器
|
||
self.diversity_injector: DiversityInjector = None
|
||
# [新增] 质量评估器
|
||
self.quality_evaluator: QualityEvaluator = None
|
||
|
||
# [新增辅助函数] 从父目录拷贝 DUT
|
||
def _prepare_dut(self, target_dir):
|
||
source_dut = os.path.join(self.task_dir, "DUT.v")
|
||
target_dut = os.path.join(target_dir, "DUT.v")
|
||
|
||
# 优先拷贝现有的文件
|
||
if os.path.exists(source_dut):
|
||
shutil.copy(source_dut, target_dut)
|
||
else:
|
||
# 只有当文件由于某种原因被删除了,才降级使用内存中的 code
|
||
ls.save_code(self.DUT_code, target_dut)
|
||
|
||
def _extract_coverage_snapshot(self, annotated_path):
|
||
"""
|
||
从 Verilator annotated DUT 中提取当前已覆盖行和已覆盖功能点。
|
||
"""
|
||
snapshot = {
|
||
"covered_lines": set(),
|
||
"covered_functions": set(),
|
||
"coverable_lines": set(),
|
||
}
|
||
|
||
if not annotated_path or not os.path.exists(annotated_path):
|
||
return snapshot
|
||
|
||
pct_pattern = re.compile(r"^%(\d+)\s+(.*)$")
|
||
tilde_pattern = re.compile(r"^~(\d+)\s+(.*)$")
|
||
caret_pattern = re.compile(r"^\^(\d+)\s+(.*)$")
|
||
plain_pattern = re.compile(r"^\s*(\d+)\s+(.*)$")
|
||
decl_pattern = re.compile(r"^\s*(input|output|inout|wire|reg|logic|parameter|localparam|assign)\b")
|
||
|
||
with open(annotated_path, "r", encoding="utf-8", errors="ignore") as f:
|
||
for line_no, raw_line in enumerate(f, start=1):
|
||
stripped = raw_line.strip()
|
||
if not stripped:
|
||
continue
|
||
|
||
count = None
|
||
code_part = None
|
||
is_caret = False
|
||
|
||
match = pct_pattern.match(stripped)
|
||
if match:
|
||
count = int(match.group(1))
|
||
code_part = match.group(2).strip()
|
||
else:
|
||
match = tilde_pattern.match(stripped)
|
||
if match:
|
||
count = int(match.group(1))
|
||
code_part = match.group(2).strip()
|
||
else:
|
||
match = caret_pattern.match(stripped)
|
||
if match:
|
||
is_caret = True
|
||
code_part = match.group(2).strip()
|
||
else:
|
||
match = plain_pattern.match(stripped)
|
||
if match:
|
||
count = int(match.group(1))
|
||
code_part = match.group(2).strip()
|
||
|
||
if code_part is None:
|
||
continue
|
||
|
||
if "//" in code_part:
|
||
code_part = code_part.split("//", 1)[0].strip()
|
||
|
||
if not code_part:
|
||
continue
|
||
if decl_pattern.match(code_part):
|
||
continue
|
||
if code_part in {"begin", "end", "else", "endmodule", "endcase", ");", "default:"}:
|
||
continue
|
||
if not any(ch.isalnum() for ch in code_part):
|
||
continue
|
||
|
||
snapshot["coverable_lines"].add(line_no)
|
||
if (count is not None) and (count > 0) and not is_caret:
|
||
snapshot["covered_lines"].add(line_no)
|
||
|
||
snapshot["covered_functions"] = self._map_lines_to_function_points(snapshot["covered_lines"])
|
||
return snapshot
|
||
|
||
def _map_lines_to_function_points(self, covered_lines):
|
||
"""
|
||
用功能点 location 与已覆盖行做交集,推断当前已命中的功能点。
|
||
"""
|
||
matched = set()
|
||
|
||
if not self.semantic_result:
|
||
return matched
|
||
|
||
for fp in self.semantic_result.get("function_points", []):
|
||
location = fp.get("location", {})
|
||
start_line = location.get("start_line", 0)
|
||
end_line = location.get("end_line", 0)
|
||
if (start_line <= 0) or (end_line <= 0):
|
||
continue
|
||
|
||
if any(start_line <= line_no <= end_line for line_no in covered_lines):
|
||
matched.add(fp.get("name", ""))
|
||
|
||
matched.discard("")
|
||
return matched
|
||
|
||
def _generate_exploration_prompt(self, iteration: int) -> str:
|
||
"""
|
||
生成探索性测试 Prompt
|
||
|
||
当找不到明确的 missing blocks 但覆盖率仍未达标时,
|
||
生成一个探索性 Prompt 来尝试发现新的测试路径。
|
||
|
||
Args:
|
||
iteration: 当前迭代次数
|
||
|
||
Returns:
|
||
探索性测试 Prompt,如果无法生成则返回 None
|
||
"""
|
||
# 从语义分析结果获取 FSM 和功能点信息
|
||
fsm_info = ""
|
||
if self.semantic_result:
|
||
fsm_data = self.semantic_result.get('fsm', {})
|
||
if fsm_data:
|
||
states = fsm_data.get('states', [])
|
||
state_var = fsm_data.get('state_variable', 'state')
|
||
fsm_info = f"""
|
||
[FSM INFORMATION]
|
||
- State variable: {state_var}
|
||
- Known states: {', '.join(states) if states else 'unknown'}
|
||
|
||
The DUT appears to be a Finite State Machine. To improve coverage:
|
||
1. Try to visit each state by driving inputs that trigger state transitions
|
||
2. For each state, try different input combinations
|
||
3. Consider edge cases: reset transitions, timeout conditions, error states
|
||
"""
|
||
|
||
# 从能量分配器获取目标功能点
|
||
target_info = ""
|
||
if self.energy_allocator and self.energy_allocator.current_target:
|
||
target = self.energy_allocator.current_target
|
||
target_info = f"""
|
||
[CURRENT TARGET]
|
||
Focus on: {target.function_point}
|
||
Remaining energy: {target.remaining}
|
||
"""
|
||
|
||
# 从多样性注入器获取已尝试的测试
|
||
diversity_hints = ""
|
||
if self.diversity_injector:
|
||
history = self.diversity_injector.history
|
||
# [修复] TestHistoryManager 使用 records 属性,不是 history
|
||
if history and hasattr(history, 'records') and len(history.records) > 0:
|
||
recent_tests = history.records[-5:] if len(history.records) > 5 else history.records
|
||
diversity_hints = f"""
|
||
[RECENTLY TRIED APPROACHES - AVOID REPETITION]
|
||
Recent test patterns tried:
|
||
"""
|
||
for i, test in enumerate(recent_tests):
|
||
# TestRecord 是 dataclass,使用属性访问
|
||
target = getattr(test, 'target_function', 'unknown') if hasattr(test, 'target_function') else test.get('target_function', 'unknown') if isinstance(test, dict) else 'unknown'
|
||
iteration = getattr(test, 'iteration', i) if hasattr(test, 'iteration') else test.get('iteration', i) if isinstance(test, dict) else i
|
||
diversity_hints += f"- Iter {iteration}: target={target}\n"
|
||
|
||
prompt = f"""
|
||
[EXPLORATION MODE - ITERATION {iteration}]
|
||
|
||
Current coverage is {self.best_score:.2f}%, but no specific uncovered code blocks were identified.
|
||
This may happen when:
|
||
1. Coverage data is incomplete or filtered
|
||
2. Branch/condition coverage needs improvement (not just line coverage)
|
||
3. State transitions in FSM are not fully exercised
|
||
|
||
{fsm_info}
|
||
{target_info}
|
||
{diversity_hints}
|
||
|
||
[YOUR TASK]
|
||
Write an EXPLORATORY test scenario that:
|
||
1. Covers different input combinations than previous tests
|
||
2. Explores different FSM state transitions
|
||
3. Tests edge cases and boundary conditions
|
||
4. Varies timing and sequence of inputs
|
||
|
||
[OUTPUT FORMAT]
|
||
Return ONLY Verilog test scenario code (no task wrapper).
|
||
Use the signal names from the testbench.
|
||
|
||
```verilog
|
||
// Your exploratory test code here
|
||
```
|
||
"""
|
||
return prompt
|
||
|
||
def _generate_syntax_fix_prompt(self, original_code: str, syntax_issues: dict, original_prompt: str) -> str:
|
||
"""
|
||
生成语法修正 Prompt,让 LLM 修复检测到的语法问题
|
||
|
||
Args:
|
||
original_code: 原始生成的代码
|
||
syntax_issues: 语法检查结果
|
||
original_prompt: 原始 Prompt
|
||
|
||
Returns:
|
||
修正 Prompt
|
||
"""
|
||
issues_text = []
|
||
|
||
for issue in syntax_issues.get('width_mismatch', []):
|
||
issues_text.append(f"- {issue['message']}")
|
||
if 'suggestion' in issue:
|
||
issues_text.append(f" Suggestion: {issue['suggestion']}")
|
||
|
||
for issue in syntax_issues.get('logic_issues', []):
|
||
issues_text.append(f"- {issue['message']}")
|
||
if 'suggestion' in issue:
|
||
issues_text.append(f" Suggestion: {issue['suggestion']}")
|
||
|
||
for issue in syntax_issues.get('syntax_warnings', []):
|
||
if issue['severity'] == 'error':
|
||
issues_text.append(f"- ERROR: {issue['message']}")
|
||
|
||
prompt = f"""
|
||
[SYNTAX FIX REQUEST]
|
||
|
||
The previously generated Verilog test code has the following issues:
|
||
|
||
{chr(10).join(issues_text)}
|
||
|
||
[ORIGINAL CODE]
|
||
```verilog
|
||
{original_code}
|
||
```
|
||
|
||
[YOUR TASK]
|
||
Fix the above code to address these issues. Pay special attention to:
|
||
|
||
1. **Width Mismatch**: When you want to input a bit sequence (e.g., 01111100) to a single-bit signal:
|
||
- WRONG: `{{in}} = 8'b01111100;` (truncates to single bit)
|
||
- CORRECT: Use a shift register
|
||
```verilog
|
||
reg [7:0] shift_reg;
|
||
shift_reg = 8'b01111100;
|
||
for (i = 0; i < 8; i = i + 1) begin
|
||
in = shift_reg[7];
|
||
shift_reg = shift_reg << 1;
|
||
@(posedge clk);
|
||
end
|
||
```
|
||
|
||
2. **Single-bit Shift**: Shifting a 1-bit signal has no effect:
|
||
- WRONG: `in = in >> 1;` (always results in 0)
|
||
- CORRECT: Use a multi-bit shift register as shown above
|
||
|
||
[OUTPUT FORMAT]
|
||
Return ONLY the corrected Verilog test scenario code:
|
||
```verilog
|
||
// Your corrected test code here
|
||
```
|
||
"""
|
||
return prompt
|
||
|
||
def _get_compile_error(self, iter_dir: str) -> str:
|
||
"""
|
||
获取 Verilator 编译错误日志
|
||
|
||
Args:
|
||
iter_dir: 迭代目录
|
||
|
||
Returns:
|
||
错误日志字符串
|
||
"""
|
||
error_parts = []
|
||
|
||
# 检查 obj_dir 是否存在
|
||
obj_dir = os.path.join(iter_dir, "obj_dir")
|
||
if not os.path.exists(obj_dir):
|
||
error_parts.append("obj_dir not created - compilation failed early")
|
||
|
||
# 检查可能的日志文件
|
||
log_files = [
|
||
os.path.join(iter_dir, "verilator.log"),
|
||
os.path.join(iter_dir, "compile.log"),
|
||
os.path.join(obj_dir, "Vtestbench.log"),
|
||
]
|
||
|
||
for log_file in log_files:
|
||
if os.path.exists(log_file):
|
||
try:
|
||
with open(log_file, 'r', errors='ignore') as f:
|
||
content = f.read()
|
||
if content.strip():
|
||
error_parts.append(f"=== {os.path.basename(log_file)} ===")
|
||
error_parts.append(content[-2000:]) # 最后 2000 字符
|
||
except Exception:
|
||
pass
|
||
|
||
# 如果没有找到日志文件,检查目录内容
|
||
if not error_parts:
|
||
error_parts.append(f"Directory contents of {iter_dir}:")
|
||
try:
|
||
for item in os.listdir(iter_dir):
|
||
error_parts.append(f" {item}")
|
||
except Exception:
|
||
pass
|
||
|
||
return '\n'.join(error_parts) if error_parts else "Unknown compilation error"
|
||
|
||
def _generate_compile_fix_prompt(self, compile_error: str, original_code: str) -> str:
|
||
"""
|
||
生成编译错误修正 Prompt
|
||
|
||
Args:
|
||
compile_error: 编译错误日志
|
||
original_code: 原始代码
|
||
|
||
Returns:
|
||
修正 Prompt
|
||
"""
|
||
# 截取关键错误信息
|
||
error_lines = compile_error.split('\n')
|
||
key_errors = []
|
||
for line in error_lines:
|
||
line = line.strip()
|
||
if any(kw in line.lower() for kw in ['error', 'syntax', 'fatal', 'undefined', 'illegal']):
|
||
key_errors.append(line)
|
||
if len(key_errors) > 10: # 最多 10 条关键错误
|
||
break
|
||
|
||
prompt = f"""
|
||
[COMPILATION ERROR FIX REQUEST]
|
||
|
||
The Verilog test code failed to compile with Verilator. Here are the key errors:
|
||
|
||
```
|
||
{chr(10).join(key_errors) if key_errors else compile_error[:1000]}
|
||
```
|
||
|
||
[ORIGINAL CODE]
|
||
```verilog
|
||
{original_code[:2000]} // Truncated if too long
|
||
```
|
||
|
||
[COMMON VERILOG ISSUES TO CHECK]
|
||
|
||
1. **Width mismatch**: Assigning wide values to narrow signals
|
||
- Problem: `{{in}} = 8'b01111100;` where `in` is 1-bit
|
||
- Fix: Use shift register to input bits one at a time
|
||
|
||
2. **Undefined signals**: Using signals that are not declared
|
||
- Check spelling of signal names against the testbench
|
||
|
||
3. **Syntax errors**: Missing semicolons, mismatched begin/end
|
||
- Check all statements end with semicolon
|
||
- Ensure all `begin` have matching `end`
|
||
|
||
4. **Timescale issues**: Missing timescale directive
|
||
- The testbench should have `timescale 1ns / 1ps`
|
||
|
||
[YOUR TASK]
|
||
Generate a CORRECTED version of the test code that will compile successfully.
|
||
Focus on fixing the specific errors shown above.
|
||
|
||
[OUTPUT FORMAT]
|
||
Return ONLY the corrected Verilog test scenario code:
|
||
```verilog
|
||
// Your corrected test code here
|
||
```
|
||
"""
|
||
return prompt
|
||
|
||
|
||
|
||
def run(self):
|
||
logger.info(f"[{self.task_id}] Starting Coverage-Guided Agent (CGA)...")
|
||
|
||
# 1. 确保工作目录存在 (saves/任务名/5_CGA)
|
||
work_dir = os.path.join(self.task_dir, self.work_subdir)
|
||
if os.path.exists(work_dir):
|
||
shutil.rmtree(work_dir)
|
||
os.makedirs(work_dir, exist_ok=True)
|
||
|
||
# === [新增] Step 0: 语义分析 ===
|
||
logger.info(f"[{self.task_id}] Running Semantic Analysis (Layer 0)...")
|
||
self.semantic_result = None
|
||
try:
|
||
semantic_analyzer = SemanticAnalyzer(self.DUT_code)
|
||
self.semantic_result = semantic_analyzer.analyze()
|
||
|
||
# 记录分析结果摘要
|
||
fp_count = len(self.semantic_result.get('function_points', []))
|
||
fsm_info = semantic_analyzer.get_fsm_info()
|
||
if fsm_info:
|
||
logger.info(f" FSM detected: {fsm_info.get('state_variable', 'unknown')} "
|
||
f"({len(fsm_info.get('states', []))} states)")
|
||
logger.info(f" Total function points identified: {fp_count}")
|
||
|
||
# 保存语义分析报告
|
||
semantic_report = semantic_analyzer.generate_prompt_context()
|
||
ls.save_code(semantic_report, os.path.join(work_dir, "semantic_analysis.txt"))
|
||
|
||
# === [新增] Step 0.1: 初始化能量分配器 ===
|
||
if self.semantic_result.get('function_points'):
|
||
self.energy_allocator = EnergyAllocator(max_iterations=self.max_iter)
|
||
energy_init_result = self.energy_allocator.initialize(
|
||
self.semantic_result['function_points']
|
||
)
|
||
logger.info(f" Energy allocator initialized: {energy_init_result['targets']} targets")
|
||
|
||
# === [新增] Step 0.2: 初始化多样性约束注入器 ===
|
||
history_file = os.path.join(work_dir, "test_history.json")
|
||
# 创建 TestHistoryManager 并传递 history_file
|
||
history_manager = TestHistoryManager(history_file=history_file)
|
||
self.diversity_injector = DiversityInjector(history_manager=history_manager)
|
||
logger.info(f" Diversity injector initialized with history file: {history_file}")
|
||
|
||
# === [新增] Step 0.3: 初始化质量评估器 ===
|
||
if self.semantic_result.get('function_points'):
|
||
self.quality_evaluator = QualityEvaluator(
|
||
function_points=self.semantic_result['function_points']
|
||
)
|
||
logger.info(f" Quality evaluator initialized")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Semantic analysis failed: {e}. Continuing without semantic guidance.")
|
||
# ================================
|
||
|
||
current_tb = self.TB_code
|
||
last_annotated_file = None
|
||
|
||
# --- Baseline ---
|
||
logger.info(f"--- CGA Iter 0 (Baseline) ---")
|
||
iter0_dir = os.path.join(work_dir, "iter_0")
|
||
os.makedirs(iter0_dir, exist_ok=True)
|
||
|
||
self._prepare_dut(iter0_dir)
|
||
ls.save_code(current_tb, os.path.join(iter0_dir, "driver.v"))
|
||
|
||
success, score, annotated_path = verilator_run_coverage(iter0_dir, "DUT.v", "driver.v")
|
||
|
||
self.best_score = score
|
||
self.best_tb = current_tb
|
||
last_annotated_file = annotated_path
|
||
baseline_snapshot = self._extract_coverage_snapshot(annotated_path)
|
||
self.best_covered_lines = set(baseline_snapshot["covered_lines"])
|
||
self.best_covered_functions = set(baseline_snapshot["covered_functions"])
|
||
|
||
if self.energy_allocator and self.best_covered_functions:
|
||
self.energy_allocator.mark_targets_completed(sorted(self.best_covered_functions))
|
||
|
||
if self.quality_evaluator and self.best_covered_functions:
|
||
self.quality_evaluator.semantic_coverage.update_coverage(
|
||
covered_lines=self.best_covered_lines,
|
||
covered_functions=sorted(self.best_covered_functions),
|
||
test_id="iter_0",
|
||
iteration=0
|
||
)
|
||
|
||
logger.info(f"Baseline Coverage: {score:.2f}%")
|
||
|
||
if score >= self.target_coverage:
|
||
logger.success(f"Target reached at baseline!")
|
||
# [修改] 返回元组 (代码, 分数)
|
||
return self.best_tb, self.best_score
|
||
|
||
# --- Loop ---
|
||
for i in range(1, self.max_iter + 1):
|
||
logger.info(f"--- CGA Iter {i} / {self.max_iter} ---")
|
||
iter_dir = os.path.join(work_dir, f"iter_{i}")
|
||
os.makedirs(iter_dir, exist_ok=True)
|
||
|
||
# === [新增] 能量检查:是否还有活跃目标 ===
|
||
if self.energy_allocator:
|
||
current_target = self.energy_allocator.select_next_target()
|
||
if not current_target:
|
||
logger.info("No more active targets with remaining energy. Stopping.")
|
||
break
|
||
logger.info(f"Target: {current_target}")
|
||
# =========================================
|
||
|
||
if not last_annotated_file: break
|
||
|
||
# [修改] 传递语义分析结果、能量分配器、多样性注入器、DUT代码给 CoverageParser
|
||
parser = CoverageParser(
|
||
last_annotated_file,
|
||
tb_code=self.best_tb,
|
||
semantic_result=self.semantic_result,
|
||
energy_allocator=self.energy_allocator,
|
||
diversity_injector=self.diversity_injector, # [新增]
|
||
dut_code=self.DUT_code # [新增] 传递 DUT 代码以提取信号名
|
||
)
|
||
prompt = parser.generate_prompt(self.best_score)
|
||
|
||
# [修改] 改进停止条件:即使找不到 missing_blocks,只要覆盖率未达标就继续
|
||
if not prompt:
|
||
if self.best_score >= self.target_coverage:
|
||
logger.success(f"Target coverage reached: {self.best_score:.2f}%")
|
||
break
|
||
else:
|
||
# 覆盖率未达标但找不到明确的 missing_blocks
|
||
# 尝试生成随机探索 Prompt
|
||
logger.warning(f"No reachable missing blocks found, but coverage ({self.best_score:.2f}%) < target ({self.target_coverage}%).")
|
||
logger.info(f"Attempting random exploration to discover uncovered paths...")
|
||
prompt = self._generate_exploration_prompt(i)
|
||
|
||
if not prompt:
|
||
logger.info("Could not generate exploration prompt. Stopping.")
|
||
break
|
||
|
||
ls.save_code(prompt, os.path.join(iter_dir, "prompt.txt"))
|
||
logger.info(f"Asking LLM to fix missing logic (Current: {self.best_score:.2f}%)...")
|
||
messages = [{"role": "user", "content": prompt}]
|
||
|
||
try:
|
||
response, _ = llm.llm_call(messages, self.model)
|
||
ls.save_code(response, os.path.join(iter_dir, "llm_response.txt"))
|
||
codes = llm.extract_code(response, "verilog")
|
||
new_task_code = codes[0] if codes else ""
|
||
if not new_task_code:
|
||
# [新增] 记录失败
|
||
if self.energy_allocator:
|
||
self.energy_allocator.record_generation(
|
||
success=False,
|
||
coverage_delta=0.0,
|
||
energy_cost=1.0
|
||
)
|
||
continue
|
||
except Exception as e:
|
||
logger.error(f"LLM Call failed: {e}")
|
||
# [新增] 记录失败
|
||
if self.energy_allocator:
|
||
self.energy_allocator.record_generation(
|
||
success=False,
|
||
coverage_delta=0.0,
|
||
energy_cost=1.0
|
||
)
|
||
break
|
||
|
||
ls.save_code(new_task_code, os.path.join(iter_dir, "generated_scenario.v"))
|
||
injector = TBInjector(self.best_tb)
|
||
enhanced_tb = injector.inject(new_task_code, iter_idx=i)
|
||
|
||
# [新增] 检查语法预检查结果
|
||
validation_result = injector.last_validation_result
|
||
syntax_issues = validation_result.get('syntax_check', {}) if validation_result else {}
|
||
|
||
if syntax_issues.get('should_retry', False):
|
||
logger.warning(f"[CGA-{i}] Syntax issues detected in generated code. Attempting retry...")
|
||
# 生成修正后的 Prompt,包含语法问题提示
|
||
retry_prompt = self._generate_syntax_fix_prompt(new_task_code, syntax_issues, prompt)
|
||
if retry_prompt:
|
||
try:
|
||
retry_response, _ = llm.llm_call([{"role": "user", "content": retry_prompt}], self.model)
|
||
retry_codes = llm.extract_code(retry_response, "verilog")
|
||
if retry_codes:
|
||
new_task_code = retry_codes[0]
|
||
ls.save_code(retry_prompt, os.path.join(iter_dir, "retry_prompt.txt"))
|
||
ls.save_code(retry_response, os.path.join(iter_dir, "retry_response.txt"))
|
||
ls.save_code(new_task_code, os.path.join(iter_dir, "generated_scenario_retry.v"))
|
||
enhanced_tb = injector.inject(new_task_code, iter_idx=i)
|
||
logger.info(f"[CGA-{i}] Retry code generated successfully")
|
||
except Exception as e:
|
||
logger.warning(f"[CGA-{i}] Retry failed: {e}")
|
||
|
||
self._prepare_dut(iter_dir)
|
||
ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
|
||
|
||
success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
|
||
|
||
# [新增] 编译失败时的错误反馈机制
|
||
if not success:
|
||
compile_error = self._get_compile_error(iter_dir)
|
||
if compile_error:
|
||
logger.error(f"[CGA-{i}] Verilator compilation failed:")
|
||
logger.error(compile_error[:500]) # 截断过长的错误信息
|
||
|
||
# 尝试让 LLM 修正编译错误(最多 1 次重试)
|
||
if not hasattr(self, '_compile_retry_count'):
|
||
self._compile_retry_count = {}
|
||
self._compile_retry_count[i] = self._compile_retry_count.get(i, 0)
|
||
|
||
if self._compile_retry_count[i] < 1:
|
||
logger.info(f"[CGA-{i}] Asking LLM to fix compilation errors...")
|
||
fix_prompt = self._generate_compile_fix_prompt(compile_error, new_task_code)
|
||
try:
|
||
fix_response, _ = llm.llm_call([{"role": "user", "content": fix_prompt}], self.model)
|
||
fix_codes = llm.extract_code(fix_response, "verilog")
|
||
if fix_codes:
|
||
fixed_code = fix_codes[0]
|
||
ls.save_code(fix_prompt, os.path.join(iter_dir, "compile_fix_prompt.txt"))
|
||
ls.save_code(fix_response, os.path.join(iter_dir, "compile_fix_response.txt"))
|
||
ls.save_code(fixed_code, os.path.join(iter_dir, "generated_scenario_compile_fix.v"))
|
||
enhanced_tb = injector.inject(fixed_code, iter_idx=i)
|
||
ls.save_code(enhanced_tb, os.path.join(iter_dir, "driver.v"))
|
||
|
||
# 再次尝试编译
|
||
success, new_score, new_annotated_path = verilator_run_coverage(iter_dir, "DUT.v", "driver.v")
|
||
if success:
|
||
logger.info(f"[CGA-{i}] Compilation fixed! Score: {new_score:.2f}%")
|
||
new_task_code = fixed_code
|
||
except Exception as e:
|
||
logger.warning(f"[CGA-{i}] Compile fix attempt failed: {e}")
|
||
self._compile_retry_count[i] += 1
|
||
|
||
coverage_snapshot = self._extract_coverage_snapshot(new_annotated_path) if success else {
|
||
"covered_lines": set(),
|
||
"covered_functions": set(),
|
||
"coverable_lines": set(),
|
||
}
|
||
current_covered_lines = set(coverage_snapshot["covered_lines"])
|
||
current_covered_functions = set(coverage_snapshot["covered_functions"])
|
||
newly_covered_lines = current_covered_lines - self.best_covered_lines
|
||
newly_covered_functions = current_covered_functions - self.best_covered_functions
|
||
|
||
# === [新增] 记录生成结果到能量分配器 ===
|
||
coverage_delta = new_score - self.best_score if success else 0.0
|
||
current_target_name = None
|
||
target_hit = False
|
||
if self.energy_allocator and self.energy_allocator.current_target:
|
||
current_target_name = self.energy_allocator.current_target.function_point
|
||
if current_covered_functions:
|
||
target_hit = current_target_name in current_covered_functions
|
||
else:
|
||
target_hit = success and new_score > self.best_score
|
||
generation_success = success and target_hit
|
||
|
||
if self.energy_allocator:
|
||
self.energy_allocator.record_generation(
|
||
success=generation_success,
|
||
coverage_delta=coverage_delta,
|
||
energy_cost=1.0
|
||
)
|
||
extra_completed_functions = set(newly_covered_functions)
|
||
if generation_success and current_target_name:
|
||
extra_completed_functions.discard(current_target_name)
|
||
if extra_completed_functions:
|
||
self.energy_allocator.mark_targets_completed(sorted(extra_completed_functions))
|
||
# =========================================
|
||
|
||
# === [新增] 记录测试用例到多样性历史 ===
|
||
if self.diversity_injector:
|
||
# 提取已知信号
|
||
known_signals = []
|
||
if self.semantic_result:
|
||
known_signals = [p.get('name', '') for p in self.semantic_result.get('ports', [])]
|
||
|
||
self.diversity_injector.record_test(
|
||
code=new_task_code,
|
||
target_function=self.energy_allocator.current_target.function_point if self.energy_allocator and self.energy_allocator.current_target else "",
|
||
coverage_score=new_score,
|
||
success=generation_success,
|
||
iteration=i,
|
||
known_signals=known_signals
|
||
)
|
||
# =======================================
|
||
|
||
# === [新增] Layer 3: 质量评估 ===
|
||
if self.quality_evaluator:
|
||
# 评估测试用例质量
|
||
eval_result = self.quality_evaluator.evaluate_test_case(
|
||
code=new_task_code,
|
||
covered_lines=newly_covered_lines,
|
||
covered_functions=sorted(newly_covered_functions),
|
||
test_id=f"iter_{i}",
|
||
iteration=i
|
||
)
|
||
|
||
# 记录多样性得分
|
||
diversity_score = eval_result.get('diversity', {}).get('overall_score', 0)
|
||
logger.info(f" Quality Evaluation: diversity={diversity_score:.2f}")
|
||
|
||
# 检查是否应该接受该测试用例
|
||
should_accept, reason = self.quality_evaluator.should_accept(eval_result)
|
||
if not should_accept:
|
||
logger.warning(f" Quality check failed: {reason}")
|
||
# =====================================
|
||
|
||
if success and new_score > self.best_score:
|
||
improvement = new_score - self.best_score
|
||
logger.success(f"Coverage Improved! +{improvement:.2f}% ({self.best_score:.2f}% -> {new_score:.2f}%)")
|
||
self.best_score = new_score
|
||
self.best_tb = enhanced_tb
|
||
last_annotated_file = new_annotated_path
|
||
self.best_covered_lines = current_covered_lines
|
||
self.best_covered_functions = current_covered_functions
|
||
elif success and new_score == self.best_score:
|
||
logger.info(f"Coverage unchanged. Keeping previous.")
|
||
else:
|
||
logger.warning(f"Regression or Failure. Discarding changes.")
|
||
|
||
if self.best_score >= self.target_coverage:
|
||
logger.success("Target coverage reached!")
|
||
break
|
||
|
||
logger.info(f"CGA Finished. Final Coverage: {self.best_score:.2f}%")
|
||
|
||
# === [新增] 生成能量分配报告 ===
|
||
if self.energy_allocator:
|
||
energy_report = self.energy_allocator.generate_report()
|
||
ls.save_code(energy_report, os.path.join(work_dir, "energy_report.txt"))
|
||
logger.info(f"Energy report saved to {work_dir}/energy_report.txt")
|
||
# =================================
|
||
|
||
# === [新增] 生成多样性报告并保存历史 ===
|
||
if self.diversity_injector:
|
||
diversity_report = self.diversity_injector.generate_diversity_report()
|
||
ls.save_code(diversity_report, os.path.join(work_dir, "diversity_report.txt"))
|
||
logger.info(f"Diversity report saved to {work_dir}/diversity_report.txt")
|
||
|
||
# 保存测试历史
|
||
self.diversity_injector.history.save()
|
||
# ======================================
|
||
|
||
# === [新增] Layer 3: 生成质量评估报告 ===
|
||
if self.quality_evaluator:
|
||
quality_report = self.quality_evaluator.generate_report()
|
||
ls.save_code(quality_report, os.path.join(work_dir, "quality_evaluation_report.txt"))
|
||
logger.info(f"Quality evaluation report saved to {work_dir}/quality_evaluation_report.txt")
|
||
|
||
# 输出语义覆盖率摘要
|
||
coverage_result = self.quality_evaluator.semantic_coverage.calculate_coverage()
|
||
logger.info(f"Semantic Coverage: {coverage_result.semantic_coverage:.2%}")
|
||
# ===========================================
|
||
|
||
# [修改] 返回元组 (代码, 分数)
|
||
return self.best_tb, self.best_score
|