551 lines
27 KiB
Python
551 lines
27 KiB
Python
"""
Description :   Data-management utilities for the HDLBits problem set, including the script that
                merges the VerilogEval data (originally from HDLBits) into a single jsonl file.
                Run this script from the root directory of the project.
Author      :   Ruidi Qiu (ruidi.qiu@tum.de)
Time        :   2023/12/06 15:50:33
LastEdited  :   2024/8/13 17:08:02
"""
|
|
|
|
import sys
|
|
if __name__ == "__main__":
|
|
sys.path.append(".")
|
|
import random
|
|
import LLM_call as gpt
|
|
import loader_saver as ls
|
|
from data.probset import HDLBitsProbset
|
|
from copy import deepcopy
|
|
from loader_saver import load_json_lines, save_json_lines, find_line_jsonl
|
|
|
|
|
|
# Source files of the VerilogEval "human" split; these are the two inputs that
# merge_json_from_VerlogEval() combines.
VerilogDescription_Human_path = "data/HDLBits/original_data_human/VerilogDescription_Human.jsonl"
VerilogEval_Human = "data/HDLBits/original_data_human/VerilogEval_Human.jsonl"
# NOTE: FATHER_DATA_PATH and MERGED_DATA_PATH currently point to the same file.
FATHER_DATA_PATH = "data/HDLBits/HDLBits_data.jsonl"
# Read by return_module_code().
MERGED_DATA_PATH = "data/HDLBits/HDLBits_data.jsonl"
# The three paths below are only referenced from commented-out scripts in this file.
CIRCUIT_TYPE_PATH = "data/HDLBits/HDLBits_circuit_type.jsonl"
REPORT_PATH = "data/HDLBits/HDLBits_data_report.txt"
MUTANT_TEMPLATE_PATH = "config/templates/script_template/mutant_template.txt"
# Default model name of HDLBitsManager.task_RTL_gen().
LLM_MODEL = "gpt-4-turbo-2024-04-09"
|
|
|
|
def main():
    """Print the total character count of all descriptions and of all module codes."""
    # ---- archived experiments, kept commented out for reference ----
    # data_manager = HDLBitsManager("data/HDLBits/HDLBits_data.jsonl", REPORT_PATH)
    # seq15 = HDLBitsProbset("data/HDLBits/HDLBits_data_SEQ15_RTL.jsonl")
    # cmb15 = HDLBitsProbset("data/HDLBits/HDLBits_data_CMB15_RTL.jsonl")
    # rtl_saved = HDLBitsProbset("data/HDLBits/HDLBits_data_RTL_saved.jsonl")
    # seq15_tasklist = seq15.task_id_list
    # cmb15_tasklist = cmb15.task_id_list
    # print(cmb15_tasklist)
    # rtl_saved_tasklist = rtl_saved.task_id_list
    # data_manager.del_items(seq15_tasklist, del_by_list=True)
    # data_manager.del_items(cmb15_tasklist, del_by_list=True)
    # data_manager.del_items(rtl_saved_tasklist, del_by_list=True)
    # print(data_manager.num)
    # data_manager.task_RTL_gen("config/templates/script_template/RTL_template.txt", "data/HDLBits/HDLBits_data_RTL.jsonl", num_RTL=10)

    # test_data.del_items(["rule110", "conwaylife", "circuit1"], del_by_list=False)
    # test_data.del_items(["review2015_fsmonehot", "2014_q3c", "fsm3comb", "fsm3onehot"], del_by_list=False)
    # test_data.task_circuit_type_gen_only_description("data/HDLBits/circuit_type_gen_test_report.txt", step2_algorithm=True)
    # test_data.task_gen_circuit_type_strmatch("data/HDLBits/circuit_type_gen_strmatch_test_report.txt")
    # circuit_type_datapath = "data/HDLBits/HDLBits_circuit_type.jsonl"
    # # SEQ15_datapath = "data/HDLBits/HDLBits_data_SEQ15.jsonl"
    # # SEQ15_data = HDLBitsManager(SEQ15_datapath, REPORT_PATH)
    # # SEQ15_taskids = SEQ15_data.task_id_list
    # HDLBits_data = HDLBitsManager(FATHER_DATA_PATH, REPORT_PATH, [circuit_type_datapath])
    # HDLBits_data.filter({"circuit_type": "SEQ"})
    # # HDLBits_data.del_items(SEQ15_taskids, del_by_list=True)
    # # HDLBits_data.task_mutant_gen(MUTANT_TEMPLATE_PATH, "data/HDLBits/HDLBits_data_SEQothers_mutants.jsonl", num_mutants=10)
    # SEQ_no_clk = []
    # for data in HDLBits_data.data:
    #     if "clk" in data["header"]:
    #         pass
    #     else:
    #         SEQ_no_clk.append(data["task_id"])
    # print(SEQ_no_clk)

    # ---- active script: size statistics of the problem set ----
    problem_set = HDLBitsProbset("data/HDLBits/HDLBits_data.jsonl")
    records = list(problem_set.data)
    description_chars = sum(len(record["description"]) for record in records)
    module_code_chars = sum(len(record["module_code"]) for record in records)
    print(f"len_description: {description_chars}")
    print(f"len_module_code: {module_code_chars}")
|
|
|
|
|
|
## analyze the HDLBits json data
|
|
class HDLBitsManager(HDLBitsProbset):
    """
    Analysis and generation tasks on top of an HDLBits problem set.

    The methods below read ``self.data`` as a list of dicts, one per task, each
    holding at least ``"task_id"`` (other keys used here: ``"description"``,
    ``"header"``, ``"module_code"``, ``"circuit_type"``). ``self.data`` itself is
    populated by the base class ``HDLBitsProbset``, which is defined elsewhere.

    attributes set here:
    - jsonl_path: path of the jsonl file the problem set was loaded from
    - report_path: where ``save_report`` writes the report text
    - report_txt: report text accumulated by the analysis tasks
    """

    def __init__(self, jsonl_path: str, report_path=None, more_info_paths: list = None, only_tasks=None, exclude_tasks=None, filter_content=None):
        """
        - jsonl_path: path of the problem-set jsonl file
        - report_path: report file path; default: "<jsonl path without extension>_report.txt"
        - more_info_paths / only_tasks / exclude_tasks / filter_content: forwarded to
          ``HDLBitsProbset.__init__`` unchanged (``None`` stands for an empty list/dict)
        """
        # None instead of []/{} defaults: a mutable default object would be shared by every call.
        super().__init__(
            jsonl_path,
            more_info_paths=[] if more_info_paths is None else more_info_paths,
            only_tasks=only_tasks,
            exclude_tasks=[] if exclude_tasks is None else exclude_tasks,
            filter_content={} if filter_content is None else filter_content,
        )
        self.jsonl_path = jsonl_path
        if report_path is None:
            # save in the same directory as the jsonl file
            report_path = self._sibling_path("_report.txt")
        self.report_path = report_path
        self.report_txt = ""

    def _sibling_path(self, suffix):
        """Return ``self.jsonl_path`` with its file extension replaced by ``suffix``."""
        import os  # local import: keeps this class independent of the file's import block
        # splitext only strips an extension of the file name itself, so paths without
        # an extension (or with a dot in a directory name) are not truncated.
        stem, _extension = os.path.splitext(self.jsonl_path)
        return stem + suffix

    @property
    def additional_data(self):
        """
        list of dicts, one per task: [{"task_id": task_id, ...extra info...}, ...]
        created lazily on first access from the task ids currently in ``self.data``.
        """
        if not hasattr(self, "_additional_data"):
            self._additional_data = [{"task_id": record["task_id"]} for record in self.data]
        return self._additional_data

    def add_info_additional_data(self, task_id, info_key, info_value):
        """
        set ``info_key = info_value`` on the additional-data entry of ``task_id``.
        raises ValueError if ``task_id`` has no entry.
        """
        for entry in self.additional_data:
            if entry["task_id"] == task_id:
                entry[info_key] = info_value
                return
        raise ValueError("task_id %s not found!!!" % (task_id))

    def load_info_additional_data(self, additional_data_path):
        """
        load a jsonl file of additional info and copy every key except "task_id"
        into the additional data. A record that cannot be applied is reported and
        skipped (best effort), it does not abort the loading.
        """
        load_additional_data = ls.load_json_lines(additional_data_path)
        for data in load_additional_data:
            try:
                for key, value in data.items():
                    if key != "task_id":
                        self.add_info_additional_data(data["task_id"], key, value)
            except Exception as err:  # best effort, but never swallow KeyboardInterrupt/SystemExit
                # the record may be the very thing that is malformed, so do not index it blindly here
                task_id = data.get("task_id", "<missing task_id>") if isinstance(data, dict) else "<malformed record>"
                print("error in loading additional data at task_id: %s (%r)" % (task_id, err))
        print("additional data loaded!")

    # def merge_additional_data(self):
    #     """merge additional data into the original data"""
    #     for data in self.data:
    #         for add_data in self._additional_data:
    #             if data["task_id"] == add_data["task_id"]:
    #                 for key, value in add_data.items():
    #                     if key != "task_id":
    #                         data[key] = value
    #     save_path = self.jsonl_path.replace(".jsonl", "_plus.jsonl")
    #     ls.save_json_lines(self.data, save_path)
    #     print("additional data merged!")

    def save_report(self):
        """write the accumulated report text to ``self.report_path`` (overwrites the file)."""
        with open(self.report_path, 'w') as f:
            f.write(self.report_txt)

    def access_data(self, data_name):
        """
        return a dict. dict format: {task_id: data}
        original self.data format: [{"task_id": task_id, "data_1": data_1, "data_2": data_2, ...}, ...]
        """
        return {record["task_id"]: record[data_name] for record in self.data}

    def task_RTL_token_num_analysis(self):
        """
        analyze the RTL token number (module_code + header) of each task_id, store it in
        the data under "RTL_token_num" and append min/max/average and a histogram to the report.
        """
        bucket_width = 50
        bucket_count = 17  # buckets start at 0, 50, ..., 800
        for record in self.data:
            record["RTL_token_num"] = gpt.num_tokens_from_string(record["module_code"]) + gpt.num_tokens_from_string(record["header"])
        token_num_data = self.access_data("RTL_token_num")
        token_num_list = list(token_num_data.values())
        self.report_txt += "////////// token number analysis //////////\n"
        self.report_txt += "total number of tasks: %d\n" % (len(token_num_list))
        if token_num_list:
            # min()/max() over the dict return the first task_id holding the extreme value
            min_task_id = min(token_num_data, key=token_num_data.get)
            max_task_id = max(token_num_data, key=token_num_data.get)
            self.report_txt += "min token number: %d\n" % (token_num_data[min_task_id])
            self.report_txt += "min token task_id: %s\n" % (min_task_id)
            self.report_txt += "max token number: %d\n" % (token_num_data[max_task_id])
            self.report_txt += "max token task_id: %s\n" % (max_task_id)
            self.report_txt += "average token number: %.2f\n" % (sum(token_num_list) / len(token_num_list))
        ## distribution of the token number: from 0 to 800, every 50
        token_num_distribution = [0] * bucket_count
        for token_num in token_num_list:
            # counts beyond the last bucket are clamped into it (the "800" row then means >= 800)
            # instead of raising IndexError
            token_num_distribution[min(token_num // bucket_width, bucket_count - 1)] += 1
        # print it in a table
        self.report_txt += "token number distribution:\n"
        self.report_txt += "token number\tcount\n"
        for bucket_idx, bucket_total in enumerate(token_num_distribution):
            self.report_txt += "%d\t\t%d\n" % (bucket_idx * bucket_width, bucket_total)
        self.report_txt += "\n\n"
        print("RTL token number analysis finished!")

    def task_miniset_gen(self, mini_set_size=20, save_path=None, suffix="miniset"):
        """
        generate a mini set of the data. the mini set size is determined by mini_set_size. data is randomly selected.
        default save path: "<jsonl path without extension>_<suffix>.jsonl"
        note: random.sample raises ValueError if mini_set_size exceeds the number of tasks.
        """
        mini_set = random.sample(self.data, mini_set_size)
        if save_path is None:
            # save the mini set in the same directory as the jsonl file.
            save_path = self._sibling_path("_%s.jsonl" % (suffix))
        save_json_lines(mini_set, save_path)

    def task_RTL_gen(self, template_path, save_path=None, num_RTL = 10, gpt_model = LLM_MODEL):
        """
        generate RTL code for the tasks in the data.
        - template_path: txt prompt template; the description and header placeholders are filled per task
        - save_path: output jsonl; default: "<jsonl path without extension>_RTL.jsonl"
        - num_RTL: number of LLM calls (one RTL code each) per task
        - gpt_model: model name passed to gpt.llm_call
        """
        if save_path is None:
            # save the RTL codes in the same directory as the jsonl file.
            save_path = self._sibling_path("_RTL.jsonl")
        # load the template (txt)
        with open(template_path, 'r') as f:
            template = f.read()
        # generate the RTL codes
        new_list = []
        usages = []
        for idx, data_i in enumerate(self.data, start=1):
            prompt_i = template.replace("{$problem description from HDLBits$}", data_i["description"])
            prompt_i = prompt_i.replace("{$header from HDLBits$}", data_i["header"])
            message_in = [{"role": "user", "content": prompt_i}]
            rtl_list = []
            for _ in range(num_RTL):
                response, info = gpt.llm_call(message_in, gpt_model, "config/key_API.json", temperature=0.8)
                # the last extracted code block is taken as the answer
                rtl_list.append(gpt.extract_code(response, "verilog")[-1])
                usages.append(info["usage"])
            new_list.append({"task_id": data_i["task_id"], "RTL_code": rtl_list})
            print("%d task(s) finished!" % (idx))
            # re-save after every task so that finished tasks survive an interrupted run
            save_json_lines(new_list, save_path)
        print("RTL generation finished!")
        cost = gpt.cost_calculator(usages)
        # the value comes from gpt.cost_calculator, so it is labelled as a cost, not a token count
        print("total cost: $%.4f\n" % (cost))

    def task_mutant_gen(self, template_path, save_path=None, num_mutants=10):
        """
        generate mutants for the tasks in the data.
        - template_path: txt prompt template; "{$n$}" is replaced by num_mutants
        - save_path: output jsonl; default: "<jsonl path without extension>_mutants.jsonl"
        """
        from config.config import GPT_MODELS
        if save_path is None:
            # save the mutants in the same directory as the jsonl file.
            save_path = self._sibling_path("_mutants.jsonl")
        # load the template (txt)
        with open(template_path, 'r') as f:
            template = f.read()
        # template contains special character {$n$}, replace it with num_mutants (int)
        template = template.replace("{$n$}", str(num_mutants))
        # generate the mutants
        new_list = []
        for idx, data_i in enumerate(self.data, start=1):
            prompt_i = template.replace("{$problem description from HDLBits$}", data_i["description"])
            prompt_i = prompt_i.replace("{$RTL code from HDLBits$}", data_i["module_code"])
            message_in = [{"role": "user", "content": prompt_i}]
            response, _info = gpt.llm_call(message_in, GPT_MODELS["4"], "config/key_API.json", temperature=0.8)
            mutant_list = gpt.extract_code(response, "verilog")
            new_list.append({"task_id": data_i["task_id"], "mutants": mutant_list})
            print("%d task(s) finished!" % (idx))
            save_json_lines(new_list, save_path) # save the mutants every time a task is finished
        print("Mutants generation finished!")

    def task_find_xxx_in_xxx(self, keyword, content_name="module_code", show_task_id=False):
        """
        count (and print) the tasks whose content contains the keyword.
        input:
        - keyword: the keyword to be found
        - content_name: the name of the content to be searched. default: "module_code"
        - show_task_id: whether to show the task_id of the task containing the keyword. default: False
        """
        count = 0
        for prob in self.data:
            if keyword in prob.get(content_name, ""):
                if show_task_id:
                    print(prob["task_id"])
                count += 1
        print("%s in %s count: %d" % (keyword, content_name, count))

    def task_circuit_type_gen(self):
        """
        classify the tasks into CMB or SEQ according to `module_code` (by asking the LLM).
        The result is stored in each task ("circuit_type"), in the additional data (saved
        after every task) and summarized in the report. A task without a majority vote is
        marked "UNKNOWN".
        """
        import os  # local import: keeps this class independent of the file's import block

        ITER_NUM = 2
        GPT_MODEL = "gpt-4-0125-preview"
        # Only rename inside the file name: replacing "data" in the whole path would also
        # rewrite a "data/" directory component ("data/HDLBits/..." -> "circuit_type/HDLBits/...").
        directory, file_name = os.path.split(self.jsonl_path)
        SAVE_PATH = os.path.join(directory, file_name.replace("data", "circuit_type"))
        if SAVE_PATH == self.jsonl_path:
            # file name without "data": never overwrite the problem set itself
            SAVE_PATH = self._sibling_path("_circuit_type.jsonl")

        def get_CMB_or_SEQ_from_GPT(module_code, description):
            """
            ask GPT to classify the module_code into CMB or SEQ
            """
            prompt = "Please classify the following verilog code into combinational circuit or sequential circuit:\n" + module_code + "\nthe circuit description is:\n" + description + "\n IMPORTANT: please only reply one word as the response. If this verilog code is a combinational circuit, please reply 'CMB'. If this verilog code is a sequential circuit, please reply 'SEQ'. \nVERY IMPORTANT: DO NOT reply anything else."
            system_message = "You are a very smart AI, please classify the following verilog code into combinational circuit or sequential circuit. You already have the knowledge to do this."
            message_in = [{"role": "user", "content": prompt}]
            response, info = gpt.llm_call(message_in, GPT_MODEL, "config/key_API.json", temperature=0.5, system_message=system_message)
            return response, info

        # classify the tasks
        total_tokens = 0
        total_CMBs = 0
        total_SEQs = 0
        total_unknowns = 0
        unknown_list = []
        for prob in self.data:
            cmb_votes, seq_votes = 0, 0
            for _ in range(ITER_NUM):
                response, info = get_CMB_or_SEQ_from_GPT(prob["module_code"], prob["description"])
                total_tokens += info["usage"]["total_tokens"]
                if "CMB" in response:
                    cmb_votes += 1
                elif "SEQ" in response:
                    seq_votes += 1
                # stop asking as soon as one side holds a strict majority
                if cmb_votes > (ITER_NUM/2.0) or seq_votes > (ITER_NUM/2.0):
                    break
            if cmb_votes > seq_votes:
                prob["circuit_type"] = "CMB"
                total_CMBs += 1
            elif cmb_votes < seq_votes:
                prob["circuit_type"] = "SEQ"
                total_SEQs += 1
            else:
                prob["circuit_type"] = "UNKNOWN"
                total_unknowns += 1
                unknown_list.append(prob["task_id"])
            # save the result into additional data (re-saved after every task)
            self.add_info_additional_data(prob["task_id"], "circuit_type", prob["circuit_type"])
            ls.save_json_lines(self.additional_data, SAVE_PATH)
            print("%s: %s" % (prob["task_id"], prob["circuit_type"]))
        # save the result into the report
        self.report_txt += "////////// COM or SEQ classification //////////\n"
        self.report_txt += "total tokens used: %d\n" % (total_tokens)
        self.report_txt += "total CMBs: %d\n" % (total_CMBs)
        self.report_txt += "total SEQs: %d\n" % (total_SEQs)
        self.report_txt += "total unknowns: %d\n" % (total_unknowns)
        self.report_txt += "unknown task_id list: %s\n" % (unknown_list)
        self.report_txt += "\n\n"
        self.save_report()
        print("COM or SEQ classification finished!")

    def task_return_task_id_list(self):
        """
        return the list of task_id
        """
        return [record["task_id"] for record in self.data]

    def task_circuit_type_gen_only_description(self, save_path, step2_algorithm=True):
        """
        check whether the circuit type can be recovered from description + header only:
        step 1 lets the LLM generate RTL code, step 2 classifies that code, either with
        circuit_type_by_code (step2_algorithm=True) or with another LLM call. The result is
        compared with the known "circuit_type" and a text report is (re)written to
        save_path after every task.
        raises ValueError if the data has no "circuit_type" information.
        """
        # we need circuit_type information to check the correctness
        model = "gpt-4-turbo-2024-04-09"
        txt_out = ""
        unmatched = 0
        tokens = 0
        # an empty data set has nothing to check (and self.data[0] would raise IndexError)
        if self.data and "circuit_type" not in self.data[0]:
            raise ValueError("circuit_type information not found!")
        for task in self.data:
            # step 1, generate the RTL code
            prompt = "Please generate the verilog RTL code according to the following description and header information:\nproblem description" + task["description"] + "\nRTL header:\n" + task["header"] + "\n\nplease only reply verilog codes, no other words."
            message_in = [{"role": "user", "content": prompt}]
            response, info = gpt.llm_call(message_in, model, "config/key_API.json")
            tokens += info["usage"]["total_tokens"]
            task["generated_code"] = gpt.extract_code(response, "verilog")[-1]
            # step 2, classify the generated code
            if step2_algorithm:
                response = circuit_type_by_code(task["generated_code"])
            else:
                prompt = "Please classify the following verilog code into combinational circuit or sequential circuit:\n" + task["generated_code"] + "\nthe circuit description is:\n" + task["description"] + "\n IMPORTANT: please only reply one word as the response. If this verilog code is a combinational circuit, please reply 'CMB'. If this verilog code is a sequential circuit, please reply 'SEQ'. \nVERY IMPORTANT: DO NOT reply anything else."
                system_message = "You are a very smart AI, please classify the following verilog code into combinational circuit (CMB) or sequential circuit (SEQ). You already have the knowledge to do this."
                message_in = [{"role": "user", "content": prompt}]
                response, info = gpt.llm_call(message_in, model, "config/key_API.json", temperature=0.5, system_message=system_message)
                tokens += info["usage"]["total_tokens"]
            if ("CMB" in response) or ("combinational" in response):
                task["generated_circuit_type"] = "CMB"
            elif ("SEQ" in response) or ("sequential" in response):
                task["generated_circuit_type"] = "SEQ"
            else:
                # keep the raw answer so that the report shows what went wrong
                task["generated_circuit_type"] = response
            consistent = task["circuit_type"] == task["generated_circuit_type"]
            if not consistent:
                unmatched += 1
            new_msg = "[%s] %s\n" % (task["task_id"], "consistent" if consistent else "should be %s, but got %s; code:\n%s" % (task["circuit_type"], task["generated_circuit_type"], task["generated_code"]))
            txt_out += new_msg
            additional_info = "unmatched: %d\n" % (unmatched) + "total tokens: %d\n" % (tokens)
            # resave the txt every time a task is finished
            with open(save_path, 'w') as f:
                f.write(txt_out+additional_info)
            print(new_msg, end="")
        print("Circuit type generation finished! unmatched: %d; tokens: %d" % (unmatched, tokens))

    def task_gen_circuit_type_strmatch(self, save_path):
        """
        classify every task with circuit_type_by_code(module_code), store the result under
        "circuit_type_strmatch" and print the tasks that differ from the known "circuit_type".
        - save_path: currently unused (nothing is written to disk); kept for the callers.
        raises ValueError if the data has no "circuit_type" information.
        """
        unmatched = 0
        # an empty data set has nothing to check (and self.data[0] would raise IndexError)
        if self.data and "circuit_type" not in self.data[0]:
            raise ValueError("circuit_type information not found!")
        for task in self.data:
            task["circuit_type_strmatch"] = circuit_type_by_code(task["module_code"])
            if task["circuit_type_strmatch"] != task["circuit_type"]:
                print("%s: should be %s, but got %s" % (task["task_id"], task["circuit_type"], task["circuit_type_strmatch"]))
                unmatched += 1
        print("Circuit type generation finished! unmatched: %d" % (unmatched))
|
|
|
|
# this function is used to merge the data from VerilogEval (originally from HDLBits)
|
|
# we don't need to run this function every time, because the merged data is already saved in the data folder
|
|
def merge_json_from_VerlogEval(VerilogDescription_Human_path, VerilogEval_Human, output_path):
    """
    merge the two VerilogEval jsonl files into one jsonl file (one line per task).

    jsonl: jsonl is a json file with no limit on the size of the whole file, but each line should be smaller than 2GB

    #### VerilogDescription_Human.jsonl:
    - task_id: the name of task
    - detail_description: the description in pure text form, from HDLBits website

    #### VerilogEval_Human.jsonl:
    - task_id: the name of task
    - prompt: the module header in verilog
    - canonical_solution: the canonical verilog solution for the problem (without header). In other words, the verilog code corresponding to the problem
    - test: the golden testbench solution

    #### output - HDLBits_data.jsonl:
    - task_id: the name of task
    - task_number: the number of task (starting from 1, in the order of the description file)
    - description: detail_description from VerilogDescription_Human.jsonl
    - header: prompt from VerilogEval_Human.jsonl
    - module_code: prompt + canonical_solution from VerilogEval_Human.jsonl
    - testbench: test from VerilogEval_Human.jsonl

    raises ValueError if a described task has no record in VerilogEval_Human.jsonl
    (such a task used to be merged silently with the data of an unrelated task).
    """
    data_description = load_json_lines(VerilogDescription_Human_path)
    data_eval = load_json_lines(VerilogEval_Human)
    # index the eval records once; setdefault keeps the first record of a duplicated
    # task_id, like the first-match linear search this replaces
    eval_by_task_id = {}
    for eval_entry in data_eval:
        eval_by_task_id.setdefault(eval_entry["task_id"], eval_entry)
    data_merged = []
    for task_number, description_entry in enumerate(data_description, start=1):
        task_id = description_entry["task_id"]
        eval_entry = eval_by_task_id.get(task_id)
        if eval_entry is None:
            raise ValueError("task_id %s of %s not found in %s" % (task_id, VerilogDescription_Human_path, VerilogEval_Human))
        # merge the data
        data_merged.append({
            "task_id": task_id,
            "task_number": task_number,
            "description": description_entry["detail_description"],
            "header": eval_entry["prompt"],
            "module_code": eval_entry["prompt"] + eval_entry["canonical_solution"],
            "testbench": eval_entry["test"],
        })
    save_json_lines(data_merged, output_path)
    print("Merging finished!")
|
|
|
|
# this function is a primary function to return the module code according to task_id or task_number
|
|
# now we can use Analyzer to access the data and return the module code
|
|
def return_module_code(id_or_number, output_path=None):
    """
    - Look up one task in the merged data by task_id or task_number.
    - without output_path: the module code is returned
    - with output_path: the module code is written into that txt file (nothing is returned)
    """
    record = find_line_jsonl(id_or_number, load_json_lines(MERGED_DATA_PATH))
    module_code = record["module_code"]
    if output_path is None:
        return module_code
    # write it into txt file
    with open(output_path, 'w') as txt_file:
        txt_file.write(module_code)
|
|
|
|
# def main():
|
|
# merge_json_from_VerlogEval(VerilogDescription_Human_path, VerilogEval_Human, MERGED_DATA_PATH)
|
|
|
|
def circuit_type_by_code(code: str):
    """
    Classify verilog code as combinational or sequential with a naive text scan
    of its ``always`` blocks (no real parsing).

    - input: code
    - output: "CMB" or "SEQ"

    rules, applied to each ``always`` keyword from left to right:
    - ``always_ff`` / ``always_latch``                        -> "SEQ"
    - other ``always_xxx`` words (e.g. ``always_comb``)       -> skipped
    - sensitivity list containing ``posedge`` / ``negedge``   -> "SEQ"
    - sensitivity list ``*``                                  -> block skipped
    - any other block containing a space separated ``<=``     -> "SEQ"
    code without a matching block is "CMB".

    The scan never raises on truncated or unusual code (keyword at the very
    start/end of the string, missing ";" or "end", empty sensitivity entry).
    """
    def string_to_words(string: str):
        # only the space character separates words; newlines and tabs stay attached
        return [word for word in string.split(" ") if word != ""]

    keyword = "always"
    circuit_type = "CMB"  # will be changed to "SEQ" if sequential
    while True:
        always_start = code.find(keyword)
        if always_start == -1:
            break
        keyword_end = always_start + len(keyword)
        # neighbours of the keyword; "" marks the start/end of the string, so the index can
        # neither wrap around to the last character nor run past the end
        prev_char = code[always_start - 1] if always_start > 0 else ""
        next_char = code[keyword_end] if keyword_end < len(code) else ""
        if prev_char not in ("", " ", "\n", "\t", ";"):
            # "always" is only the tail of a longer identifier
            code = code[keyword_end:]
            continue
        if next_char not in (" ", "@"):
            # check always_ff, _comb and _latch
            if next_char == "_":
                # read the whole identifier, whatever character terminates it ("always_ff@(...)")
                word_end = keyword_end
                while word_end < len(code) and (code[word_end].isalnum() or code[word_end] == "_"):
                    word_end += 1
                if code[always_start:word_end] in ("always_ff", "always_latch"):
                    circuit_type = "SEQ"
                    break
            code = code[keyword_end:]
            continue
        # extent of the block: up to the first "end" if a "begin" appears before the next ";",
        # otherwise up to that ";"
        rest = code[always_start:]
        next_semicolon = rest.find(";")
        header = rest if next_semicolon == -1 else rest[:next_semicolon]
        block_len = rest.find("end") if "begin" in header else next_semicolon
        if block_len == -1:
            # terminator missing: the block runs to the end of the code
            block_len = len(rest)
        always_end = always_start + block_len
        always_block = code[always_start:always_end]
        # currently we use a naive way to check if the always block is sequential or not
        # find the first not-" " character after "@"
        at_pos = always_block.find("@")
        char_pos = at_pos
        first_char = ""  # stays "" when nothing follows the "@"
        for first_char in always_block[at_pos + 1:]:
            char_pos += 1
            if first_char != " ":
                break
        signal_list = []
        if first_char == "(":
            sensitivity_list = always_block[always_block.find("(") + 1:always_block.find(")")]
            for signal in sensitivity_list.split(","):
                signal_seg = string_to_words(signal)
                if len(signal_seg) > 1 and ("posedge" in signal_seg or "negedge" in signal_seg):
                    circuit_type = "SEQ"
                    break
                if signal_seg:  # skip empty entries such as in "@(a, )"
                    signal_list.append(signal_seg[-1])
        else:  # no bracket, always @ a begin xxx = xxx end;
            tail = always_block[char_pos:]
            # the signal ends at the first whitespace of any kind, so "@*" directly followed
            # by a newline or tab is still recognized as "*"
            signal_len = next((pos for pos, char in enumerate(tail) if char in " \t\n\r"), len(tail))
            signal_list.append(tail[:signal_len])
        if "*" in signal_list:
            # sensitive to everything it reads: combinational, go on with the next block
            code = code[always_end:]
            continue
        if circuit_type == "SEQ":
            break
        if "<=" in string_to_words(always_block):
            # non-blocking assignment outside a "*" block is taken as sequential
            circuit_type = "SEQ"
            break
        code = code[always_end:]
    return circuit_type
|
|
"""discarded main scripts"""
|
|
""" generate CMB15 and SEQ15
|
|
probset_pure = HDLBitsData(FATHER_DATA_PATH)
|
|
probset_with_type = HDLBitsData(FATHER_DATA_PATH, circuit_type_path=CIRCUIT_TYPE_PATH)
|
|
# rule110 = probset_pure.access_task_id("rule110")
|
|
# probset_with_type.data_clean(filter={"circuit_type": "CMB"})
|
|
probset_with_type.filter_data_by_dict({"circuit_type": "CMB"})
|
|
task_list = probset_with_type.task_return_task_id_list()
|
|
probset_pure.data_clean(only=task_list)
|
|
print(probset_pure.num)
|
|
probset_pure.task_miniset_gen(mini_set_size=15, suffix="CMB15") """
|
|
|
|
# Script entry point: only runs main() when this file is executed directly.
if __name__ == "__main__":
    # return_module_code("rule110", "data/HDLBits/rule110.txt")
    main()