# static_analyzer.py
"""Rule-driven static analysis of C/C++ functions.

The module walks a project tree, extracts every function definition with
Tree-sitter, asks an OpenAI-compatible chat model to check each function
against the enabled review rules loaded from an Excel sheet, and writes the
findings to an Excel workbook and a Markdown report.
"""
import json
import os
from pathlib import Path

import chardet
import pandas as pd
import tree_sitter
import tree_sitter_c
import tree_sitter_cpp
from openai import OpenAI
from tree_sitter import Language, Parser

# The host application's ``config`` module is optional: when it is missing,
# or lacks one of the names below, the corresponding value is simply None.
try:
    import config as _config
except ImportError:
    _config = None

QWEN_API_KEY = getattr(_config, 'QWEN_API_KEY', None)
# Both spellings of the endpoint setting are accepted.
QWEN_API_URL = getattr(_config, 'QWEN_API_URL', None)
QWEN_API_BASE = getattr(_config, 'QWEN_API_BASE', None)

# Tree-sitter initialisation (the C++ grammar is used for every source file).
CPP_LANGUAGE = Language(tree_sitter_cpp.language())
parser = Parser()
parser.language = CPP_LANGUAGE

# Constants
CPP_EXTENSIONS = {'.c', '.cpp', '.h', '.hpp', '.tcc'}
IGNORE_DIRS = {'build', 'cmake-build', '.git', 'vendor', 'lib', 'external', 'Debug'}
MAX_CHAR_LENGTH = 60000

# Endpoint used when no API base URL is configured.
DEFAULT_API_BASE = "https://dashscope.aliyuncs.com/compatible-mode/v1"

# chardet labels that are all decoded with the GBK codec.
_GBK_ALIASES = frozenset({'gbk', 'gb2312', 'cp936'})

# Node types that can carry the name of a function inside its declarator.
_NAME_NODE_TYPES = frozenset({
    'identifier', 'field_identifier', 'destructor_name', 'operator_name',
})

# Subtrees that never contain the function's own name (parameters, bodies,
# template arguments, ...); searching them would pick up unrelated
# identifiers such as parameter names.
_NON_NAME_SUBTREES = frozenset({
    'parameter_list', 'compound_statement', 'template_argument_list',
    'field_initializer_list', 'trailing_return_type', 'noexcept',
    'throw_specifier', 'requires_clause', 'attribute_declaration',
    'attribute_specifier',
})

# Prompt sent to the model for each (rule, code chunk) pair.
# NOTE(review): the code fence around {code_chunk} was reconstructed; the
# original text only had a bare "c" line there - confirm the intended layout.
_RULE_BASED_PROMPT = """你是一名 C 语言安全审计专家。请根据以下特定编码或安全规则,严格审查提供的函数代码片段:

【审查规则】
{rule_description}

【代码上下文】
- 当前为函数 "{function_name}" 的第 {chunk_index}/{total_chunks} 片段
- 若函数被分片,请综合所有片段判断;若仅提供一片,则基于此片判断

【审查要求】
- 如果此代码片段**明确违反**上述规则,请指出:违规位置、风险说明、修复建议;
- 如果**未发现违规**,请回答:"本片段未发现违反该规则的问题";
- 如果因**代码不完整无法判断**(如只看到函数开头),请回答:"需结合完整函数判断";
- 回答必须简洁、基于事实,用中文,不超过50字。

【回答要求】
-请严格按以下 JSON 格式回答,不要包含任何额外文本、注释或 Markdown:
{{
  "has_issue": true 或 false,
  "explanation": "简明解释为何存在或不存在该问题"
}}

【函数代码片段】
```c
{code_chunk}
```
"""


def _load_api_config():
    """Return ``(api_key, api_base)`` as configured by the ``config`` module.

    The module is looked up at call time; if it cannot be imported the
    module-level values captured at import time are used instead.
    """
    try:
        import config
    except ImportError:
        return QWEN_API_KEY, QWEN_API_URL or QWEN_API_BASE
    api_key = getattr(config, 'QWEN_API_KEY', None)
    api_base = (getattr(config, 'QWEN_API_URL', None)
                or getattr(config, 'QWEN_API_BASE', None))
    return api_key, api_base


def _decode_source(raw_data: bytes) -> str:
    """Decode the raw bytes of a source file into text.

    UTF-8 (with or without BOM) is tried first; otherwise the encoding
    guessed by chardet is used, with the GBK family mapped to the ``gbk``
    codec. As a last resort the bytes are decoded as UTF-8 with replacement
    characters so that the file can still be analysed.
    """
    try:
        return raw_data.decode('utf-8-sig')
    except UnicodeDecodeError:
        pass

    # chardet reports None when it cannot make a guess (e.g. empty input).
    guess = (chardet.detect(raw_data).get('encoding') or '').lower()
    if guess:
        codec = 'gbk' if guess in _GBK_ALIASES else guess
        try:
            return raw_data.decode(codec)
        except (UnicodeDecodeError, LookupError):
            pass
    return raw_data.decode('utf-8', errors='replace')


def _first_descendant(root, wanted_types, opaque_types=frozenset()):
    """Return the first node (pre-order) under *root* whose type is wanted.

    Subtrees rooted at a node whose type is in *opaque_types* are not
    searched. Returns None when nothing matches.
    """
    stack = [root]
    while stack:
        node = stack.pop()
        if node.type in wanted_types:
            return node
        if node.type not in opaque_types:
            # Reversed so that children are visited left to right.
            stack.extend(reversed(node.children))
    return None


def _parse_model_reply(raw_text) -> dict:
    """Parse the model's reply into a dict.

    The reply is expected to be a JSON object, but models frequently wrap it
    in a Markdown fence or add surrounding text; in that case the outermost
    ``{...}`` span is parsed instead.

    Raises:
        ValueError: if no JSON object can be extracted
            (``json.JSONDecodeError`` is a subclass of ``ValueError``).
    """
    text = (raw_text or "").strip()
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        start, end = text.find('{'), text.rfind('}')
        if start == -1 or end <= start:
            raise
        parsed = json.loads(text[start:end + 1])
    if not isinstance(parsed, dict):
        raise ValueError(f"unexpected reply type: {type(parsed).__name__}")
    return parsed


def _as_flag(value) -> bool:
    """Interpret the model's ``has_issue`` value as a boolean.

    Strings are compared against common affirmative spellings so that the
    text "false" is not treated as truthy.
    """
    if isinstance(value, str):
        return value.strip().lower() in ('true', 'yes', '1', '是')
    return bool(value)


def _format_rule_id(value) -> str:
    """Render a rule id cell as text, without a spurious ``.0`` suffix."""
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value).strip()


class StaticAnalyzer:
    """Static analyzer bundling source parsing and rule-based review."""

    def __init__(self, api_key=None, api_base=None):
        """Create the analyzer and its OpenAI-compatible client.

        Args:
            api_key: API key; falls back to ``config.QWEN_API_KEY``.
            api_base: API base URL; falls back to ``config.QWEN_API_URL`` /
                ``config.QWEN_API_BASE`` and finally to ``DEFAULT_API_BASE``.
        """
        config_api_key, config_api_base = _load_api_config()

        # Explicit arguments take precedence over the config module.
        actual_api_key = api_key or config_api_key
        actual_api_base = api_base or config_api_base

        if not actual_api_key:
            print("警告: API Key 未配置")
            print("请在 config.py 中设置 QWEN_API_KEY 或传入 api_key 参数")

        if not actual_api_base:
            print("警告: API Base URL 未配置")
            print("请在 config.py 中设置 QWEN_API_URL(或 QWEN_API_BASE)或传入 api_base 参数")
            actual_api_base = DEFAULT_API_BASE

        # Guard against a config attribute that is a function, not a string.
        if callable(actual_api_base):
            print("警告: QWEN_API_BASE 是一个函数而不是字符串,使用默认值")
            actual_api_base = DEFAULT_API_BASE

        print("API 配置信息:")
        print(f" - API Key: {'已设置' if actual_api_key else '未设置'}")
        print(f" - API Base URL: {actual_api_base}")

        self.client = OpenAI(
            base_url=actual_api_base,
            api_key=actual_api_key
        )

    def find_c_files(self, project_path):
        """Recursively collect the C/C++ files below *project_path*.

        Directories named in ``IGNORE_DIRS`` are pruned. Directories and
        files are visited in sorted order so that reports are reproducible.

        Returns:
            A list of file paths (joined onto *project_path*).
        """
        suffixes = tuple(CPP_EXTENSIONS)
        c_files = []
        for root, dirs, files in os.walk(project_path):
            # In-place assignment is what makes os.walk skip the pruned dirs.
            dirs[:] = sorted(d for d in dirs if d not in IGNORE_DIRS)
            c_files.extend(
                os.path.join(root, file)
                for file in sorted(files)
                if file.endswith(suffixes)
            )
        return c_files

    def extract_functions_from_file(self, file_path):
        """Extract every function definition from one source file.

        Each returned dict has the keys produced by ``_extract_functions``
        plus ``file_path`` and ``file_name``. Any failure is reported on
        stdout and yields an empty list so that one bad file does not abort
        the whole run.
        """
        try:
            with open(file_path, 'rb') as f:
                raw_data = f.read()

            # Normalise to UTF-8 so byte offsets reported by Tree-sitter can
            # be decoded consistently.
            code_bytes = _decode_source(raw_data).encode('utf-8')
            functions = self._extract_functions(code_bytes)
            file_name = os.path.basename(file_path)
            for func in functions:
                func['file_path'] = file_path
                func['file_name'] = file_name
            return functions
        except Exception as e:
            print(f"处理文件 {file_path} 时出错: {e}")
            return []

    def _extract_functions(self, code_bytes: bytes):
        """Parse *code_bytes* with Tree-sitter and list function definitions.

        Returns:
            A list of dicts with ``name``, ``code``, ``start_line`` and
            ``end_line`` (1-based), in source order. Definitions nested
            inside another function definition are not reported separately.
        """
        tree = parser.parse(code_bytes)
        functions = []

        # Explicit stack instead of recursion: deeply nested syntax trees
        # must not hit Python's recursion limit.
        stack = [tree.root_node]
        while stack:
            node = stack.pop()
            if node.type == 'function_definition':
                functions.append({
                    'name': self._get_function_name(node, code_bytes),
                    'code': code_bytes[node.start_byte:node.end_byte].decode('utf-8'),
                    'start_line': node.start_point[0] + 1,
                    'end_line': node.end_point[0] + 1
                })
            else:
                stack.extend(reversed(node.children))
        return functions

    def _get_function_name(self, func_node, code_bytes: bytes):
        """Return the (unqualified) name of a ``function_definition`` node.

        The function's declarator chain is searched for the
        ``function_declarator``, and the name is taken from it without ever
        descending into the parameter list, so pointer-returning functions
        and in-class methods resolve to their own name rather than to a
        parameter. Returns an empty string when no name can be found.
        """
        declarator = func_node.child_by_field_name('declarator')
        if declarator is None:
            declarator = func_node

        func_decl = _first_descendant(
            declarator, ('function_declarator',), _NON_NAME_SUBTREES)
        if func_decl is None:
            return ""

        name_node = _first_descendant(
            func_decl, _NAME_NODE_TYPES, _NON_NAME_SUBTREES)
        if name_node is None:
            return ""
        return code_bytes[name_node.start_byte:name_node.end_byte].decode('utf-8')

    def load_review_rules(self, excel_path: str):
        """Load the enabled review rules from an Excel sheet.

        The sheet is read without a header row; column 0 is the rule id,
        column 1 the description and column 2 the enabled flag. Only rows
        with a non-empty description and the flag "是" are kept. A missing
        id is replaced by ``R<row number>``.

        Returns:
            A list of ``{'id': ..., 'description': ...}`` dicts.
        """
        df = pd.read_excel(excel_path, header=None, engine='openpyxl')
        if df.shape[1] < 3:
            return []

        rules = []
        for idx, row in df.iterrows():
            raw_id, raw_desc, raw_enabled = row.iloc[0], row.iloc[1], row.iloc[2]
            desc = str(raw_desc).strip() if pd.notna(raw_desc) else ""
            enabled = str(raw_enabled).strip() if pd.notna(raw_enabled) else ""
            if desc and enabled == "是":
                rule_id = _format_rule_id(raw_id) if pd.notna(raw_id) else f"R{idx + 1}"
                rules.append({'id': rule_id, 'description': desc})
        return rules

    def split_long_function(self, func_code: str, max_lines: int = 1600):
        """Split the text of a long function into consecutive chunks.

        A function of at most *max_lines* lines is returned as a single
        chunk. Otherwise a chunk is closed once it holds *max_lines* lines
        and the running brace depth is back at the top level of the function
        body (depth <= 1), or unconditionally once it holds twice
        *max_lines* lines. Concatenating the chunks reproduces *func_code*.

        Braces are counted textually, so braces inside string literals or
        comments can shift the preferred split points (never the content).
        """
        lines = func_code.splitlines(keepends=True)
        if len(lines) <= max_lines:
            return [func_code]

        soft_limit = max(1, max_lines)
        hard_limit = soft_limit * 2
        chunks = []
        current = []
        brace_depth = 0
        for line in lines:
            current.append(line)
            brace_depth += line.count('{') - line.count('}')
            at_boundary = len(current) >= soft_limit and brace_depth <= 1
            if at_boundary or len(current) >= hard_limit:
                chunks.append(''.join(current))
                current = []
        if current:
            chunks.append(''.join(current))
        return chunks

    def _review_chunk(self, prompt: str, label: str, model: str):
        """Send one prompt to the model and normalise its verdict.

        Returns:
            ``{'has_issue': bool, 'explanation': str}``. Any failure (API
            error, empty or unparsable reply) is reported in the explanation
            with ``has_issue`` set to False.
        """
        try:
            response = self.client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.1,
                max_tokens=500
            )
            res = _parse_model_reply(response.choices[0].message.content)
            return {
                'has_issue': _as_flag(res.get('has_issue', False)),
                'explanation': f"{label}{res.get('explanation', '')}"
            }
        except Exception as e:
            return {
                'has_issue': False,
                'explanation': f"{label}[分析错误] {str(e)[:200]}"
            }

    def analyze_function_against_rules(self, func_info: dict, rules: list, model: str = "qwen-max"):
        """Check one function against every enabled rule.

        Args:
            func_info: dict with at least ``name`` and ``code``.
            rules: list of ``{'id', 'description'}`` dicts.
            model: chat model name passed to the API.

        Returns:
            A dict keyed by rule id; each value holds ``rule_description``,
            ``has_issue`` (True if any chunk was flagged) and ``analysis``
            (the per-chunk explanations joined by newlines).
        """
        func_name = func_info['name']
        chunks = self.split_long_function(func_info['code'])
        total = len(chunks)
        rule_results = {}

        for rule in rules:
            rule_desc = rule['description']
            chunk_results = []
            for i, chunk in enumerate(chunks, 1):
                prompt = _RULE_BASED_PROMPT.format(
                    rule_description=rule_desc,
                    function_name=func_name,
                    chunk_index=i,
                    total_chunks=total,
                    code_chunk=chunk
                )
                chunk_results.append(
                    self._review_chunk(prompt, f"【片段{i}】", model))

            rule_results[rule['id']] = {
                'rule_description': rule_desc,
                'has_issue': any(item['has_issue'] for item in chunk_results),
                'analysis': "\n".join(item['explanation'] for item in chunk_results)
            }
        return rule_results

    def save_results_to_excel(self, results, output_path):
        """Write the analysis rows to an Excel workbook.

        Args:
            results: list of dicts keyed by the five column names below.
            output_path: destination ``.xlsx`` path.
        """
        df = pd.DataFrame(results, columns=[
            "文件名", "函数名", "规则名", "是否缺陷", "解释规则"
        ])
        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            df.to_excel(writer, index=False, sheet_name='静态分析结果')
            worksheet = writer.sheets['静态分析结果']
            # Size each column to its longest cell, capped at 80 characters.
            for column in worksheet.columns:
                max_length = max((len(str(cell.value)) for cell in column), default=0)
                column_letter = column[0].column_letter
                worksheet.column_dimensions[column_letter].width = min(max_length + 2, 80)
        print(f"分析结果已保存至: {output_path}")


def _write_markdown_report(md_output_path, project_path, rules_excel,
                           c_files, all_functions, rules, all_results):
    """Write the Markdown report: a summary header, then functions by file."""
    # Group results by source file, keeping first-seen order.
    files_dict = {}
    for item in all_results:
        files_dict.setdefault(item['function']['file_path'], []).append(item)

    with open(md_output_path, 'w', encoding='utf-8') as f:
        f.write("# 基于规则的 C 函数静态分析报告\n\n")
        f.write(f"**项目路径**: `{project_path}`\n\n")
        f.write(f"**规则来源**: `{rules_excel}`\n\n")
        f.write(f"**分析文件数**: {len(c_files)} 个\n\n")
        f.write(f"**共分析函数**: {len(all_functions)} 个\n\n")
        f.write(f"**启用规则数**: {len(rules)} 条\n\n---\n\n")

        for file_path, items in files_dict.items():
            f.write(f"## 文件: `{os.path.basename(file_path)}`\n\n")
            f.write(f"**完整路径**: {file_path}\n\n")
            for item in items:
                func = item['function']
                f.write(f"### 函数: `{func['name']}` (L{func['start_line']}-L{func['end_line']})\n\n")
                f.write("```c\n")
                # Only the first 1500 characters of the function are shown.
                snippet = func['code'][:1500] + ("..." if len(func['code']) > 1500 else "")
                f.write(snippet)
                f.write("\n```\n\n")
                for rule_id, res in item['rule_results'].items():
                    f.write(f"#### 规则 [{rule_id}] {res['rule_description']}\n\n")
                    f.write(f"{res['analysis']}\n\n")
                f.write("---\n\n")


def run_static_analysis(project_path: str, rules_excel: str, output_report: str = "audit_report"):
    """Run the whole static-analysis pipeline.

    This is the entry point to call from the main program.

    Args:
        project_path: root directory of the C/C++ project to scan.
        rules_excel: path of the Excel sheet holding the review rules.
        output_report: output path without extension; ``.xlsx`` and ``.md``
            are appended for the two reports.

    Returns:
        False when no C/C++ file is found, True once both reports are
        written.
    """
    print("\n" + "=" * 60)
    print("启动静态分析")
    print("=" * 60)

    analyzer = StaticAnalyzer()

    print("正在加载评审规则...")
    rules = analyzer.load_review_rules(rules_excel)
    print(f"共加载 {len(rules)} 条启用的规则")

    print(f"正在扫描项目文件夹: {project_path}")
    c_files = analyzer.find_c_files(project_path)
    print(f"找到 {len(c_files)} 个 C/C++ 文件")
    if not c_files:
        print("未找到任何 C/C++ 文件,请检查项目路径")
        return False

    all_functions = []
    for file_path in c_files:
        print(f"正在处理文件: {file_path}")
        functions = analyzer.extract_functions_from_file(file_path)
        all_functions.extend(functions)
        print(f" 找到 {len(functions)} 个函数")
    print(f"共找到 {len(all_functions)} 个函数")

    all_results = []
    for i, func in enumerate(all_functions, 1):
        print(f"\n分析函数 {i}/{len(all_functions)}: {func['name']} (来自 {func['file_name']})")
        rule_results = analyzer.analyze_function_against_rules(func, rules)
        all_results.append({
            'function': func,
            'rule_results': rule_results
        })

    # One spreadsheet row per (function, rule) pair.
    excel_rows = [
        {
            "文件名": item['function']['file_name'],
            "函数名": item['function']['name'],
            "规则名": f"[{rule_id}] {res['rule_description']}",
            "是否缺陷": "是" if res.get('has_issue', False) else "否",
            "解释规则": res['analysis']
        }
        for item in all_results
        for rule_id, res in item['rule_results'].items()
    ]

    # Make sure the destination directory exists before writing reports.
    Path(output_report).parent.mkdir(parents=True, exist_ok=True)

    excel_output_path = f"{output_report}.xlsx"
    analyzer.save_results_to_excel(excel_rows, excel_output_path)

    md_output_path = f"{output_report}.md"
    _write_markdown_report(md_output_path, project_path, rules_excel,
                           c_files, all_functions, rules, all_results)

    print("\n分析完成!")
    print(f"Excel 报告已保存至: {excel_output_path}")
    print(f"Markdown 报告已保存至: {md_output_path}")
    return True