Files
rag_agent/RAG-TEST-TOOLS/static_analyzer.py

437 lines
16 KiB
Python
Raw Permalink Normal View History

# static_analyzer.py
import os
import pandas as pd
import json
from pathlib import Path
import chardet
import tree_sitter
import tree_sitter_c
import tree_sitter_cpp
from tree_sitter import Language, Parser
from openai import OpenAI
from config import QWEN_API_KEY, QWEN_API_URL # 从主程序配置导入
# 初始化 Tree-sitter
CPP_LANGUAGE = Language(tree_sitter_cpp.language())
parser = Parser()
parser.language = CPP_LANGUAGE
# 常量定义
CPP_EXTENSIONS = {'.c', '.cpp', '.h', '.hpp', '.tcc'}
IGNORE_DIRS = {'build', 'cmake-build', '.git', 'vendor', 'lib', 'external', 'Debug'}
MAX_CHAR_LENGTH = 60000
# static_analyzer.py
import os
import pandas as pd
import json
from pathlib import Path
import chardet
import tree_sitter
import tree_sitter_c
import tree_sitter_cpp
from tree_sitter import Language, Parser
from openai import OpenAI
# 注意:这里不直接导入 QWEN_API_KEY 和 QWEN_API_BASE
# 而是通过 getattr 安全地获取 config 模块中的值
try:
from config import QWEN_API_KEY, QWEN_API_BASE
except ImportError:
# 如果导入失败,设置默认值
QWEN_API_KEY = None
QWEN_API_BASE = None
# 初始化 Tree-sitter
CPP_LANGUAGE = Language(tree_sitter_cpp.language())
parser = Parser()
parser.language = CPP_LANGUAGE
# 常量定义
CPP_EXTENSIONS = {'.c', '.cpp', '.h', '.hpp', '.tcc'}
IGNORE_DIRS = {'build', 'cmake-build', '.git', 'vendor', 'lib', 'external', 'Debug'}
MAX_CHAR_LENGTH = 60000
class StaticAnalyzer:
"""静态分析器,封装代码解析与规则审查逻辑"""
def __init__(self, api_key=None, api_base=None):
# 安全地获取配置值
config_api_key = None
config_api_base = None
try:
# 尝试从 config 模块获取值
import config
config_api_key = getattr(config, 'QWEN_API_KEY', None)
config_api_base = getattr(config, 'QWEN_API_URL', None)
except ImportError:
# 如果导入失败,使用在文件开头定义的变量
config_api_key = QWEN_API_KEY
config_api_base = QWEN_API_BASE
# 优先使用传入的参数,然后使用 config 中的配置
actual_api_key = api_key or config_api_key
actual_api_base = api_base or config_api_base
# 验证 API 配置
if not actual_api_key:
print("警告: API Key 未配置")
print("请在 config.py 中设置 QWEN_API_KEY 或传入 api_key 参数")
if not actual_api_base:
print("警告: API Base URL 未配置")
print("请在 config.py 中设置 QWEN_API_BASE 或传入 api_base 参数")
# 设置默认的 API Base URL
actual_api_base = "https://dashscope.aliyuncs.com/compatible-mode/v1"
# 确保 api_base 是字符串,而不是函数
if callable(actual_api_base):
print("警告: QWEN_API_BASE 是一个函数而不是字符串,使用默认值")
actual_api_base = "https://dashscope.aliyuncs.com/compatible-mode/v1"
print(f"API 配置信息:")
print(f" - API Key: {'已设置' if actual_api_key else '未设置'}")
print(f" - API Base URL: {actual_api_base}")
# 初始化 OpenAI 客户端
self.client = OpenAI(
base_url=actual_api_base, # 确保这是字符串
api_key=actual_api_key
)
# ... 以下代码保持不变 ...
def find_c_files(self, project_path):
"""递归查找项目文件夹中的所有 C/C++ 文件"""
c_files = []
for root, dirs, files in os.walk(project_path):
dirs[:] = [d for d in dirs if d not in IGNORE_DIRS]
for file in files:
if any(file.endswith(ext) for ext in CPP_EXTENSIONS):
full_path = os.path.join(root, file)
c_files.append(full_path)
return c_files
def extract_functions_from_file(self, file_path):
"""从单个文件中提取所有函数包含GBK编码支持"""
try:
with open(file_path, 'rb') as f:
raw_data = f.read()
encoding = chardet.detect(raw_data)['encoding']
if encoding.lower() in ('gbk', 'gb2312', 'cp936'):
text_str = raw_data.decode('gbk')
else:
text_str = raw_data.decode('utf-8')
code_bytes = text_str.encode('utf-8')
functions = self._extract_functions(code_bytes)
for func in functions:
func['file_path'] = file_path
func['file_name'] = os.path.basename(file_path)
return functions
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
return []
def _extract_functions(self, code_bytes: bytes):
"""使用 Tree-sitter 解析并提取函数定义"""
tree = parser.parse(code_bytes)
root_node = tree.root_node
functions = []
def traverse(node):
if node.type == 'function_definition':
start = node.start_byte
end = node.end_byte
func_code = code_bytes[start:end].decode('utf-8')
func_name = self._get_function_name(node, code_bytes)
functions.append({
'name': func_name,
'code': func_code,
'start_line': node.start_point[0] + 1,
'end_line': node.end_point[0] + 1
})
else:
for child in node.children:
traverse(child)
traverse(root_node)
return functions
def _get_function_name(self, func_node, code_bytes: bytes):
"""提取函数名"""
def find_declarator(node):
if node.type in ('declarator', 'function_declarator', 'pointer_declarator'):
return node
for child in node.children:
result = find_declarator(child)
if result is not None:
return result
return None
def find_identifier(node):
if node.type == 'identifier':
return node
for child in node.children:
found = find_identifier(child)
if found:
return found
return None
declarator = None
for child in func_node.children:
if child.type == 'function_declarator':
declarator = child
break
elif child.type == 'declaration':
declarator = find_declarator(child)
if declarator:
break
if declarator is None:
return "<unknown>"
ident_node = find_identifier(declarator)
if ident_node:
name_bytes = code_bytes[ident_node.start_byte:ident_node.end_byte]
return name_bytes.decode('utf-8')
else:
return "<unnamed>"
def load_review_rules(self, excel_path: str):
"""从 Excel 加载启用的评审规则"""
df = pd.read_excel(excel_path, header=None, engine='openpyxl')
rules = []
for idx, row in df.iterrows():
if len(row) < 3:
continue
desc = str(row[1]).strip() if pd.notna(row[1]) else ""
enabled = str(row[2]).strip() if pd.notna(row[2]) else ""
if desc and enabled == "":
rule_id = str(row[0]).strip() if pd.notna(row[0]) else f"R{idx + 1}"
rules.append({'id': rule_id, 'description': desc})
return rules
def split_long_function(self, func_code: str, max_lines: int = 1600):
"""将长函数代码分割为多个片段"""
lines = func_code.splitlines(keepends=True)
if len(lines) <= max_lines:
return [func_code]
chunks = []
current = []
brace_depth = 0
for line in lines:
current.append(line)
brace_depth += line.count('{') - line.count('}')
if len(current) >= max_lines and brace_depth <= 0:
chunks.append(''.join(current))
current = []
brace_depth = 0
if current:
chunks.append(''.join(current))
if not chunks:
for i in range(0, len(lines), max_lines):
chunks.append(''.join(lines[i:i + max_lines]))
return chunks
def analyze_function_against_rules(self, func_info: dict, rules: list, model: str = "qwen-max"):
"""对单个函数应用所有启用的规则进行分析"""
func_name = func_info['name']
full_code = func_info['code']
chunks = self.split_long_function(full_code)
total = len(chunks)
rule_results = {}
RULE_BASED_PROMPT = """你是一名 C 语言安全审计专家。请根据以下特定编码或安全规则,严格审查提供的函数代码片段:
审查规则
{rule_description}
代码上下文
- 当前为函数 "{function_name}" 的第 {chunk_index}/{total_chunks} 片段
- 若函数被分片请综合所有片段判断若仅提供一片则基于此片判断
审查要求
- 如果此代码片段**明确违反**上述规则请指出违规位置风险说明修复建议
- 如果**未发现违规**请回答"本片段未发现违反该规则的问题"
- 如果因**代码不完整无法判断**如只看到函数开头请回答"需结合完整函数判断"
- 回答必须简洁基于事实用中文不超过50字
回答要求
-请严格按以下 JSON 格式回答不要包含任何额外文本注释或 Markdown
{{
"has_issue": true false,
"explanation": "简明解释为何存在或不存在该问题"
}}
函数代码片段
c
{code_chunk}
"""
for rule in rules:
rule_id = rule['id']
rule_desc = rule['description']
chunk_results = []
for i, chunk in enumerate(chunks):
prompt = RULE_BASED_PROMPT.format(
rule_description=rule_desc,
function_name=func_name,
chunk_index=i + 1,
total_chunks=total,
code_chunk=chunk
)
try:
response = self.client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=0.1,
max_tokens=500
)
raw_response = response.choices[0].message.content.strip()
res = json.loads(raw_response)
chunk_results.append({
'has_issue': bool(res.get('has_issue', False)),
'explanation': f"【片段{i + 1}{res.get('explanation', '')}"
})
except Exception as e:
chunk_results.append({
'has_issue': False,
'explanation': f"【片段{i + 1}】[分析错误] {str(e)[:200]}"
})
final_has_issue = any(item['has_issue'] for item in chunk_results)
final_explanation = "\n".join(item['explanation'] for item in chunk_results)
rule_results[rule_id] = {
'rule_description': rule_desc,
'has_issue': final_has_issue,
'analysis': final_explanation
}
return rule_results
def save_results_to_excel(self, results, output_path):
"""将分析结果保存为 Excel 文件"""
df = pd.DataFrame(results, columns=[
"文件名", "函数名", "规则名", "是否缺陷", "解释规则"
])
with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
df.to_excel(writer, index=False, sheet_name='静态分析结果')
worksheet = writer.sheets['静态分析结果']
for column in worksheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 80)
worksheet.column_dimensions[column_letter].width = adjusted_width
print(f"分析结果已保存至: {output_path}")
def run_static_analysis(project_path: str, rules_excel: str, output_report: str = "audit_report"):
"""
静态分析主执行函数
这是整合到主程序时需要调用的入口函数
"""
print("\n" + "=" * 60)
print("启动静态分析")
print("=" * 60)
analyzer = StaticAnalyzer()
print("正在加载评审规则...")
rules = analyzer.load_review_rules(rules_excel)
print(f"共加载 {len(rules)} 条启用的规则")
print(f"正在扫描项目文件夹: {project_path}")
c_files = analyzer.find_c_files(project_path)
print(f"找到 {len(c_files)} 个 C/C++ 文件")
if not c_files:
print("未找到任何 C/C++ 文件,请检查项目路径")
return False
all_functions = []
for file_path in c_files:
print(f"正在处理文件: {file_path}")
functions = analyzer.extract_functions_from_file(file_path)
all_functions.extend(functions)
print(f" 找到 {len(functions)} 个函数")
print(f"共找到 {len(all_functions)} 个函数")
all_results = []
for i, func in enumerate(all_functions, 1):
print(f"\n分析函数 {i}/{len(all_functions)}: {func['name']} (来自 {func['file_name']})")
rule_results = analyzer.analyze_function_against_rules(func, rules)
all_results.append({
'function': func,
'rule_results': rule_results
})
excel_rows = []
for item in all_results:
func_info = item['function']
func_name = func_info['name']
file_name = func_info['file_name']
for rule_id, res in item['rule_results'].items():
issue_flag = "" if res.get('has_issue', False) else ""
excel_rows.append({
"文件名": file_name,
"函数名": func_name,
"规则名": f"[{rule_id}] {res['rule_description']}",
"是否缺陷": issue_flag,
"解释规则": res['analysis']
})
excel_output_path = f"{output_report}.xlsx"
analyzer.save_results_to_excel(excel_rows, excel_output_path)
# 生成 Markdown 报告
md_output_path = f"{output_report}.md"
with open(md_output_path, 'w', encoding='utf-8') as f:
f.write("# 基于规则的 C 函数静态分析报告\n\n")
f.write(f"**项目路径**: `{project_path}`\n\n")
f.write(f"**规则来源**: `{rules_excel}`\n\n")
f.write(f"**分析文件数**: {len(c_files)}\n\n")
f.write(f"**共分析函数**: {len(all_functions)}\n\n")
f.write(f"**启用规则数**: {len(rules)}\n\n---\n\n")
files_dict = {}
for item in all_results:
func = item['function']
file_path = func['file_path']
if file_path not in files_dict:
files_dict[file_path] = []
files_dict[file_path].append(item)
for file_path, items in files_dict.items():
f.write(f"## 文件: `{os.path.basename(file_path)}`\n\n")
f.write(f"**完整路径**: {file_path}\n\n")
for item in items:
func = item['function']
f.write(f"### 函数: `{func['name']}` (L{func['start_line']}-L{func['end_line']})\n\n")
f.write("```c\n")
snippet = func['code'][:1500] + ("..." if len(func['code']) > 1500 else "")
f.write(snippet)
f.write("\n```\n\n")
for rule_id, res in item['rule_results'].items():
f.write(f"#### 规则 [{rule_id}] {res['rule_description']}\n\n")
f.write(f"{res['analysis']}\n\n")
f.write("---\n\n")
print(f"\n分析完成!")
print(f"Excel 报告已保存至: {excel_output_path}")
print(f"Markdown 报告已保存至: {md_output_path}")
return True