"""Rule-driven review of C/C++ functions with an LLM.

Pipeline implemented in this module:

1. Walk a project directory and collect C/C++ source files.
2. Parse every file with tree-sitter and extract each function definition.
3. Load the enabled review rules from an Excel sheet.
4. Ask the chat-completion endpoint whether each function violates each rule
   (long functions are sent in several chunks).
5. Write the verdicts to an Excel workbook and a Markdown report.

The API credential is read from the ``QWEN_API_KEY`` environment variable and
the endpoint from ``QWEN_API_BASE``; no secret is stored in this file.
"""
import json
import os
from typing import Optional

import chardet
import faiss
import pandas as pd
import requests
import tree_sitter
import tree_sitter_c
import tree_sitter_cpp
from openai import OpenAI
from tree_sitter import Language, Parser

# NOTE(review): faiss, requests and tree_sitter_c are not referenced anywhere
# in this file; the imports are kept in case other code relies on them.

# The C++ grammar is applied to every scanned file, including .c/.h sources.
CPP_LANGUAGE = Language(tree_sitter_cpp.language())
parser = Parser()
parser.language = CPP_LANGUAGE

CPP_EXTENSIONS = {'.c', '.cpp', '.h', '.hpp', '.tcc'}
IGNORE_DIRS = {'build', 'cmake-build', '.git', 'vendor', 'lib', 'external', 'Debug'}

# Hard upper bound on the size of one code chunk sent to the model.
MAX_CHAR_LENGTH = 60000  # ~15k tokens

_DEFAULT_API_BASE = "https://dashscope.aliyuncs.com/compatible-mode/v1"

# chardet labels that are all decoded with the 'gbk' codec.
_GBK_ENCODINGS = frozenset({'gbk', 'gb2312', 'cp936'})

# tree-sitter node types that directly carry a function's name.
_NAME_NODE_TYPES = frozenset({
    'identifier',
    'field_identifier',
    'destructor_name',
    'operator_name',
})
# Node types that wrap the name in a `name` field (e.g. ``Foo::bar``).
_SCOPED_NAME_TYPES = frozenset({
    'qualified_identifier',
    'template_function',
    'template_method',
})


def _decode_source_bytes(raw_data: bytes) -> str:
    """Decode the raw bytes of a source file into text.

    The encoding guessed by chardet is honoured for the GBK family; otherwise
    UTF-8 is tried first, then whatever chardet guessed, and finally UTF-8
    with replacement characters so that a single odd byte does not make the
    whole file unreadable.

    chardet may report ``None`` as the encoding (for example for an empty
    file); that case is treated as "unknown" instead of raising.
    """
    detected = (chardet.detect(raw_data).get('encoding') or '').lower()

    if detected in _GBK_ENCODINGS:
        try:
            return raw_data.decode('gbk')
        except UnicodeDecodeError:
            pass  # the guess was wrong; fall through to the generic chain

    try:
        return raw_data.decode('utf-8')
    except UnicodeDecodeError:
        pass

    if detected:
        try:
            return raw_data.decode(detected)
        except (LookupError, UnicodeDecodeError):
            pass

    return raw_data.decode('utf-8', errors='replace')


def read_gbk_file(filepath):
    """Read ``filepath`` and return its content as text.

    The file is read as bytes and decoded with ``_decode_source_bytes``.
    """
    with open(filepath, 'rb') as f:
        raw_data = f.read()
    return _decode_source_bytes(raw_data)


def find_c_files(project_path):
    """Recursively collect the C/C++ files below ``project_path``.

    Directories named in ``IGNORE_DIRS`` are pruned from the walk; a file is
    kept when its name ends with one of ``CPP_EXTENSIONS``.

    Returns:
        A list of file paths (joined with the directory they were found in).
    """
    suffixes = tuple(CPP_EXTENSIONS)
    c_files = []
    for root, dirs, files in os.walk(project_path):
        # Assigning to dirs[:] in place makes os.walk skip ignored directories.
        dirs[:] = [d for d in dirs if d not in IGNORE_DIRS]
        c_files.extend(
            os.path.join(root, name) for name in files if name.endswith(suffixes)
        )
    return c_files


def extract_functions_from_file(file_path):
    """Extract every function defined in a single source file.

    Each returned dict is the one produced by ``extract_functions`` plus the
    keys ``file_path`` and ``file_name``.

    Any error while reading or parsing is reported on stdout and results in
    an empty list, so one bad file does not abort a whole project scan.
    """
    try:
        with open(file_path, 'rb') as f:
            raw_data = f.read()

        # Re-encode as UTF-8 so tree-sitter byte offsets and the later
        # slicing/decoding of function bodies agree on one encoding.
        code_bytes = _decode_source_bytes(raw_data).encode('utf-8')
        functions = extract_functions(code_bytes)

        file_name = os.path.basename(file_path)
        for func in functions:
            func['file_path'] = file_path
            func['file_name'] = file_name
        return functions
    except Exception as e:  # best-effort boundary: report and skip the file
        print(f"处理文件 {file_path} 时出错: {e}")
        return []


def extract_functions(code_bytes: bytes):
    """Parse UTF-8 source bytes and return all ``function_definition`` nodes.

    Returns:
        A list of dicts with keys ``name``, ``code``, ``start_line`` and
        ``end_line`` (1-based line numbers), in source order.

    The tree is walked with an explicit stack instead of recursion so deeply
    nested syntax trees cannot hit Python's recursion limit. The subtree of a
    function definition is not searched for further definitions.
    """
    tree = parser.parse(code_bytes)
    functions = []

    pending = [tree.root_node]
    while pending:
        node = pending.pop()
        if node.type == 'function_definition':
            functions.append({
                'name': get_function_name(node, code_bytes),
                'code': code_bytes[node.start_byte:node.end_byte].decode('utf-8'),
                'start_line': node.start_point[0] + 1,
                'end_line': node.end_point[0] + 1,
            })
            continue
        # Reversed so that popping from the end visits children left-to-right.
        pending.extend(reversed(node.children))

    return functions


def find_declarator(node):
    """Return the first declarator-like node at or below ``node``.

    A node counts as a match when its type is ``declarator``,
    ``function_declarator`` or ``pointer_declarator``. The search is
    depth-first in child order; ``None`` is returned when nothing matches.
    """
    if node.type in ('declarator', 'function_declarator', 'pointer_declarator'):
        return node

    for child in node.children:
        result = find_declarator(child)
        if result is not None:
            return result
    return None


def _node_text(node, code_bytes: bytes) -> str:
    """Return the source text covered by ``node``."""
    return code_bytes[node.start_byte:node.end_byte].decode('utf-8')


def _descend_declarator(node):
    """Return the declarator nested one level inside ``node``, or ``None``.

    The ``declarator`` field is preferred; wrappers that do not expose that
    field are handled by taking the first named child that is a name node or
    whose type ends in ``declarator``.
    """
    inner = node.child_by_field_name('declarator')
    if inner is not None:
        return inner
    for child in node.named_children:
        if (
            child.type in _NAME_NODE_TYPES
            or child.type in _SCOPED_NAME_TYPES
            or child.type.endswith('declarator')
        ):
            return child
    return None


def get_function_name(func_node, code_bytes: bytes):
    """Extract the function name from a ``function_definition`` node.

    Args:
        func_node: tree-sitter node of type ``function_definition``.
        code_bytes: the UTF-8 source bytes the tree was parsed from.

    Returns:
        The function name, or ``""`` when none can be determined. For a
        scoped name such as ``Foo::bar`` only the last component is returned.

    The name is found by following the declarator chain (pointer/reference/
    function declarators) down to the name node. The parameter list is never
    searched, so a parameter name cannot be mistaken for the function name.
    """
    declarator = func_node.child_by_field_name('declarator')
    if declarator is None:
        # Fallback for trees that do not expose the field: scan the direct
        # children for a function declarator or a declaration wrapping one.
        for child in func_node.children:
            if child.type == 'function_declarator':
                declarator = child
                break
            if child.type == 'declaration':
                declarator = find_declarator(child)
                if declarator:
                    break
    if declarator is None:
        return ""

    node = declarator
    while node is not None:
        if node.type in _NAME_NODE_TYPES:
            return _node_text(node, code_bytes)
        if node.type in _SCOPED_NAME_TYPES:
            node = node.child_by_field_name('name')
            continue
        node = _descend_declarator(node)

    # Last resort: first plain identifier below the declarator, skipping the
    # parameter list.
    pending = [declarator]
    while pending:
        node = pending.pop()
        if node.type == 'identifier':
            return _node_text(node, code_bytes)
        if node.type == 'parameter_list':
            continue
        pending.extend(reversed(node.children))
    return ""


def load_review_rules(excel_path: str):
    """Load the enabled review rules from an Excel sheet.

    The sheet is read without a header row. Column 0 holds the rule id
    (optional), column 1 the rule description and column 2 the enable flag;
    only rows whose flag is exactly "是" and whose description is non-empty
    are returned.

    Returns:
        A list of ``{"id": str, "description": str}`` dicts. A row without
        an id gets ``"R<row number>"``.
    """
    df = pd.read_excel(excel_path, header=None, engine='openpyxl')

    rules = []
    if df.shape[1] < 3:
        # Not enough columns to hold description and enable flag.
        return rules

    for idx, row in df.iterrows():
        desc = str(row[1]).strip() if pd.notna(row[1]) else ""
        enabled = str(row[2]).strip() if pd.notna(row[2]) else ""
        if not desc or enabled != "是":
            continue

        raw_id = row[0]
        if pd.isna(raw_id):
            rule_id = f"R{idx + 1}"
        elif isinstance(raw_id, float) and raw_id.is_integer():
            # A numeric id column containing blanks is read as float;
            # render 1.0 as "1" rather than "1.0".
            rule_id = str(int(raw_id))
        else:
            rule_id = str(raw_id).strip()

        rules.append({'id': rule_id, 'description': desc})
    return rules


def save_results_to_excel(results, output_path):
    """Write the analysis rows to an Excel workbook.

    Args:
        results: list of dicts with the keys "文件名", "函数名", "规则名",
            "是否缺陷" and "解释规则" (one dict per function/rule pair).
        output_path: path of the workbook to create, e.g. "audit_report.xlsx".
    """
    df = pd.DataFrame(results, columns=[
        "文件名", "函数名", "规则名", "是否缺陷", "解释规则"
    ])

    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        df.to_excel(writer, index=False, sheet_name='静态分析结果')
        worksheet = writer.sheets['静态分析结果']
        # Size each column to its longest cell, capped at 80 characters.
        for column in worksheet.columns:
            column_letter = column[0].column_letter
            max_length = max((len(str(cell.value)) for cell in column), default=0)
            worksheet.column_dimensions[column_letter].width = min(max_length + 2, 80)
    print(f"分析结果已保存至: {output_path}")


def split_long_function(func_code: str, max_lines: int = 1600) -> list[str]:
    """Split a function's source into chunks that fit into one request.

    A function that has at most ``max_lines`` lines and at most
    ``MAX_CHAR_LENGTH`` characters is returned as a single chunk. Otherwise
    the text is cut on line boundaries:

    * once a chunk has reached ``max_lines`` lines it is closed at the next
      line where the running brace depth is at most 1, i.e. back at the
      top level of the function body rather than inside a nested block;
    * a chunk is closed unconditionally before it would exceed
      ``MAX_CHAR_LENGTH`` characters or when it reaches ``2 * max_lines``
      lines, so a split always happens even if no such boundary is found.

    Brace depth is tracked by counting ``{`` and ``}`` per line, which is a
    heuristic (braces in strings or comments are counted too).

    Returns:
        The chunks in order; joining them reproduces ``func_code`` exactly.
    """
    max_lines = max(1, max_lines)
    lines = func_code.splitlines(keepends=True)
    if len(lines) <= max_lines and len(func_code) <= MAX_CHAR_LENGTH:
        return [func_code]

    hard_line_limit = max_lines * 2
    chunks = []
    current = []
    current_chars = 0
    brace_depth = 0  # absolute depth within the function, never reset

    for line in lines:
        # Close the running chunk first if this line would break the
        # character budget (a single over-long line still gets its own chunk).
        if current and current_chars + len(line) > MAX_CHAR_LENGTH:
            chunks.append(''.join(current))
            current = []
            current_chars = 0

        current.append(line)
        current_chars += len(line)
        brace_depth += line.count('{') - line.count('}')

        at_body_level = brace_depth <= 1
        if (len(current) >= max_lines and at_body_level) or len(current) >= hard_line_limit:
            chunks.append(''.join(current))
            current = []
            current_chars = 0

    if current:
        chunks.append(''.join(current))
    return chunks


# Module-level client kept for backward compatibility. The credential comes
# from the environment; an API key must never be committed to source control.
client = OpenAI(
    base_url=os.getenv("QWEN_API_BASE", _DEFAULT_API_BASE),
    api_key=os.getenv("QWEN_API_KEY", ""),
)

# The code chunk is wrapped in a ```c fence; previously a stray "c" line
# preceded the raw code because the fence markers were missing.
RULE_BASED_PROMPT = """你是一名 C 语言安全审计专家。请根据以下特定编码或安全规则,严格审查提供的函数代码片段:

【审查规则】
{rule_description}

【代码上下文】
- 当前为函数 "{function_name}" 的第 {chunk_index}/{total_chunks} 片段
- 若函数被分片,请综合所有片段判断;若仅提供一片,则基于此片判断

【审查要求】
- 如果此代码片段**明确违反**上述规则,请指出:违规位置、风险说明、修复建议;
- 如果**未发现违规**,请回答:"本片段未发现违反该规则的问题";
- 如果因**代码不完整无法判断**(如只看到函数开头),请回答:"需结合完整函数判断";
- 回答必须简洁、基于事实,用中文,不超过50字。

【回答要求】
-请严格按以下 JSON 格式回答,不要包含任何额外文本、注释或 Markdown:
{{
"has_issue": true 或 false,
"explanation": "简明解释为何存在或不存在该问题"
}}

【函数代码片段】
```c
{code_chunk}
```
"""


def analyze_function_chunk_against_rule(
        client,
        func_name: str,
        code_chunk: str,
        rule_desc: str,
        chunk_index: int,
        total_chunks: int,
        model: str = "qwen-max"
) -> str:
    """Ask the model whether one code chunk violates one rule.

    Args:
        client: OpenAI-compatible client used for the request.
        func_name: name of the function the chunk belongs to.
        code_chunk: the source text to review.
        rule_desc: description of the rule to check.
        chunk_index: 1-based index of this chunk.
        total_chunks: number of chunks the function was split into.
        model: model name passed to the endpoint.

    Returns:
        The stripped text of the first choice, or a string starting with
        ``"[API Error]"`` when the request raised. This function never raises.
    """
    prompt = RULE_BASED_PROMPT.format(
        rule_description=rule_desc,
        function_name=func_name,
        chunk_index=chunk_index,
        total_chunks=total_chunks,
        code_chunk=code_chunk
    )

    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=500
        )
        # content may be None; treat that as an empty answer.
        return (response.choices[0].message.content or "").strip()
    except Exception as e:
        return f"[API Error] {str(e)}"


def _parse_llm_verdict(raw_response: str) -> Optional[dict]:
    """Parse the model's reply into a dict, or return ``None`` on failure.

    The whole reply is tried first; if that is not valid JSON, the span from
    the first ``{`` to the last ``}`` is tried, which covers replies wrapped
    in a Markdown code fence or surrounded by extra text. Only a JSON object
    is accepted.
    """
    text = raw_response.strip()
    candidates = [text]
    start, end = text.find('{'), text.rfind('}')
    if 0 <= start < end:
        candidates.append(text[start:end + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


def _coerce_flag(value) -> bool:
    """Convert the model's ``has_issue`` value to a bool.

    Strings are compared against known truthy spellings so that ``"false"``
    is not treated as true (``bool("false")`` is ``True``).
    """
    if isinstance(value, str):
        return value.strip().lower() in ('true', '1', 'yes', '是')
    return bool(value)


def analyze_function_against_rules(func_info: dict, rules: list, client, model: str = "qwen-max") -> dict:
    """Check one function against every rule.

    The function source is split with ``split_long_function``; every chunk is
    sent to the model once per rule and the per-chunk verdicts are merged: the
    function has an issue if any chunk has one, and the explanations are
    joined with newlines (prefixed with the chunk number when there is more
    than one chunk).

    A reply that cannot be parsed as a JSON object (including an
    ``[API Error]`` string) is recorded as ``has_issue = False`` with the
    first 200 characters of the raw reply in the explanation.

    Args:
        func_info: dict with at least the keys ``name`` and ``code``.
        rules: list of dicts with the keys ``id`` and ``description``.
        client: OpenAI-compatible client used for the requests.
        model: model name passed to the endpoint.

    Returns:
        ``{rule_id: {"rule_description": str, "has_issue": bool,
        "analysis": str}}``.
    """
    func_name = func_info['name']
    chunks = split_long_function(func_info['code'])
    total = len(chunks)

    rule_results = {}
    for rule in rules:
        rule_id = rule['id']
        rule_desc = rule['description']

        chunk_results = []
        for index, chunk in enumerate(chunks, start=1):
            raw_response = analyze_function_chunk_against_rule(
                client, func_name, chunk, rule_desc,
                chunk_index=index, total_chunks=total, model=model
            )
            # Only label explanations with the chunk number when split.
            prefix = f"【片段{index}】" if total > 1 else ""

            verdict = _parse_llm_verdict(raw_response)
            if verdict is None:
                chunk_results.append({
                    'has_issue': False,
                    'explanation': f"{prefix}[解析失败] 原始响应: {raw_response[:200]}..."
                })
            else:
                chunk_results.append({
                    'has_issue': _coerce_flag(verdict.get('has_issue', False)),
                    'explanation': f"{prefix}{verdict.get('explanation', '')}"
                })

        rule_results[rule_id] = {
            'rule_description': rule_desc,
            'has_issue': any(item['has_issue'] for item in chunk_results),
            'analysis': "\n".join(item['explanation'] for item in chunk_results),
        }

    return rule_results


def main(project_path: str, rules_excel: str, output_report: str = "rule_based_report.md"):
    """Run the whole review and write the Excel and Markdown reports.

    Args:
        project_path: root directory of the C/C++ project to scan.
        rules_excel: path of the Excel sheet read by ``load_review_rules``.
        output_report: path of the Markdown report. The Excel report is
            written next to it with the extension replaced by ``.xlsx``.

    The endpoint is taken from ``QWEN_API_BASE`` and the credential from
    ``QWEN_API_KEY``. Returns early (after printing a message) when no source
    file or no function is found.
    """
    print("正在加载评审规则...")
    rules = load_review_rules(rules_excel)
    print(f"共加载 {len(rules)} 条启用的规则")

    print("正在读取 GBK 编码源文件...")
    print(f"正在扫描项目文件夹: {project_path}")
    c_files = find_c_files(project_path)
    print(f"找到 {len(c_files)} 个 C/C++ 文件")

    if not c_files:
        print("未找到任何 C/C++ 文件,请检查项目路径")
        return

    all_functions = []
    for file_path in c_files:
        print(f"正在处理文件: {file_path}")
        functions = extract_functions_from_file(file_path)
        all_functions.extend(functions)
        print(f" 找到 {len(functions)} 个函数")

    print(f"共找到 {len(all_functions)} 个函数")

    if not all_functions:
        print("未找到任何函数,分析结束")
        return

    # The credential is read from the environment only. An empty key is
    # still passed through (with a warning) so endpoints that need no key
    # keep working.
    api_key = os.getenv("QWEN_API_KEY", "")
    if not api_key:
        print("警告: 未设置 QWEN_API_KEY 环境变量,API 调用可能失败")
    client = OpenAI(
        base_url=os.getenv("QWEN_API_BASE", _DEFAULT_API_BASE),
        api_key=api_key
    )

    all_results = []
    for i, func in enumerate(all_functions, 1):
        print(f"\n分析函数 {i}/{len(all_functions)}: {func['name']} (来自 {func['file_name']})")
        rule_results = analyze_function_against_rules(func, rules, client)
        print(rule_results)
        all_results.append({
            'function': func,
            'rule_results': rule_results
        })

    # Excel report: one row per (function, rule) pair.
    excel_rows = []
    for item in all_results:
        func_info = item['function']
        for rule_id, res in item['rule_results'].items():
            excel_rows.append({
                "文件名": func_info['file_name'],
                "函数名": func_info['name'],
                "规则名": f"[{rule_id}] {res['rule_description']}",
                "是否缺陷": "是" if res.get('has_issue', False) else "否",
                "解释规则": res['analysis']
            })

    # Swap only the final extension; str.replace would rewrite every ".md"
    # in the path and leave a non-.md name unchanged, in which case the
    # Markdown report would overwrite the workbook.
    excel_output_path = os.path.splitext(output_report)[0] + '.xlsx'
    if excel_output_path == output_report:
        excel_output_path = output_report + '.xlsx'
    save_results_to_excel(excel_rows, excel_output_path)

    # Markdown report, grouped by source file.
    with open(output_report, 'w', encoding='utf-8') as f:
        f.write("# 基于规则的 C 函数静态分析报告\n\n")
        f.write(f"**项目路径**: `{project_path}`\n\n")
        f.write(f"**规则来源**: `{rules_excel}`\n\n")
        f.write(f"**分析文件数**: {len(c_files)} 个\n\n")
        f.write(f"**共分析函数**: {len(all_functions)} 个\n\n")
        f.write(f"**启用规则数**: {len(rules)} 条\n\n---\n\n")

        files_dict = {}
        for item in all_results:
            files_dict.setdefault(item['function']['file_path'], []).append(item)

        for file_path, items in files_dict.items():
            f.write(f"## 文件: `{os.path.basename(file_path)}`\n\n")
            f.write(f"**完整路径**: {file_path}\n\n")

            for item in items:
                func = item['function']
                f.write(f"### 函数: `{func['name']}` (L{func['start_line']}-L{func['end_line']})\n\n")
                f.write("```c\n")
                # Only the first 1500 characters of the body are shown.
                snippet = func['code'][:1500] + ("..." if len(func['code']) > 1500 else "")
                f.write(snippet)
                f.write("\n```\n\n")

                for rule_id, res in item['rule_results'].items():
                    f.write(f"#### 规则 [{rule_id}] {res['rule_description']}\n\n")
                    f.write(f"{res['analysis']}\n\n")
                f.write("---\n\n")

    print("\n分析完成!")
    print(f"Excel 报告已保存至: {excel_output_path}")
    print(f"Markdown 报告已保存至: {output_report}")


if __name__ == "__main__":
    # Keep an endpoint already exported in the shell; only fill in the default.
    os.environ.setdefault("QWEN_API_BASE", _DEFAULT_API_BASE)
    # QWEN_API_KEY must be exported in the environment before running; it is
    # deliberately not set (or overwritten) here.

    main(
        project_path="",
        rules_excel="审查规则.xlsx",
        output_report="audit_report.md"
    )