"""
Description :   analyze the output from autoline mode.
Author      :   Ruidi Qiu (r.qiu@tum.de)
Time        :   2023/12/12 17:35:00
LastEdited  :   2024/9/17 23:35:03
"""

import math
import os

import loader_saver as ls
import utils.utils as utils
from data.probset import dictlist, HDLBitsProbset, muti_dictlist
from LLM_call import PRICING_MODELS

# a task whose Eval2 ratio is >= LOOSE_FACTOR is counted as an Eval2 pass
LOOSE_FACTOR = 0.8
DEFAULT_SAVING_DIR = "analysis"
DEFAULT_LOG_NAME = "analyze_out.log"
DEFAULT_LOG_PATH = os.path.join(DEFAULT_SAVING_DIR, DEFAULT_LOG_NAME)
K_LIST = [1]
# insert your Chatbench_RunInfo.json's path here to re-analyze the data, then
# run this file directly. the result will be saved in analysis/analyze_out.log
CHATBENCH_RUNINFO_PATH = "saves_inEDA/DATE25/Main_Results/CorrectBench/disc_70wrong_25correct_20240831_181427/Chatbench_RunInfo.json"
# this is for multiple directories' analysis, insert the directory path here
# and change the main function to regular_multiA_main, then run the file
# directly. the result will be saved in analysis/analyze_out.log
MULTI_DIR = "saves_inEDA/DATE25/Main_Results/CorrectBench"


def _safe_div(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def _parse_ratio(ratio_str, number=float):
    """
    Convert a "numerator/denominator" string into a float ratio.
    - ratio_str: the ratio string (e.g. "8/10") or None
    - number: the type used to parse both parts (float or int)
    - return: the ratio, or None if ratio_str is None or the denominator is 0
    """
    if ratio_str is None:
        return None
    numerator, denominator = ratio_str.split("/")
    numerator, denominator = number(numerator), number(denominator)
    if denominator == 0:
        # "x/0" carries no usable ratio; callers treat it like missing data
        return None
    return float(numerator) / float(denominator)


def _ensure_parent_dir(path):
    """Create the directory that will contain `path` if it does not exist."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def _write_report(path, text, mode="w"):
    """Write `text` followed by an "analysis time" line to `path`."""
    _ensure_parent_dir(path)
    with open(path, mode, encoding="utf-8") as f:
        f.write(text)
        # also, write the current time
        f.write("analysis time: %s\n" % (utils.get_time()))


def _run_multi_report(title, k_list, excluded_tasks=None, save_path=None):
    """
    Build a MultiAnalyzer on MULTI_DIR, run it and save its report.
    - title: text placed in the "#### title ####" banner of the report
    - k_list: the k values used for pass@k
    - excluded_tasks: task ids removed before analysis (None keeps all)
    - save_path: where to save the report (None means DEFAULT_LOG_PATH)
    """
    multi_analyzer = MultiAnalyzer(MULTI_DIR, k_list)
    if excluded_tasks is not None:
        multi_analyzer.del_items(excluded_tasks, del_by_list=True)
    multi_analyzer.messages += "\n#################### %s ####################\n" % title
    multi_analyzer.run()
    multi_analyzer.get_avg_tokens_one_task()
    multi_analyzer.get_avg_pass_by_disc_and_corr()
    multi_analyzer.save(save_path)
    return multi_analyzer


def main():
    """Entry point; switch the called function to change the analysis."""
    # diy_main()
    regular_main()
    # regular_multiA_main()


def diy_main():
    """
    Custom analysis:
    1. print the task ids that were corrected (TB_corrected) and pass Eval2
       in CHATBENCH_RUNINFO_PATH;
    2. analyze every run under MULTI_DIR three times: all tasks, CMB tasks
       only and SEQ tasks only, saving one report for each.
    """
    # find the passed with correction
    Chatbench_RunInfo = ls.load_json_dict(CHATBENCH_RUNINFO_PATH)
    analyzer = Analyzer(Chatbench_RunInfo)
    analyzer.run()
    for i in analyzer.data:
        if i.get("TB_corrected", False) and analyzer.Eval2_pass(i):
            print(i["task_id"])

    k_list = [1]
    _run_multi_report("TOTAL", k_list)

    circuit_type_path = "data/HDLBits/HDLBits_circuit_type.jsonl"
    CMB_tasks = HDLBitsProbset(circuit_type_path, filter_content={"circuit_type": "CMB"}).task_id_list
    SEQ_tasks = HDLBitsProbset(circuit_type_path, filter_content={"circuit_type": "SEQ"}).task_id_list
    # the CMB report is obtained by deleting the SEQ tasks and vice versa
    _run_multi_report("CMB", k_list, SEQ_tasks, os.path.join(DEFAULT_SAVING_DIR, "CMB_" + DEFAULT_LOG_NAME))
    _run_multi_report("SEQ", k_list, CMB_tasks, os.path.join(DEFAULT_SAVING_DIR, "SEQ_" + DEFAULT_LOG_NAME))


def regular_multiA_main():
    """Analyze every run under MULTI_DIR and save the pass@k report."""
    multi_analyzer = MultiAnalyzer(MULTI_DIR)
    multi_analyzer.run()
    multi_analyzer.save()


def regular_main():
    """Analyze CHATBENCH_RUNINFO_PATH and save the report to DEFAULT_LOG_PATH."""
    Chatbench_RunInfo = ls.load_json_dict(CHATBENCH_RUNINFO_PATH)
    # analyze the data
    analyzer = Analyzer(Chatbench_RunInfo)
    analyzer.run()
    # save the result to a txt file
    _write_report(DEFAULT_LOG_PATH, analyzer.messages, "w")


def analyze_subset(subset: str | HDLBitsProbset, runinfo_path, subset_name=""):
    """
    this function is used to only analyze a subset of the runinfo data
    - subset (only the task_ids are needed):
        - str: path of the subset
        - HDLBitsProbset: the subset
    - runinfo_path: path of the Chatbench_RunInfo.json
    - subset_name: the name of the subset
    - raises TypeError if subset is neither a path nor a HDLBitsProbset
    The report is appended to DEFAULT_LOG_PATH.
    """
    if isinstance(subset, str):
        # path, load the subset
        subset = HDLBitsProbset(subset)
    elif not isinstance(subset, HDLBitsProbset):
        raise TypeError("subset should be a path or a HDLBitsProbset")
    subset_tasks = subset.task_id_list
    analyzer = Analyzer(ls.load_json_dict(runinfo_path))
    analyzer.del_items(subset_tasks, False)
    analyzer.out_txt += f"\n#################### {subset_name} ####################\n"
    analyzer.run()
    _write_report(DEFAULT_LOG_PATH, analyzer.messages, "a")


class Analyzer(HDLBitsProbset):
    """
    Analyzer of one Chatbench_RunInfo (a list of per-task result dicts).
    The text report is accumulated in `out_txt` (also readable as `messages`).
    The counting properties are computed once and cached on first access.
    """

    def __init__(self, Chatbench_RunInfo, pricing_model="gpt-4o-2024-05-13"):
        """
        - Chatbench_RunInfo: the loaded run info, one dict per task
        - pricing_model: key of PRICING_MODELS used to compute the cost
        """
        super().__init__()
        self.data = Chatbench_RunInfo
        self.check_existance()
        self.pricing_model = pricing_model
        self.out_txt = ""
        self.loose_factor = LOOSE_FACTOR

    def run(self):
        """Append the full analysis report of the run info to `out_txt`."""
        parts = ["\n########## Analyze of Chatbench_RunInfo ##########\n"]
        parts.append("\n#### pass numbers:\n")
        if self.Eval2b_exist:
            parts.append("Eval2b: %d\n" % self.Eval2bpass_num)
        parts.append("Eval2 : %d\n" % self.fullpass_num)
        parts.append("Eval1 : %d\n" % self.Eval1pass_num)
        parts.append("Eval0 : %d\n" % self.Eval0pass_num)
        parts.append("total : %d " % self.total_num)
        parts.append("(Failed: %d)\n" % (self.total_num - self.Eval0pass_num))
        if self.reboot_times_exist:
            parts.append("passed TB by autoline reboot action (from TB3_check): %d\n" % self.autoline_reboot_task_num)
        if self.TB_corrected_exist:
            parts.append("\npassed TB by functional corrector: %d\n" % self.corrected_num)
        # [added] CGA coverage statistics
        parts.append(self._coverage_summary_txt())
        parts.append("\n#### tokens and cost:\n")
        parts.append("average prompt tokens: %d\n" % _safe_div(self.prompt_tokens_num, self.total_num))
        parts.append("average completion tokens: %d\n" % _safe_div(self.completion_tokens_num, self.total_num))
        parts.append("total cost: %.4f\n" % self.cost)
        parts.append("average cost: %.4f\n" % self.avg_cost)
        parts.append("\n#### time:\n")
        parts.append("average time: %.2fs\n" % self.avg_time)
        parts.append("\n#### debug info table:\n")
        parts.append(self.get_debug_infotable())
        parts.append("\n#### Eval2 ratio:\n")
        parts.append(self.get_eval2_ratio_each_problem())
        # [added] coverage detail of each task
        parts.append(self._coverage_detail_txt())
        if self.Eval2b_exist:
            parts.append("\n#### Eval2b ratio:\n")
            parts.append(self.get_eval2b_ratio_each_problem())
        parts.append("\nloose Eval2 pass metric applied: %s\n\n" % self.loose_factor)
        # `messages` is a property returning out_txt, so whatever is appended
        # here is what gets written when the report is saved
        self.out_txt += "".join(parts)

    def _coverage_summary_txt(self):
        """Return the average / max / min section of the "coverage" field."""
        # a missing or None coverage counts as 0.0; only positive values are
        # treated as real coverage data
        covs = [cov for cov in ((task.get("coverage") or 0.0) for task in self.data) if cov > 0]
        # the average is taken over ALL tasks, tasks without data count as 0
        avg_cov = _safe_div(sum(covs, 0.0), self.total_num)
        txt_out = "\n#### CGA Coverage Info:\n"
        txt_out += "Average Coverage : %.2f%%\n" % avg_cov
        if covs:
            txt_out += "Max Coverage : %.2f%%\n" % max(covs)
            txt_out += "Min Coverage : %.2f%%\n" % min(covs)
        else:
            txt_out += "(No coverage data found in JSON)\n"
        return txt_out

    def _coverage_detail_txt(self):
        """Return the per-task coverage list section."""
        txt_out = "\n#### CGA Coverage Detail List:\n"
        txt_out += f"{'Task ID':<25} | {'Coverage':<10}\n"
        txt_out += "-" * 40 + "\n"
        for task in self.data:
            tid = task.get("task_id", "Unknown")
            cov = task.get("coverage") or 0.0
            txt_out += f"{tid:<25} | {cov:.2f}%\n"
        return txt_out

    def find_fake_eval0pass(self):
        """
        Report the tasks that have sim_pass == 1 but no "Eval1_pass" entry.
        NOTE: this filters self.data in place via self.filter.
        """
        self.filter({"sim_pass": 1})
        task_ids_fake_eval0pass = [
            i["task_id"] for i in self.data
            if i.get("Eval1_pass", "NO data") == "NO data"
        ]
        self.out_txt += "fake Eval0 pass: %d\n" % len(task_ids_fake_eval0pass)
        for task_id in task_ids_fake_eval0pass:
            self.out_txt += task_id + "\n"

    def check_existance(self):
        """Record which optional keys appear in at least one task's data."""
        self.Eval2b_exist = any("Eval2b_pass" in i for i in self.data)
        self.TB_corrected_exist = any("TB_corrected" in i for i in self.data)
        self.reboot_times_exist = any("reboot_times" in i for i in self.data)

    def draw_Eval2_histogram(self, figurename="eval2_histogram.png"):
        """
        Draw the histogram of the Eval2 ratios of the tasks passing Eval0 and
        Eval1 and save it to DEFAULT_SAVING_DIR/figurename.
        """
        import matplotlib
        # select the non-interactive backend before pyplot is imported
        matplotlib.use('Agg')
        import matplotlib.pyplot as plt
        ratios = []
        for i in self.data:
            if self.Eval0_pass(i) and self.Eval1_pass(i):
                # get the numerical ratio; tasks without a usable one are skipped
                ratio = _parse_ratio(i.get("Eval2_ratio", None))
                if ratio is not None:
                    ratios.append(ratio)
        # draw histogram of ratios, it will have 10 bars, 0~10%, 10~20%, ..., 90~100%
        plt.hist(ratios, bins=10, range=(0, 1))
        # save to analysis/eval2_histogram.png
        os.makedirs(DEFAULT_SAVING_DIR, exist_ok=True)
        plt.savefig(os.path.join(DEFAULT_SAVING_DIR, figurename))
        plt.close()

    def _count(self, predicate):
        """Return how many tasks in self.data satisfy `predicate`."""
        return sum(1 for i in self.data if predicate(i))

    def _eval1_reached(self, data):
        """True if the task passes Eval0 and has a truthy "Eval1_pass"."""
        return bool(self.Eval0_pass(data) and data.get("Eval1_pass", 0))

    @property
    def messages(self):
        """The report text generated so far."""
        return self.out_txt

    @property
    def total_num(self):
        """Number of tasks."""
        if not hasattr(self, "_total_num"):
            self._total_num = len(self.data)
        return self._total_num

    @property
    def fullpass_num(self):
        """Number of tasks passing Eval0, Eval1 and (loose) Eval2."""
        if not hasattr(self, "_fullpass_num"):
            self._fullpass_num = self._count(
                lambda i: self._eval1_reached(i) and self.Eval2_pass(i))
        return self._fullpass_num

    @property
    def fullpass_num_nodebug(self):
        """Number of full-pass tasks whose debug iteration count is 0."""
        # cached under its own attribute; sharing "_fullpass_num" with
        # fullpass_num would make the two properties overwrite each other
        if not hasattr(self, "_fullpass_num_nodebug"):
            self._fullpass_num_nodebug = self._count(
                lambda i: self._eval1_reached(i) and self.Eval2_pass(i) and self.debug_iter(i) == 0)
        return self._fullpass_num_nodebug

    @property
    def Eval2bpass_num(self):
        """Number of tasks passing Eval0, Eval1 and having "Eval2b_pass" set."""
        if not hasattr(self, "_Eval2bpass_num"):
            self._Eval2bpass_num = 0
            if self.Eval2b_exist:
                self._Eval2bpass_num = self._count(
                    lambda i: self._eval1_reached(i) and i.get("Eval2b_pass", 0))
        return self._Eval2bpass_num

    @property
    def Eval0pass_num(self):
        """Number of tasks passing Eval0."""
        if not hasattr(self, "_Eval0pass_num"):
            self._Eval0pass_num = self._count(self.Eval0_pass)
        return self._Eval0pass_num

    @property
    def Eval1pass_num(self):
        """Number of tasks passing Eval0 and Eval1."""
        if not hasattr(self, "_Eval1pass_num"):
            self._Eval1pass_num = self._count(self._eval1_reached)
        return self._Eval1pass_num

    @property
    def corrected_num(self):
        """Number of tasks with "TB_corrected" set that pass Eval2."""
        if not hasattr(self, "_corrected_num"):
            self._corrected_num = 0
            if self.TB_corrected_exist:
                self._corrected_num = self._count(
                    lambda i: i.get("TB_corrected", 0) and self.Eval2_pass(i))
        return self._corrected_num

    @property
    def autoline_reboot_task_num(self):
        """Number of tasks with reboot_times > 0 that pass Eval2."""
        if not hasattr(self, "_autoline_reboot_task_num"):
            self._autoline_reboot_task_num = 0
            if self.reboot_times_exist:
                self._autoline_reboot_task_num = self._count(
                    lambda i: (i.get("reboot_times", 0) > 0) and self.Eval2_pass(i))
        return self._autoline_reboot_task_num

    @property
    def avg_time(self):
        """Average of the "time" field over all tasks (0.0 if no task)."""
        if not hasattr(self, "_avg_time"):
            time_sum = sum(i.get("time", 0) for i in self.data)
            self._avg_time = _safe_div(time_sum, len(self.data))
        return self._avg_time

    @property
    def tokens_num(self):
        """Total number of prompt + completion tokens over all tasks."""
        if not hasattr(self, "_tokens_num"):
            # the prompt / completion sums are cached here as well
            self._prompt_tokens_num = sum(i.get("prompt_tokens", 0) for i in self.data)
            self._completion_tokens_num = sum(i.get("completion_tokens", 0) for i in self.data)
            self._tokens_num = self._prompt_tokens_num + self._completion_tokens_num
        return self._tokens_num

    @property
    def prompt_tokens_num(self):
        """Total number of prompt tokens over all tasks."""
        if not hasattr(self, "_prompt_tokens_num"):
            self.tokens_num  # fills the cache
        return self._prompt_tokens_num

    @property
    def completion_tokens_num(self):
        """Total number of completion tokens over all tasks."""
        if not hasattr(self, "_completion_tokens_num"):
            self.tokens_num  # fills the cache
        return self._completion_tokens_num

    @property
    def avg_tokens(self):
        """Average number of tokens per task (0.0 if no task)."""
        if not hasattr(self, "_avg_tokens"):
            self._avg_tokens = _safe_div(self.tokens_num, self.total_num)
        return self._avg_tokens

    @property
    def cost(self):
        """Total cost of all tasks, see get_total_cost."""
        if not hasattr(self, "_cost"):
            self._cost = self.get_total_cost()
        return self._cost

    @property
    def avg_cost(self):
        """Average cost per task (0.0 if no task)."""
        if not hasattr(self, "_avg_cost"):
            self._avg_cost = _safe_div(self.cost, self.total_num)
        return self._avg_cost

    def get_total_cost(self):
        """
        return the total cost of the data; the two prices stored in
        PRICING_MODELS[self.pricing_model] are applied per 1000 tokens
        """
        prompt_cost_perk, completion_cost_perk = PRICING_MODELS[self.pricing_model]
        prompt_cost = self.prompt_tokens_num * prompt_cost_perk / 1000
        completion_cost = self.completion_tokens_num * completion_cost_perk / 1000
        return prompt_cost + completion_cost

    def get_eval2_ratio_each_problem(self):
        """
        return one "task_id: Eval2_ratio" line for each task passing Eval0 and Eval1
        """
        return "".join(
            "%s: %s\n" % (i["task_id"], i.get("Eval2_ratio", "No Eval2 ratio data"))
            for i in self.data if self._eval1_reached(i)
        )

    def get_eval2b_ratio_each_problem(self):
        """
        return one "task_id: Eval2b_ratio" line for each task passing Eval0
        and Eval1, or "No Eval2b data" if no task has Eval2b data
        """
        if not self.Eval2b_exist:
            return "No Eval2b data"
        return "".join(
            "%s: %s\n" % (i["task_id"], i.get("Eval2b_ratio", "No Eval2b ratio data"))
            for i in self.data if self._eval1_reached(i)
        )

    def get_debug_infotable(self):
        """
        return the debug info table:
               | un-debugged | debugged | total |
        failed | - | 2 | 2 |
        Eval0  | 3 | 5 | 8 |
        Eval1  | 2 | 2 | 4 |
        Eval2  | 1 | 0 | 1 |
        if have Eval2b:
        Eval2b | 1 | 0 | 1 |

        "debugged" means functional debugging (reboot_times > 0 or
        TB_corrected) if any task has one of these keys, otherwise it means
        syntactic debugging (debug_iter != 0).
        """
        functional = self.reboot_times_exist or self.TB_corrected_exist
        labels = ["failed", "Eval0", "Eval1", "Eval2"]
        if self.Eval2b_exist:
            labels.append("Eval2b")
        # counts[label] = [un-debugged number, debugged number]
        counts = {label: [0, 0] for label in labels}
        for i in self.data:
            if functional:
                debugged = bool(i.get("reboot_times", 0) > 0 or i.get("TB_corrected", False))
            else:
                debugged = (self.debug_iter(i) != 0)
            col = 1 if debugged else 0
            if not self.Eval0_pass(i):
                counts["failed"][col] += 1
                continue
            counts["Eval0"][col] += 1
            if not i.get("Eval1_pass", 0):
                continue
            counts["Eval1"][col] += 1
            if self.Eval2_pass(i):
                counts["Eval2"][col] += 1
            if self.Eval2b_exist and self.Eval2b_pass(i):
                counts["Eval2b"][col] += 1
        # make the table
        txt_out = ("FUNCTIONAL" if functional else "SYNTACTIC") + " debug info table:\n"
        txt_out += "(debugged here means " + ("functional debugging" if functional else "syntactic debugging") + ")\n"
        if functional:
            txt_out += " | un-func-debugged | func-debugged | total |\n"
        else:
            txt_out += " | un-synt-debugged | synt-debugged | total |\n"
        for label in labels:
            undebugged_num, debugged_num = counts[label]
            txt_out += "%s | %16d | %13d | %5d |\n" % (
                label, undebugged_num, debugged_num, undebugged_num + debugged_num)
        return txt_out

    def get_iv_runing_time_info(self):
        """
        Append the avg / max / min of "iv_runing_time" over the tasks passing
        Eval0 to out_txt; nothing is appended if no task passes Eval0.
        """
        times = [float(i.get("iv_runing_time", 0.0)) for i in self.data if self.Eval0_pass(i)]
        if not times:
            return
        # min()/max() instead of a 0.0 sentinel, so that a real 0.0 is kept
        self.out_txt += "\n#### iv_runing_time info:\n"
        self.out_txt += "avg_time: %.2fs\n" % (sum(times) / len(times))
        self.out_txt += "max_time: %.2fs\n" % max(times)
        self.out_txt += "min_time: %.2fs\n" % min(times)

    def Eval0_pass(self, data):
        """check if one data pass the Eval 0"""
        if "Eval0_pass" in data:
            return data["Eval0_pass"]  # latest version
        if "sim_pass" in data:
            return data["sim_pass"]  # old version
        return False

    def Eval1_pass(self, data):
        """check if one data pass the Eval 1"""
        return data.get("Eval1_pass", False)

    def Eval2_pass(self, data):
        """check if one data pass the Eval 2 (loose metric, see loose_Eval2_pass)"""
        # we use this to compensate special cases: m2014_q3
        if data["task_id"] == "m2014_q3":
            if data.get("Eval2_failed_mutant_idxes", []) == [3, 4, 7, 8, 9, 10]:
                return True
        return self.loose_Eval2_pass(data)

    def Eval0_scencheck_pass(self, data):
        return self.Eval0_pass(data) and data.get("checklist_worked", False)

    def Eval1_scencheck_pass(self, data):
        return self.Eval1_pass(data) and data.get("checklist_worked", False)

    def Eval2_scencheck_pass(self, data):
        return self.Eval2_pass(data) and data.get("checklist_worked", False)

    def Eval0_noscencheck_pass(self, data):
        return self.Eval0_pass(data) and (not data.get("checklist_worked", False))

    def Eval1_noscencheck_pass(self, data):
        return self.Eval1_pass(data) and (not data.get("checklist_worked", False))

    def Eval2_noscencheck_pass(self, data):
        return self.Eval2_pass(data) and (not data.get("checklist_worked", False))

    def Eval0_nodebug_pass(self, data):
        return (self.Eval0_pass(data)) and (self.debug_iter(data) == 0)

    def Eval1_nodebug_pass(self, data):
        return (self.Eval1_pass(data)) and (self.debug_iter(data) == 0)

    def Eval2_nodebug_pass(self, data):
        return (self.Eval2_pass(data)) and (self.debug_iter(data) == 0)

    def Eval2b_pass(self, data):
        """check if one data pass the Eval 2b (loose metric, see loose_Eval2b_pass)"""
        # we use this to compensate special cases: m2014_q3 follows Eval2
        if data["task_id"] == "m2014_q3":
            return bool(self.Eval2_pass(data))
        # normal cases
        return self.loose_Eval2b_pass(data)

    def debug_iter(self, data):
        """
        return "debug_iter" if present, otherwise the sum of "debug_iter_iv"
        and "debug_iter_py" (each 0 if missing)
        """
        if data.get("debug_iter", None) is not None:
            return data["debug_iter"]
        return data.get("debug_iter_iv", 0) + data.get("debug_iter_py", 0)

    def _loose_pass(self, data, eval_name):
        """
        True if "<eval_name>_pass" is truthy or the "<eval_name>_ratio" value
        is >= self.loose_factor; a missing ratio or a zero denominator fails.
        """
        if data.get("%s_pass" % eval_name, False):
            return True
        ratio = _parse_ratio(data.get("%s_ratio" % eval_name, None), int)
        return ratio is not None and ratio >= self.loose_factor

    def loose_Eval2_pass(self, data):
        """pass for 9/10, 8/10 and 4/5"""
        return self._loose_pass(data, "Eval2")

    def loose_Eval2b_pass(self, data):
        """pass for 9/10, 8/10 and 4/5"""
        return self._loose_pass(data, "Eval2b")


class MultiAnalyzer(muti_dictlist):
    """
    Analyzer of several runs of the same task set; computes pass@k ratios.
    The text report is accumulated in `messages`.
    """

    def __init__(self, group_dir: str = None, pass_at_k_kvalues=K_LIST):
        """
        group_dir: includes many subdirs, each subdir contains a Chatbench_RunInfo.json
        pass_at_k_kvalues: the k values used for pass@k
        raises ValueError if the runs do not all have the same number of tasks
        """
        super().__init__(id_key="task_id")
        self.runinfo_paths = []
        self.result = {}  # include the final results
        # copied so that the shared module-level default is never mutated
        self.pass_at_k_kvalues = list(pass_at_k_kvalues)
        self.exclude_debug = False  # this is used in baseline when analyzing the data without debug
        self.messages = ""
        self.group_dir = group_dir
        # sorted: os.listdir returns the entries in arbitrary order
        for subdir in sorted(os.listdir(group_dir)):
            path_runinfo = os.path.join(group_dir, subdir, "Chatbench_RunInfo.json")
            if os.path.exists(path_runinfo):
                self.runinfo_paths.append(path_runinfo)
        for path in self.runinfo_paths:
            self.dictlists.append(Analyzer(ls.load_json_dict(path)))
        self.dictlists: list[Analyzer]
        # check if the values in self.num(list) are the same
        if not self.all_equal("num"):
            print(self.num)
            raise ValueError("The total_num of the data are not the same")

    @property
    def analyzers(self):
        """The Analyzer of each run."""
        return self.dictlists

    def run(self, Evals=("Eval0", "Eval1", "Eval2")):
        """
        Compute pass@k for each Eval in `Evals` and each k in
        self.pass_at_k_kvalues, then append the report to `messages`.
        raises ValueError if no run info has been loaded
        """
        if not self.dictlists:
            raise ValueError("no Chatbench_RunInfo.json found under %s" % self.group_dir)
        num_tasks = self.dictlists[0].total_num
        for Eval_idx in Evals:
            for pass_at_k in self.pass_at_k_kvalues:
                self.Evalx_ratio_passatk(Eval_idx, pass_at_k)
        self.messages += "\n########## Analyze of Chatbench_RunInfos ##########\n"
        self.messages += "\n#### basic info:\n"
        self.messages += "total number of tasks: %d\n" % num_tasks
        self.messages += "sample numbers: %d\n" % len(self.dictlists)
        self.messages += "\n#### pass@k ratios:\n"
        for key, value in self.result.items():
            self.messages += "%s: %.2f%% (%.1f)\n" % (key, value * 100, value * num_tasks)
        self.messages += "\nloose Eval2 pass metric applied: %s\n\n" % self.dictlists[0].loose_factor

    def save(self, path: str = None):
        """Write `messages` and the current time to path (default: DEFAULT_LOG_PATH)."""
        if path is None:
            path = DEFAULT_LOG_PATH
        _write_report(path, self.messages, "w")

    def get_avg_tokens_one_task(self):
        """Append the average prompt / completion tokens per task and per run."""
        self.prompt_tokens_num = sum(a.prompt_tokens_num for a in self.dictlists)
        self.completion_tokens_num = sum(a.completion_tokens_num for a in self.dictlists)
        run_num = len(self.dictlists)
        task_num = self.dictlists[0].total_num if self.dictlists else 0
        self.avg_prompt_tokens = _safe_div(_safe_div(self.prompt_tokens_num, run_num), task_num)
        self.avg_completion_tokens = _safe_div(_safe_div(self.completion_tokens_num, run_num), task_num)
        self.messages += "average prompt tokens: %d\n" % self.avg_prompt_tokens
        self.messages += "average completion tokens: %d\n" % self.avg_completion_tokens

    def get_avg_pass_by_disc_and_corr(self):
        """Append the per-run average of corrected_num and autoline_reboot_task_num."""
        run_num = len(self.dictlists)
        self.pass_by_corrected = _safe_div(float(sum(a.corrected_num for a in self.dictlists)), run_num)
        self.pass_by_disc = _safe_div(float(sum(a.autoline_reboot_task_num for a in self.dictlists)), run_num)
        self.messages += "passed with functional corrector: %.1f\n" % self.pass_by_corrected
        self.messages += "passed with autoline reboot action: %.1f\n" % self.pass_by_disc

    def renew_result_dict(self):
        """Create an empty per-task result set; Evalx_ratio_passatk fills it if it exists."""
        self.result_dict = HDLBitsProbset()
        self.result_dict.create_empty_set_via_taskids(self.dictlists[0].task_id_list)

    def Evalx_ratio_passatk(self, Eval_idx="Eval0", pass_at: int = 1):
        """
        compute the ratio of the Evalx pass under pass@k, averaged over the
        tasks, and store it in self.result["<Eval_idx>_pass_at_<k>"]
        - Eval_idx: Analyzer must define the method "<Eval_idx>_pass"
        - pass_at: the k of pass@k
        - raises ValueError if "<Eval_idx>_pass" is not defined in Analyzer
        """
        if not hasattr(Analyzer, Eval_idx + "_pass"):
            raise ValueError("The function %s_pass is not defined in Analyzer" % Eval_idx)
        k = pass_at
        n = len(self.dictlists)
        has_result_dict = hasattr(self, "result_dict")
        Evalx_pass_at_k_total = 0
        # compute the pass ratio under pass@k for each task
        for task_id in self.dictlists[0].task_id_list:
            pass_num = 0
            for analyzer in self.dictlists:
                task_data = analyzer.access_data_via_taskid(task_id)
                if not getattr(analyzer, "%s_pass" % Eval_idx)(task_data):
                    continue
                # if exclude_debug is True and the task is debugged, we will not count it
                if self.exclude_debug and analyzer.debug_iter(task_data):
                    continue
                pass_num += 1
            pass_at_k = self.pass_at_k_under_n(n, k, pass_num)
            Evalx_pass_at_k_total += pass_at_k
            if has_result_dict:
                task_result = self.result_dict.access_data_via_taskid(task_id)
                task_result["%s_pass_num" % Eval_idx] = pass_num
                task_result["%s_pass_at_%d" % (Eval_idx, k)] = pass_at_k
        Evalx_pass_at_k_total = _safe_div(Evalx_pass_at_k_total, self.dictlists[0].total_num)
        self.result["%s_pass_at_%d" % (Eval_idx, k)] = Evalx_pass_at_k_total

    @staticmethod
    def pass_at_k_under_n(n: int, k: int, c: int):
        """
        - n: total number of samples
        - k: number of samples we want to pick
        - c: number of samples passed
        - output: pass@k under n
        - return the pass ratio under pass@k for n times; we have n samples,
          c samples passed. Now we want to calculate the possibility that we
          pick k samples and at least one of them passed
        - raises ValueError if k > n (k samples cannot be picked out of n)
        """
        if k > n:
            # math.comb(n, k) is 0 here, which would be a division by zero
            raise ValueError("pass@%d cannot be computed with only %d samples" % (k, n))
        return 1 - (math.comb(n - c, k) / math.comb(n, k))


def Eval2_histogram():
    """
    draw Eval2 histogram of all runs under MULTI_DIR; the figure, the number
    of ratios per 0.1 bucket and the raw ratios are saved in DEFAULT_SAVING_DIR
    """
    k_list = [1, 5, 10]
    multi_analyzer = MultiAnalyzer(MULTI_DIR, k_list)
    ratios = []
    for analyzer in multi_analyzer.dictlists:
        for i in analyzer.data:
            if analyzer.Eval0_pass(i) and analyzer.Eval1_pass(i):
                # get the numerical ratio; tasks without a usable one are skipped
                ratio = _parse_ratio(i.get("Eval2_ratio", None))
                if ratio is not None:
                    ratios.append(ratio)
    # draw histogram of ratios, it will have 10 bars, 0~10%, 10~20%, ..., 90~100%
    import matplotlib
    # select the non-interactive backend before pyplot is imported
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    plt.hist(ratios, bins=10, range=(0, 1))
    plt.title("distribution of Eval2 (Baseline)")
    # y label is the number of tasks, tick is 50
    plt.ylabel("number of tasks")
    plt.yticks(range(0, 700, 50))
    # light grid on both axes so the number of tasks can be read
    plt.grid(True, which='both', axis='both', color='lightgray', linestyle='-', linewidth=0.5)
    # x label is the ratio, tick is 10%
    plt.xlabel("ratio")
    plt.xticks([0.1 * i for i in range(11)])
    os.makedirs(DEFAULT_SAVING_DIR, exist_ok=True)
    plt.savefig(os.path.join(DEFAULT_SAVING_DIR, "eval2_histogram_Baseline.png"))
    # save the number of ratios of each bucket, csv format; bucket i holds
    # the ratios with floor(ratio * 10) == i
    bucket_nums = [0] * 11
    for ratio in ratios:
        bucket = math.floor(ratio * 10)
        if 0 <= bucket <= 10:
            bucket_nums[bucket] += 1
    with open(os.path.join(DEFAULT_SAVING_DIR, "eval2_ratios_Baseline.txt"), "w") as f:
        for i, ratio_num in enumerate(bucket_nums):
            f.write("%.1f, %d\n" % (0.1 * i, ratio_num))
    # export the original ratio data
    with open(os.path.join(DEFAULT_SAVING_DIR, "eval2_ratios_bin_Baseline.txt"), "w") as f:
        for ratio in ratios:
            f.write("%.2f\n" % ratio)
    plt.close()


def task_eval2passtimes_analyze():
    """print the indexes of the runs under MULTI_DIR in which "countbcd" passes Eval2"""
    multi_analyzer = MultiAnalyzer(MULTI_DIR)
    # the task ids passing Eval2, one list per run
    pass_taskids_list = [
        [data["task_id"] for data in analyzer.data if analyzer.Eval2_pass(data)]
        for analyzer in multi_analyzer.dictlists
    ]
    # find the idx of the runs in which task "countbcd" passed
    idxs = [idx for idx, pass_taskids in enumerate(pass_taskids_list) if "countbcd" in pass_taskids]
    print("countbcd passed at:")
    print(idxs)


if __name__ == "__main__":
    main()


# NOTE: legacy helpers, kept commented out for reference only. They use names
# that are not defined at module level in this file (self, get_total_num,
# get_simpass_num, time).

# FULLEXP_no1_paths = [
#     "saves/1211~1217/Chatbench_RunInfo.json"
# ]

# def get_TCpass_num(data):
#     """
#     return the number of passed tasks
#     """
#     pass_num = 0
#     for i in data:
#         if i["TC_pass"]:
#             pass_num += 1
#     return pass_num

# def get_debugTCpass_num(data):
#     """
#     return the number of passed tasks with debug iter != 0
#     """
#     pass_num = 0
#     for i in data:
#         if i["TC_pass"] and self.debug_iter(i) != 0:
#             pass_num += 1
#     return pass_num

# def get_debugsimpass_num(data):
#     """
#     return the number of passed tasks with debug iter != 0
#     """
#     pass_num = 0
#     for i in data:
#         if self.Eval0_pass(i) and self.debug_iter(i) != 0:
#             pass_num += 1
#     return pass_num

# def get_average_debugiter_debugTCpass(data):
#     """
#     return the average debug iter of the passed with debug data
#     """
#     debug_iter_sum = 0
#     debug_iter_num = 0
#     for i in data:
#         if i["TC_pass"] and self.debug_iter(i) != 0:
#             debug_iter_sum += self.debug_iter(i)
#             debug_iter_num += 1
#     return debug_iter_sum / debug_iter_num

# def get_average_time(data):
#     """
#     return the average time of the data
#     """
#     time_sum = 0
#     for i in data:
#         time_sum += i["time"]
#     return time_sum / len(data)

# def get_num_of_onetime_simpass(data):
#     """
#     return the number of tasks that passed in the first run
#     """
#     pass_num = 0
#     for i in data:
#         if self.Eval0_pass(i) and self.debug_iter(i) == 0:
#             pass_num += 1
#     return pass_num

# def analyze_EXP1_main(json_file_list):
#     output_data_list = []
#     for json_file in json_file_list:
#         data = ls.load_json_dict(json_file)
#         output_data_list.extend(data)
#     analyze(output_data_list)

# def correct_exp_no1_main(json_file_list):
#     output_data_list = correct_exp_no1(json_file_list)
#     analyze(output_data_list)

# def analyze(output_data_list):
#     # show all of the above processed data
#     print("total number of tasks: %d" % (get_total_num(output_data_list)))
#     print("number of simpassed tasks: %d" % (get_simpass_num(output_data_list)))
#     print("number of allpassed tasks: %d" % (get_TCpass_num(output_data_list)))
#     print("TCpass percentage: %.2f%%" % (get_TCpass_num(output_data_list) / get_total_num(output_data_list) * 100))
#     print("number of debug_and_TCpass: %d" % (get_debugTCpass_num(output_data_list)))
#     print("number of debug_and_simpass: %d" % (get_debugsimpass_num(output_data_list)))
#     print("average debug iter of debug_and_TCpass: %.2f" % (get_average_debugiter_debugTCpass(output_data_list)))
#     print("average time: %.2fs" % (get_average_time(output_data_list)))
#     # save them to a txt file analyze_out.txt
#     with open("analyze_out.txt", "w") as f:
#         f.write("total number of tasks: %d\n" % (get_total_num(output_data_list)))
#         f.write("number of simpassed tasks: %d\n" % (get_simpass_num(output_data_list)))
#         f.write("number of allpassed tasks: %d\n" % (get_TCpass_num(output_data_list)))
#         f.write("TCpass percentage: %.2f%%\n" % (get_TCpass_num(output_data_list) / get_total_num(output_data_list) * 100))
#         f.write("number of debug_and_TCpass: %d\n" % (get_debugTCpass_num(output_data_list)))
#         f.write("number of debug_and_simpass: %d\n" % (get_debugsimpass_num(output_data_list)))
#         f.write("average debug iter of debug_and_TCpass: %.2f\n" % (get_average_debugiter_debugTCpass(output_data_list)))
#         f.write("average time: %.2fs\n" % (get_average_time(output_data_list)))
#         # also, write the current time
#         f.write("time: %s\n" % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))

# def correct_exp_no1(json_file_list):
#     new_results = []
#     for json_file in json_file_list:
#         data_dir = json_file[:json_file.rfind("/")+1]
#         results = ls.load_json_dict(json_file)
#         for prob_result in results:
#             prob_dir = data_dir + prob_result["task_id"] + "/"
#             if prob_result["debug_iter"] == 0:
#                 prob_last_run_info_path = prob_dir + "TBgen_codes/" + "run_info.txt"
#             else:
#                 prob_last_run_info_path = prob_dir + "debug_%s" % (prob_result["debug_iter"]) + "/"
+ "run_info.txt" # prob_last_run_info = ls.load_txt(prob_last_run_info_path) # # check if "All test cases passed" is in the last run info # if "test cases passed" in prob_last_run_info: # prob_result["TC_pass"] = True # else: # prob_result["TC_pass"] = False # new_results.extend(results) # ls.save_dict_json_form(new_results, "corrected_exp_no1.json") # return new_results # if __name__ == "__main__": # # correct_exp_no1_main(FULLEXP_no1_paths) # analyze_EXP1_main(FULLEXP_no1_paths) # # main()