Files
rag_agent/RAG-TEST-TOOLS/static_analyzer.py

437 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# static_analyzer.py
import json
import os
from pathlib import Path

import chardet
import pandas as pd
import tree_sitter
import tree_sitter_c
import tree_sitter_cpp
from openai import OpenAI
from tree_sitter import Language, Parser

# The API settings live in the host application's ``config`` module.  It is
# imported defensively so this module can still be imported (and the analyzer
# constructed with explicit arguments) when ``config`` is missing, and each
# name is resolved on its own so that one missing attribute does not wipe out
# the others.
try:
    import config as _config
except ImportError:
    _config = None

QWEN_API_KEY = getattr(_config, 'QWEN_API_KEY', None)
QWEN_API_URL = getattr(_config, 'QWEN_API_URL', None)
# Both spellings of the endpoint setting appear in this project; keep them in
# sync so either one can be used.
QWEN_API_BASE = getattr(_config, 'QWEN_API_BASE', None) or QWEN_API_URL

# Initialise Tree-sitter once for the whole module (C++ grammar).
CPP_LANGUAGE = Language(tree_sitter_cpp.language())
parser = Parser()
parser.language = CPP_LANGUAGE

# Constants.
CPP_EXTENSIONS = {'.c', '.cpp', '.h', '.hpp', '.tcc'}
IGNORE_DIRS = {'build', 'cmake-build', '.git', 'vendor', 'lib', 'external', 'Debug'}
MAX_CHAR_LENGTH = 60000
class StaticAnalyzer:
    """Static analyzer that wraps C/C++ parsing and rule-based LLM review.

    The analyzer finds C/C++ files, extracts function definitions with
    Tree-sitter, asks a chat-completion model to check every function against
    every review rule, and can write the collected rows to an Excel sheet.
    """

    # Endpoint used when no base URL is configured anywhere.
    DEFAULT_API_BASE = "https://dashscope.aliyuncs.com/compatible-mode/v1"

    # Prompt sent for every (rule, chunk) pair.  The code is wrapped in a
    # Markdown fence; the earlier text had a bare "c" line where the opening
    # fence belongs, which the model would read as part of the code.
    RULE_BASED_PROMPT = """你是一名 C 语言安全审计专家。请根据以下特定编码或安全规则,严格审查提供的函数代码片段:
【审查规则】
{rule_description}
【代码上下文】
- 当前为函数 "{function_name}" 的第 {chunk_index}/{total_chunks} 片段
- 若函数被分片,请综合所有片段判断;若仅提供一片,则基于此片判断
【审查要求】
- 如果此代码片段**明确违反**上述规则,请指出:违规位置、风险说明、修复建议;
- 如果**未发现违规**,请回答:"本片段未发现违反该规则的问题"
- 如果因**代码不完整无法判断**(如只看到函数开头),请回答:"需结合完整函数判断"
- 回答必须简洁、基于事实用中文不超过50字。
【回答要求】
-请严格按以下 JSON 格式回答,不要包含任何额外文本、注释或 Markdown
{{
"has_issue": true 或 false,
"explanation": "简明解释为何存在或不存在该问题"
}}
【函数代码片段】
```c
{code_chunk}
```
"""

    def __init__(self, api_key=None, api_base=None):
        """Create the OpenAI-compatible client used for rule checks.

        Args:
            api_key: API key; when falsy the value from ``config`` is used.
            api_base: Base URL; when falsy the value from ``config`` is used,
                and finally ``DEFAULT_API_BASE``.
        """
        try:
            import config
        except ImportError:
            # No config module: fall back to whatever the module header
            # managed to define (possibly nothing).
            config_api_key = globals().get('QWEN_API_KEY')
            config_api_base = globals().get('QWEN_API_BASE')
        else:
            config_api_key = getattr(config, 'QWEN_API_KEY', None)
            # The warning below tells users to set QWEN_API_BASE, so honour
            # that name as well as QWEN_API_URL.
            config_api_base = (getattr(config, 'QWEN_API_URL', None)
                               or getattr(config, 'QWEN_API_BASE', None))

        # Explicit arguments win over configuration.
        actual_api_key = api_key or config_api_key
        actual_api_base = api_base or config_api_base

        if not actual_api_key:
            print("警告: API Key 未配置")
            print("请在 config.py 中设置 QWEN_API_KEY 或传入 api_key 参数")
        if not actual_api_base:
            print("警告: API Base URL 未配置")
            print("请在 config.py 中设置 QWEN_API_BASE 或传入 api_base 参数")
            actual_api_base = self.DEFAULT_API_BASE
        # Guard against a config that exposes a function under this name.
        if callable(actual_api_base):
            print("警告: QWEN_API_BASE 是一个函数而不是字符串,使用默认值")
            actual_api_base = self.DEFAULT_API_BASE

        print("API 配置信息:")
        print(f" - API Key: {'已设置' if actual_api_key else '未设置'}")
        print(f" - API Base URL: {actual_api_base}")

        self.client = OpenAI(
            base_url=actual_api_base,
            api_key=actual_api_key
        )

    def find_c_files(self, project_path):
        """Recursively collect all C/C++ files below ``project_path``.

        Directories named in ``IGNORE_DIRS`` are pruned from the walk.

        Returns:
            A list of file paths whose names end with one of
            ``CPP_EXTENSIONS``.
        """
        suffixes = tuple(CPP_EXTENSIONS)
        c_files = []
        for root, dirs, files in os.walk(project_path):
            # In-place assignment is what makes os.walk skip these subtrees.
            dirs[:] = [d for d in dirs if d not in IGNORE_DIRS]
            c_files.extend(
                os.path.join(root, name) for name in files if name.endswith(suffixes)
            )
        return c_files

    @staticmethod
    def _decode_source(raw_data):
        """Decode raw file bytes to text, tolerating GBK-family encodings.

        UTF-8 is tried first (``utf-8-sig`` also drops a leading BOM).  Only
        when that fails is ``chardet`` consulted; its guess may be ``None``,
        so the last resort is UTF-8 with replacement characters rather than
        losing the whole file.
        """
        if not raw_data:
            return ""
        try:
            return raw_data.decode('utf-8-sig')
        except UnicodeDecodeError:
            pass

        detected = (chardet.detect(raw_data).get('encoding') or '').lower()
        if detected in ('gbk', 'gb2312', 'cp936', 'gb18030'):
            candidates = ['gbk', 'gb18030']
        elif detected:
            candidates = [detected, 'gb18030']
        else:
            candidates = ['gb18030']
        for encoding in candidates:
            try:
                return raw_data.decode(encoding)
            except (UnicodeDecodeError, LookupError):
                continue
        return raw_data.decode('utf-8', errors='replace')

    def extract_functions_from_file(self, file_path):
        """Extract every function definition from one file.

        Each returned dict carries the keys produced by
        ``_extract_functions`` plus ``file_path`` and ``file_name``.  Any
        failure is reported on stdout and yields an empty list so that one
        bad file does not abort a whole scan.
        """
        try:
            with open(file_path, 'rb') as f:
                raw_data = f.read()
            # Normalise to UTF-8 bytes so node byte offsets index this buffer.
            code_bytes = self._decode_source(raw_data).encode('utf-8')
            functions = self._extract_functions(code_bytes)
            base_name = os.path.basename(file_path)
            for func in functions:
                func['file_path'] = file_path
                func['file_name'] = base_name
            return functions
        except Exception as e:
            print(f"处理文件 {file_path} 时出错: {e}")
            return []

    def _extract_functions(self, code_bytes: bytes):
        """Parse ``code_bytes`` with Tree-sitter and list function definitions.

        Returns:
            A list of dicts with ``name``, ``code``, ``start_line`` and
            ``end_line`` (1-based), in source order.  Definitions nested
            inside another function definition are not reported separately.
        """
        tree = parser.parse(code_bytes)
        functions = []
        # Explicit stack instead of recursion so deep syntax trees cannot hit
        # the interpreter's recursion limit; children are pushed reversed to
        # keep source order.
        pending = [tree.root_node]
        while pending:
            node = pending.pop()
            if node.type != 'function_definition':
                pending.extend(reversed(node.children))
                continue
            functions.append({
                'name': self._get_function_name(node, code_bytes),
                'code': code_bytes[node.start_byte:node.end_byte].decode('utf-8', errors='replace'),
                'start_line': node.start_point[0] + 1,
                'end_line': node.end_point[0] + 1
            })
        return functions

    def _get_function_name(self, func_node, code_bytes: bytes):
        """Return the declared name of a ``function_definition`` node.

        The declarator chain is followed through pointer, reference,
        parenthesised and function declarators down to the node that names
        the function, so ``char *f()``, in-class methods, destructors and
        operators are resolved too.  Qualified names are returned in full
        (for example ``Foo::bar``).

        Returns:
            The name text, ``"<unknown>"`` when the definition has no
            function declarator, or ``"<unnamed>"`` when the declarator has
            no name node.
        """
        node = func_node.child_by_field_name('declarator')
        saw_function_declarator = False
        while node is not None and node.type.endswith('declarator'):
            if node.type == 'function_declarator':
                saw_function_declarator = True
            inner = node.child_by_field_name('declarator')
            if inner is None:
                # Some wrapper declarators hold their inner declarator as a
                # plain (unnamed-field) child.
                inner = next(
                    (child for child in node.named_children
                     if child.type.endswith('declarator')
                     or child.type.endswith('identifier')),
                    None,
                )
            node = inner

        if node is None:
            return "<unnamed>" if saw_function_declarator else "<unknown>"
        name_bytes = code_bytes[node.start_byte:node.end_byte]
        return name_bytes.decode('utf-8', errors='replace')

    def load_review_rules(self, excel_path: str):
        """Load the enabled review rules from an Excel sheet without header.

        Column 0 is the rule id (``R<row number>`` when blank), column 1 the
        description and column 2 the enable marker.

        Returns:
            A list of ``{'id': ..., 'description': ...}`` dicts.
        """
        df = pd.read_excel(excel_path, header=None, engine='openpyxl')
        rules = []
        for idx, row in df.iterrows():
            if len(row) < 3:
                continue
            desc = str(row[1]).strip() if pd.notna(row[1]) else ""
            enabled = str(row[2]).strip() if pd.notna(row[2]) else ""
            # NOTE(review): as written a rule counts as enabled when the
            # marker cell is blank.  The compared literal may have lost a
            # marker character -- confirm against the rule workbook.
            if desc and enabled == "":
                rule_id = str(row[0]).strip() if pd.notna(row[0]) else f"R{idx + 1}"
                rules.append({'id': rule_id, 'description': desc})
        return rules

    def split_long_function(self, func_code: str, max_lines: int = 1600):
        """Split the code of a long function into chunks of bounded size.

        Code with at most ``max_lines`` lines is returned unchanged as a
        single chunk.  Longer code is cut preferably where the running brace
        depth is at most 1, i.e. between top-level statements of the function
        body, and unconditionally once a chunk reaches ``max_lines`` lines.
        Concatenating the chunks always reproduces ``func_code``.

        Brace counting is purely textual, so braces inside string literals
        or comments can shift the preferred cut points; the hard cap still
        holds.

        Raises:
            ValueError: if ``max_lines`` is smaller than 1.
        """
        if max_lines < 1:
            raise ValueError("max_lines must be at least 1")
        lines = func_code.splitlines(keepends=True)
        if len(lines) <= max_lines:
            return [func_code]

        # Start looking for a clean boundary a little before the hard cap.
        soft_limit = max(1, max_lines * 3 // 4)
        chunks = []
        current = []
        brace_depth = 0
        for line in lines:
            current.append(line)
            brace_depth += line.count('{') - line.count('}')
            # Depth 1 is "inside the function's outer braces"; waiting for
            # depth 0 would only ever match after the closing brace.
            at_boundary = len(current) >= soft_limit and brace_depth <= 1
            if at_boundary or len(current) >= max_lines:
                chunks.append(''.join(current))
                current = []
        if current:
            chunks.append(''.join(current))
        return chunks

    @staticmethod
    def _parse_model_reply(raw_response):
        """Turn a model reply into ``(has_issue, explanation)``.

        Accepts plain JSON as well as JSON surrounded by extra text such as
        a Markdown code fence.  A ``has_issue`` given as a string is
        interpreted by its content instead of its truthiness.

        Raises:
            ValueError: if no JSON object can be extracted
                (``json.JSONDecodeError`` is a subclass).
        """
        text = (raw_response or "").strip()
        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            start, end = text.find('{'), text.rfind('}')
            if start == -1 or end <= start:
                raise
            payload = json.loads(text[start:end + 1])
        if not isinstance(payload, dict):
            raise ValueError("model reply is not a JSON object")

        flag = payload.get('has_issue', False)
        if isinstance(flag, str):
            flag = flag.strip().lower() in ('true', 'yes', '1', '是')
        return bool(flag), str(payload.get('explanation', ''))

    def analyze_function_against_rules(self, func_info: dict, rules: list, model: str = "qwen-max"):
        """Check one function against every enabled rule.

        Args:
            func_info: Dict with at least ``name`` and ``code``.
            rules: Dicts with ``id`` and ``description``.
            model: Chat-completion model name.

        Returns:
            A dict keyed by rule id; each value holds ``rule_description``,
            ``has_issue`` (true if any chunk reported an issue) and
            ``analysis`` (one line per chunk).  A failed request or an
            unparsable reply is recorded in ``analysis`` and counted as
            "no issue".
        """
        func_name = func_info['name']
        chunks = self.split_long_function(func_info['code'])
        total = len(chunks)
        rule_results = {}

        for rule in rules:
            rule_id = rule['id']
            rule_desc = rule['description']
            chunk_results = []
            for index, chunk in enumerate(chunks, start=1):
                prompt = self.RULE_BASED_PROMPT.format(
                    rule_description=rule_desc,
                    function_name=func_name,
                    chunk_index=index,
                    total_chunks=total,
                    code_chunk=chunk
                )
                try:
                    response = self.client.chat.completions.create(
                        model=model,
                        messages=[{"role": "user", "content": prompt}],
                        temperature=0.1,
                        max_tokens=500
                    )
                    has_issue, explanation = self._parse_model_reply(
                        response.choices[0].message.content
                    )
                    chunk_results.append({
                        'has_issue': has_issue,
                        'explanation': f"【片段{index}】{explanation}"
                    })
                except Exception as e:
                    # Best effort: keep going with the remaining chunks/rules.
                    chunk_results.append({
                        'has_issue': False,
                        'explanation': f"【片段{index}】[分析错误] {str(e)[:200]}"
                    })

            rule_results[rule_id] = {
                'rule_description': rule_desc,
                'has_issue': any(item['has_issue'] for item in chunk_results),
                'analysis': "\n".join(item['explanation'] for item in chunk_results)
            }
        return rule_results

    def save_results_to_excel(self, results, output_path):
        """Write the result rows to an Excel file and size the columns.

        Args:
            results: Row dicts keyed by the five report column names.
            output_path: Destination ``.xlsx`` path.
        """
        df = pd.DataFrame(results, columns=[
            "文件名", "函数名", "规则名", "是否缺陷", "解释规则"
        ])
        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            df.to_excel(writer, index=False, sheet_name='静态分析结果')
            worksheet = writer.sheets['静态分析结果']
            for column in worksheet.columns:
                column_letter = column[0].column_letter
                max_length = max((len(str(cell.value)) for cell in column), default=0)
                # Cap the width so one long explanation does not blow up the sheet.
                worksheet.column_dimensions[column_letter].width = min(max_length + 2, 80)
        print(f"分析结果已保存至: {output_path}")
def run_static_analysis(project_path: str, rules_excel: str, output_report: str = "audit_report"):
    """Run the complete static analysis and write both reports.

    This is the entry point the main program calls.

    Args:
        project_path: Root folder that is scanned for C/C++ files.
        rules_excel: Excel workbook holding the review rules.
        output_report: Output path without extension; ``.xlsx`` and ``.md``
            are appended for the two reports.

    Returns:
        ``False`` when no C/C++ file was found, ``True`` once both reports
        have been written.
    """
    print("\n" + "=" * 60)
    print("启动静态分析")
    print("=" * 60)

    analyzer = StaticAnalyzer()

    print("正在加载评审规则...")
    rules = analyzer.load_review_rules(rules_excel)
    print(f"共加载 {len(rules)} 条启用的规则")

    print(f"正在扫描项目文件夹: {project_path}")
    c_files = analyzer.find_c_files(project_path)
    print(f"找到 {len(c_files)} 个 C/C++ 文件")
    if not c_files:
        print("未找到任何 C/C++ 文件,请检查项目路径")
        return False

    all_functions = []
    for file_path in c_files:
        print(f"正在处理文件: {file_path}")
        functions = analyzer.extract_functions_from_file(file_path)
        all_functions.extend(functions)
        print(f" 找到 {len(functions)} 个函数")
    print(f"共找到 {len(all_functions)} 个函数")

    all_results = []
    for i, func in enumerate(all_functions, 1):
        print(f"\n分析函数 {i}/{len(all_functions)}: {func['name']} (来自 {func['file_name']})")
        all_results.append({
            'function': func,
            'rule_results': analyzer.analyze_function_against_rules(func, rules)
        })

    # One spreadsheet row per (function, rule) pair.
    excel_rows = []
    for item in all_results:
        func_info = item['function']
        for rule_id, res in item['rule_results'].items():
            # The two branches used to be the same empty string, which left
            # the defect column blank for every row.
            issue_flag = "是" if res.get('has_issue', False) else "否"
            excel_rows.append({
                "文件名": func_info['file_name'],
                "函数名": func_info['name'],
                "规则名": f"[{rule_id}] {res['rule_description']}",
                "是否缺陷": issue_flag,
                "解释规则": res['analysis']
            })
    excel_output_path = f"{output_report}.xlsx"
    analyzer.save_results_to_excel(excel_rows, excel_output_path)

    # Markdown report, grouped by source file in discovery order.
    md_output_path = f"{output_report}.md"
    files_dict = {}
    for item in all_results:
        files_dict.setdefault(item['function']['file_path'], []).append(item)

    with open(md_output_path, 'w', encoding='utf-8') as f:
        f.write("# 基于规则的 C 函数静态分析报告\n\n")
        f.write(f"**项目路径**: `{project_path}`\n\n")
        f.write(f"**规则来源**: `{rules_excel}`\n\n")
        f.write(f"**分析文件数**: {len(c_files)}\n\n")
        f.write(f"**共分析函数**: {len(all_functions)}\n\n")
        f.write(f"**启用规则数**: {len(rules)}\n\n---\n\n")
        for file_path, items in files_dict.items():
            f.write(f"## 文件: `{os.path.basename(file_path)}`\n\n")
            f.write(f"**完整路径**: {file_path}\n\n")
            for item in items:
                func = item['function']
                f.write(f"### 函数: `{func['name']}` (L{func['start_line']}-L{func['end_line']})\n\n")
                f.write("```c\n")
                # Only the first 1500 characters are shown to keep the report readable.
                snippet = func['code'][:1500] + ("..." if len(func['code']) > 1500 else "")
                f.write(snippet)
                f.write("\n```\n\n")
                for rule_id, res in item['rule_results'].items():
                    f.write(f"#### 规则 [{rule_id}] {res['rule_description']}\n\n")
                    f.write(f"{res['analysis']}\n\n")
                f.write("---\n\n")

    print("\n分析完成!")
    print(f"Excel 报告已保存至: {excel_output_path}")
    print(f"Markdown 报告已保存至: {md_output_path}")
    return True