# Reconstructed from a git patch (commit 05504bb2, 2026-02-05) that adds
# LlmStaticAnalyze.py to the repository root.
"""Rule-driven LLM static analysis for C/C++ projects.

Pipeline:
  1. Walk a project tree and collect C/C++ sources (``find_c_files``).
  2. Parse each file with tree-sitter and extract function definitions
     (``extract_functions_from_file`` / ``extract_functions``).
  3. Load enabled review rules from an Excel sheet (``load_review_rules``).
  4. Send each function (chunked when oversized) together with each rule to a
     Qwen/OpenAI-compatible chat model and parse the JSON verdict
     (``analyze_function_against_rules``).
  5. Emit an Excel report and a Markdown report (``main``).

Configuration is taken from the environment: ``QWEN_API_BASE`` and
``QWEN_API_KEY``. API keys must never be hard-coded in this file.
"""

import json
import os

import chardet
import pandas as pd
import tree_sitter_cpp
from openai import OpenAI
from tree_sitter import Language, Parser

# Load the C++ grammar once at import time; it handles plain C sources well
# enough for function-definition extraction.
CPP_LANGUAGE = Language(tree_sitter_cpp.language())
parser = Parser()
parser.language = CPP_LANGUAGE

CPP_EXTENSIONS = {'.c', '.cpp', '.h', '.hpp', '.tcc'}
IGNORE_DIRS = {'build', 'cmake-build', '.git', 'vendor', 'lib', 'external', 'Debug'}

MAX_CHAR_LENGTH = 60000  # ~15k tokens; soft cap for a single LLM request


def _decode_source(raw_data: bytes) -> str:
    """Decode raw file bytes: GBK for GBK-family encodings, UTF-8 otherwise.

    ``chardet.detect`` may return ``None`` as the encoding for empty or
    undetectable data; guard before calling ``.lower()`` (the original code
    crashed in that case).
    """
    encoding = chardet.detect(raw_data)['encoding'] or ''
    if encoding.lower() in ('gbk', 'gb2312', 'cp936'):
        return raw_data.decode('gbk')
    return raw_data.decode('utf-8')  # fallback


def read_gbk_file(filepath: str) -> str:
    """Read a possibly GBK-encoded text file and return its content as str."""
    with open(filepath, 'rb') as f:
        raw_data = f.read()
    return _decode_source(raw_data)


def find_c_files(project_path: str) -> list:
    """Recursively collect all C/C++ source files under *project_path*.

    Directories listed in IGNORE_DIRS are pruned from the walk in place so
    ``os.walk`` never descends into them.
    """
    c_files = []
    for root, dirs, files in os.walk(project_path):
        dirs[:] = [d for d in dirs if d not in IGNORE_DIRS]
        for file in files:
            if any(file.endswith(ext) for ext in CPP_EXTENSIONS):
                c_files.append(os.path.join(root, file))
    return c_files


def extract_functions_from_file(file_path: str) -> list:
    """Extract all function definitions from one source file.

    Returns a list of dicts (see ``extract_functions``) augmented with
    ``file_path`` and ``file_name``. Errors are reported and swallowed so a
    single unreadable file does not abort the whole scan.
    """
    try:
        with open(file_path, 'rb') as f:
            raw_data = f.read()
        text_str = _decode_source(raw_data)

        # Re-encode as UTF-8 so tree-sitter byte offsets match code_bytes.
        code_bytes = text_str.encode('utf-8')
        functions = extract_functions(code_bytes)

        for func in functions:
            func['file_path'] = file_path
            func['file_name'] = os.path.basename(file_path)
        return functions
    except Exception as e:
        print(f"处理文件 {file_path} 时出错: {e}")
        return []


def extract_functions(code_bytes: bytes) -> list:
    """Parse UTF-8 source bytes and return every top-level function definition.

    Each entry: ``{'name', 'code', 'start_line', 'end_line'}`` with 1-based
    line numbers. Nested definitions inside a function are not revisited
    (C/C++ does not nest function definitions).
    """
    tree = parser.parse(code_bytes)
    functions = []

    def traverse(node):
        if node.type == 'function_definition':
            func_code = code_bytes[node.start_byte:node.end_byte].decode('utf-8')
            functions.append({
                'name': get_function_name(node, code_bytes),
                'code': func_code,
                'start_line': node.start_point[0] + 1,
                'end_line': node.end_point[0] + 1,
            })
        else:
            for child in node.children:
                traverse(child)

    traverse(tree.root_node)
    return functions


def find_declarator(node):
    """Recursively locate a declarator node.

    In C grammars the function name lives under
    ``declarator -> ... -> identifier``; the target is the innermost
    declarator (usually a ``function_declarator``). Returns ``None`` when no
    declarator exists in the subtree.
    """
    if node.type in ('declarator', 'function_declarator', 'pointer_declarator'):
        return node

    # The declarator typically appears after the storage class / type nodes:
    # common shape: [storage_class?, type, declarator, ...]
    for child in node.children:
        result = find_declarator(child)
        if result is not None:
            return result

    return None


def get_function_name(func_node, code_bytes: bytes) -> str:
    """Extract the function name from a ``function_definition`` node.

    *code_bytes* is the original source as UTF-8 bytes. Returns an empty
    string when no identifier can be located.
    """
    declarator = None

    for child in func_node.children:
        if child.type == 'function_declarator':
            declarator = child
            break
        elif child.type == 'declaration':
            declarator = find_declarator(child)
            if declarator:
                break

    if declarator is None:
        return ""

    def find_identifier(node):
        if node.type == 'identifier':
            return node
        for child in node.children:
            found = find_identifier(child)
            if found:
                return found
        return None

    ident_node = find_identifier(declarator)
    if ident_node:
        return code_bytes[ident_node.start_byte:ident_node.end_byte].decode('utf-8')
    return ""


def load_review_rules(excel_path: str) -> list:
    """Load review rules from an Excel sheet.

    Expected layout (no header row): column 0 = optional rule id, column 1 =
    rule description, column 2 = enabled flag. Only rows whose third column
    equals "是" are returned, as ``[{"id": ..., "description": ...}, ...]``.
    """
    df = pd.read_excel(excel_path, header=None, engine='openpyxl')

    rules = []
    for idx, row in df.iterrows():
        # Tolerate short rows from ragged sheets.
        if len(row) < 3:
            continue
        desc = str(row[1]).strip() if pd.notna(row[1]) else ""
        enabled = str(row[2]).strip() if pd.notna(row[2]) else ""
        if desc and enabled == "是":
            rule_id = str(row[0]).strip() if pd.notna(row[0]) else f"R{idx + 1}"
            rules.append({'id': rule_id, 'description': desc})
    return rules


def save_results_to_excel(results, output_path):
    """Save analysis results as an Excel workbook.

    :param results: list of dicts keyed by the Chinese column names below.
    :param output_path: destination ``.xlsx`` path.
    """
    df = pd.DataFrame(results, columns=[
        "文件名",
        "函数名",
        "规则名",
        "是否缺陷",
        "解释规则"
    ])
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        df.to_excel(writer, index=False, sheet_name='静态分析结果')
        worksheet = writer.sheets['静态分析结果']
        # Auto-fit column widths for readability, capped at 80 characters.
        for column in worksheet.columns:
            column_letter = column[0].column_letter
            max_length = max(
                (len(str(cell.value)) for cell in column if cell.value is not None),
                default=0,
            )
            worksheet.column_dimensions[column_letter].width = min(max_length + 2, 80)
    print(f"分析结果已保存至: {output_path}")


def split_long_function(func_code: str, max_lines: int = 1600) -> list:
    """Split an oversized function body into line-based chunks.

    A simple heuristic avoids cutting inside ``{}`` blocks: a chunk is only
    closed once *max_lines* is reached AND the running brace depth is back to
    zero or below. If the brace depth never closes (malformed code), fall
    back to fixed-size slices of *max_lines* lines.

    NOTE: the original file defined two versions of this function (a
    character-based one and this line-based one); the later definition
    shadowed the former, so only this effective version is kept.
    """
    lines = func_code.splitlines(keepends=True)
    if len(lines) <= max_lines:
        return [func_code]

    chunks = []
    current = []
    brace_depth = 0

    for line in lines:
        current.append(line)
        # Roughly track brace depth (ignores braces inside strings/comments).
        brace_depth += line.count('{') - line.count('}')

        if len(current) >= max_lines and brace_depth <= 0:
            # Cut at a block boundary.
            chunks.append(''.join(current))
            current = []
            brace_depth = 0

    if current:
        chunks.append(''.join(current))

    # Defensive fallback: brace_depth never returned to zero.
    if not chunks:
        for i in range(0, len(lines), max_lines):
            chunks.append(''.join(lines[i:i + max_lines]))

    return chunks


RULE_BASED_PROMPT = """你是一名 C 语言安全审计专家。请根据以下特定编码或安全规则,严格审查提供的函数代码片段:

【审查规则】
{rule_description}

【代码上下文】
- 当前为函数 "{function_name}" 的第 {chunk_index}/{total_chunks} 片段
- 若函数被分片,请综合所有片段判断;若仅提供一片,则基于此片判断

【审查要求】
- 如果此代码片段**明确违反**上述规则,请指出:违规位置、风险说明、修复建议;
- 如果**未发现违规**,请回答:"本片段未发现违反该规则的问题";
- 如果因**代码不完整无法判断**(如只看到函数开头),请回答:"需结合完整函数判断";
- 回答必须简洁、基于事实,用中文,不超过50字。
【回答要求】
-请严格按以下 JSON 格式回答,不要包含任何额外文本、注释或 Markdown:
{{
"has_issue": true 或 false,
"explanation": "简明解释为何存在或不存在该问题"
}}
【函数代码片段】
c
{code_chunk}
"""


def analyze_function_chunk_against_rule(
    client,
    func_name: str,
    code_chunk: str,
    rule_desc: str,
    chunk_index: int,
    total_chunks: int,
    model: str = "qwen-max"
) -> str:
    """Send one code chunk + one rule to the LLM and return the raw reply.

    API failures are reported inline (``[API Error] ...``) instead of raising
    so one flaky request does not abort a long batch run.
    """
    prompt = RULE_BASED_PROMPT.format(
        rule_description=rule_desc,
        function_name=func_name,
        chunk_index=chunk_index,
        total_chunks=total_chunks,
        code_chunk=code_chunk
    )
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=500
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"[API Error] {str(e)}"


def _parse_llm_json(raw_response: str):
    """Parse the model's JSON verdict.

    Despite the prompt forbidding it, models occasionally wrap JSON in
    markdown code fences; strip them before parsing. Returns a tuple
    ``(has_issue, explanation)`` on success, or ``None`` when the reply is
    not valid JSON.
    """
    text = raw_response.strip()
    if text.startswith('```'):
        text = text.strip('`').strip()
        if text.startswith('json'):
            text = text[4:]
    try:
        res = json.loads(text.strip())
    except (json.JSONDecodeError, AttributeError, TypeError):
        return None
    return bool(res.get('has_issue', False)), str(res.get('explanation', ''))


def analyze_function_against_rules(func_info: dict, rules: list, client, model: str = "qwen-max") -> dict:
    """Apply every enabled rule to one function.

    Oversized functions are split into chunks; each chunk is analyzed against
    the same rule and the verdicts are merged: the function is flagged for a
    rule as soon as ANY chunk is flagged. Explanations keep the original
    format — bare for single-chunk functions, 【片段N】-prefixed otherwise.
    """
    func_name = func_info['name']
    chunks = split_long_function(func_info['code'])
    total = len(chunks)

    rule_results = {}
    for rule in rules:
        chunk_results = []

        for i, chunk in enumerate(chunks):
            raw_response = analyze_function_chunk_against_rule(
                client, func_name, chunk, rule['description'],
                chunk_index=i + 1, total_chunks=total, model=model
            )
            prefix = "" if total == 1 else f"【片段{i + 1}】"
            parsed = _parse_llm_json(raw_response)
            if parsed is None:
                # Unparseable reply: degrade gracefully, keep a sample of the
                # raw text for the report.
                chunk_results.append({
                    'has_issue': False,
                    'explanation': f"{prefix}[解析失败] 原始响应: {raw_response[:200]}..."
                })
            else:
                has_issue, explanation = parsed
                chunk_results.append({
                    'has_issue': has_issue,
                    'explanation': f"{prefix}{explanation}"
                })

        rule_results[rule['id']] = {
            'rule_description': rule['description'],
            'has_issue': any(item['has_issue'] for item in chunk_results),
            'analysis': "\n".join(item['explanation'] for item in chunk_results),
        }

    return rule_results


def main(project_path: str, rules_excel: str, output_report: str = "rule_based_report.md"):
    """Run the full analysis and write Excel + Markdown reports.

    :param project_path: root of the C/C++ project to scan.
    :param rules_excel: Excel file with review rules (see load_review_rules).
    :param output_report: Markdown report path; the Excel report uses the
        same name with an ``.xlsx`` extension.
    """
    print("正在加载评审规则...")
    rules = load_review_rules(rules_excel)
    print(f"共加载 {len(rules)} 条启用的规则")
    print("正在读取 GBK 编码源文件...")
    print(f"正在扫描项目文件夹: {project_path}")
    c_files = find_c_files(project_path)
    print(f"找到 {len(c_files)} 个 C/C++ 文件")

    if not c_files:
        print("未找到任何 C/C++ 文件,请检查项目路径")
        return

    # Extract functions from every file.
    all_functions = []
    for file_path in c_files:
        print(f"正在处理文件: {file_path}")
        functions = extract_functions_from_file(file_path)
        all_functions.extend(functions)
        print(f"  找到 {len(functions)} 个函数")

    print(f"共找到 {len(all_functions)} 个函数")

    if not all_functions:
        print("未找到任何函数,分析结束")
        return

    # Initialize the OpenAI/Qwen client. SECURITY: the key comes from the
    # environment only — never hard-code credentials in source.
    client = OpenAI(
        base_url=os.getenv("QWEN_API_BASE", "https://dashscope.aliyuncs.com/compatible-mode/v1"),
        api_key=os.getenv("QWEN_API_KEY", "")
    )

    all_results = []
    for i, func in enumerate(all_functions, 1):
        print(f"\n分析函数 {i}/{len(all_functions)}: {func['name']} (来自 {func['file_name']})")
        rule_results = analyze_function_against_rules(func, rules, client)
        print(rule_results)
        all_results.append({
            'function': func,
            'rule_results': rule_results
        })

    # Build Excel rows: one row per (function, rule) pair.
    excel_rows = []
    for item in all_results:
        func_info = item['function']
        for rule_id, res in item['rule_results'].items():
            excel_rows.append({
                "文件名": func_info['file_name'],
                "函数名": func_info['name'],
                "规则名": f"[{rule_id}] {res['rule_description']}",
                "是否缺陷": "是" if res.get('has_issue', False) else "否",
                "解释规则": res['analysis']
            })

    excel_output_path = output_report.replace('.md', '.xlsx')
    save_results_to_excel(excel_rows, excel_output_path)

    # Markdown report, grouped by source file.
    with open(output_report, 'w', encoding='utf-8') as f:
        f.write("# 基于规则的 C 函数静态分析报告\n\n")
        f.write(f"**项目路径**: `{project_path}`\n\n")
        f.write(f"**规则来源**: `{rules_excel}`\n\n")
        f.write(f"**分析文件数**: {len(c_files)} 个\n\n")
        f.write(f"**共分析函数**: {len(all_functions)} 个\n\n")
        f.write(f"**启用规则数**: {len(rules)} 条\n\n---\n\n")

        files_dict = {}
        for item in all_results:
            files_dict.setdefault(item['function']['file_path'], []).append(item)

        for file_path, items in files_dict.items():
            f.write(f"## 文件: `{os.path.basename(file_path)}`\n\n")
            f.write(f"**完整路径**: {file_path}\n\n")

            for item in items:
                func = item['function']
                f.write(f"### 函数: `{func['name']}` (L{func['start_line']}-L{func['end_line']})\n\n")
                f.write("```c\n")
                # Truncate long bodies so the report stays readable.
                snippet = func['code'][:1500] + ("..." if len(func['code']) > 1500 else "")
                f.write(snippet)
                f.write("\n```\n\n")

                for rule_id, res in item['rule_results'].items():
                    f.write(f"#### 规则 [{rule_id}] {res['rule_description']}\n\n")
                    f.write(f"{res['analysis']}\n\n")
                f.write("---\n\n")

    print(f"\n分析完成!")
    print(f"Excel 报告已保存至: {excel_output_path}")
    print(f"Markdown 报告已保存至: {output_report}")


if __name__ == "__main__":
    # Configure via environment; setdefault keeps values already exported by
    # the caller. Set QWEN_API_KEY externally before running.
    os.environ.setdefault("QWEN_API_BASE", "https://dashscope.aliyuncs.com/compatible-mode/v1")
    os.environ.setdefault("QWEN_API_KEY", "")

    main(
        project_path="",
        rules_excel="审查规则.xlsx",
        output_report="audit_report.md"
    )