Files
LlmStaticAnalyze/LlmStaticAnalyze.txt
2026-02-05 14:29:51 +08:00

517 lines
18 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import tree_sitter
import faiss
from openai import OpenAI
from tree_sitter import Language, Parser
import tree_sitter_c
import tree_sitter_cpp
import os
import json
import requests
import chardet
# Load C language
CPP_LANGUAGE = Language(tree_sitter_cpp.language())
parser = Parser()
parser.language = CPP_LANGUAGE
CPP_EXTENSIONS = {'.c', '.cpp', '.h', '.hpp', '.tcc'}
IGNORE_DIRS = {'build', 'cmake-build', '.git', 'vendor', 'lib', 'external', 'Debug'}
from tree_sitter import Language, Parser
def read_gbk_file(filepath):
    """Read a source file as text, decoding GBK-family encodings explicitly.

    Detects the encoding with chardet; GBK/GB2312/CP936 content is decoded as
    GBK, everything else falls back to strict UTF-8 (a UnicodeDecodeError is
    intentionally propagated to the caller for genuinely foreign encodings).

    :param filepath: path of the file to read.
    :return: decoded file content as ``str``.
    """
    with open(filepath, 'rb') as f:
        raw_data = f.read()
    encoding = chardet.detect(raw_data)['encoding']
    # BUG FIX: chardet returns encoding=None for empty or undecidable input;
    # the original code crashed on None.lower().
    if encoding and encoding.lower() in ('gbk', 'gb2312', 'cp936'):
        return raw_data.decode('gbk')
    return raw_data.decode('utf-8')  # fallback
def find_c_files(project_path):
    """Recursively collect all C/C++ source/header files under *project_path*.

    Directories listed in IGNORE_DIRS (build output, VCS metadata, vendored
    code, ...) are pruned from the walk.
    """
    matched_suffixes = tuple(CPP_EXTENSIONS)
    collected = []
    for root, dirs, files in os.walk(project_path):
        # Prune ignored directories in place so os.walk never descends into them.
        dirs[:] = [d for d in dirs if d not in IGNORE_DIRS]
        collected.extend(
            os.path.join(root, name)
            for name in files
            if name.endswith(matched_suffixes)
        )
    return collected
def extract_functions_from_file(file_path):
    """Extract every function definition from a single C/C++ file.

    Delegates encoding detection to read_gbk_file (the original inlined a
    duplicate copy of that logic, including its crash when chardet reports an
    encoding of None). Each returned function dict is annotated with
    'file_path' and 'file_name'. Any error is logged and yields [] so one bad
    file does not abort the whole scan.

    :param file_path: path of the source file to parse.
    :return: list of function dicts (see extract_functions), possibly empty.
    """
    try:
        # Detect encoding and decode via the shared helper, then re-encode as
        # UTF-8 bytes because tree-sitter operates on bytes.
        text_str = read_gbk_file(file_path)
        code_bytes = text_str.encode('utf-8')
        functions = extract_functions(code_bytes)
        # Annotate each function with its origin for later reporting.
        for func in functions:
            func['file_path'] = file_path
            func['file_name'] = os.path.basename(file_path)
        return functions
    except Exception as e:
        print(f"处理文件 {file_path} 时出错: {e}")
        return []
def extract_functions(code_bytes: bytes):
    """Parse UTF-8 source bytes and return every function_definition node.

    Each result dict carries 'name', 'code' (exact source slice),
    'start_line' and 'end_line' (1-based).
    """
    tree = parser.parse(code_bytes)
    results = []

    def walk(node):
        if node.type == 'function_definition':
            results.append({
                'name': get_function_name(node, code_bytes),
                'code': code_bytes[node.start_byte:node.end_byte].decode('utf-8'),
                'start_line': node.start_point[0] + 1,
                'end_line': node.end_point[0] + 1,
            })
            return  # matches original: do not descend into a found function
        for child in node.children:
            walk(child)

    walk(tree.root_node)
    return results
def find_declarator(node):
    """Depth-first search for the first declarator-like node.

    In a C declaration the function name lives under
    declarator -> ... -> identifier; this locates the outermost node whose
    type is 'declarator', 'function_declarator' or 'pointer_declarator',
    or returns None if the subtree holds none.
    """
    if node.type in ('declarator', 'function_declarator', 'pointer_declarator'):
        return node
    # Recurse into children; declarators usually follow the type specifier
    # in a declaration: [storage_class?, type, declarator, ...].
    for child in node.children:
        hit = find_declarator(child)
        if hit is not None:
            return hit
    return None
def get_function_name(func_node, code_bytes: bytes):
    """Extract the function name from a function_definition node.

    :param func_node: tree-sitter function_definition node.
    :param code_bytes: original source as UTF-8 bytes.
    :return: identifier text, "<unknown>" when no declarator is found,
             "<unnamed>" when the declarator holds no identifier.
    """
    declarator = None
    for child in func_node.children:
        if child.type == 'function_declarator':
            declarator = child
            break
        if child.type == 'declaration':
            declarator = find_declarator(child)
            if declarator is not None:
                break
    if declarator is None:
        return "<unknown>"

    def first_identifier(node):
        # Leftmost depth-first 'identifier' under the declarator.
        if node.type == 'identifier':
            return node
        for child in node.children:
            hit = first_identifier(child)
            if hit is not None:
                return hit
        return None

    ident = first_identifier(declarator)
    if ident is None:
        return "<unnamed>"
    return code_bytes[ident.start_byte:ident.end_byte].decode('utf-8')
# Upper bound on characters of function source sent to the LLM per request
# (~15k tokens assuming ~4 chars/token; used by the first split_long_function).
MAX_CHAR_LENGTH = 60000 # 15k tokens
import pandas as pd
def load_review_rules(excel_path: str):
    """Load review rules from an Excel workbook (no header row).

    Expected layout: column 0 = rule id (optional), column 1 = rule
    description, column 2 = enabled flag; only rows whose flag is "是"
    are kept.

    :return: list of {"id": ..., "description": ...} dicts.
    """
    frame = pd.read_excel(excel_path, header=None, engine='openpyxl')
    enabled_rules = []
    for row_idx, row in frame.iterrows():
        # Tolerate short/ragged rows.
        if len(row) < 3:
            continue
        description = "" if pd.isna(row[1]) else str(row[1]).strip()
        flag = "" if pd.isna(row[2]) else str(row[2]).strip()
        if not description or flag != "是":
            continue
        # Fall back to a synthetic id when column 0 is blank.
        rule_id = f"R{row_idx + 1}" if pd.isna(row[0]) else str(row[0]).strip()
        enabled_rules.append({'id': rule_id, 'description': description})
    return enabled_rules
def save_results_to_excel(results, output_path):
    """Write analysis rows to an Excel workbook with auto-sized columns.

    :param results: list[dict]; each dict uses the Chinese column keys
        "文件名", "函数名", "规则名", "是否缺陷", "解释规则".
        (The original docstring incorrectly documented English keys such as
        'function_name'.)
    :param output_path: destination .xlsx path, e.g. "audit_report.xlsx".
    """
    df = pd.DataFrame(results, columns=[
        "文件名",
        "函数名",
        "规则名",
        "是否缺陷",
        "解释规则"
    ])
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        df.to_excel(writer, index=False, sheet_name='静态分析结果')
        worksheet = writer.sheets['静态分析结果']
        # Auto-fit each column to its longest rendered cell, capped at 80
        # chars for readability (replaces the original bare `except: pass`).
        for column in worksheet.columns:
            column_letter = column[0].column_letter
            max_length = max((len(str(cell.value)) for cell in column), default=0)
            worksheet.column_dimensions[column_letter].width = min(max_length + 2, 80)
    print(f"分析结果已保存至: {output_path}")
def split_long_function(func_code: str) -> list[str]:
    """Split an over-long function body into chunks of <= MAX_CHAR_LENGTH chars.

    Splits on line boundaries only (a smarter implementation would split on
    statement blocks).

    NOTE(review): this definition is shadowed by a later
    `split_long_function(func_code, max_lines=1600)` further down the file;
    only the later one takes effect at runtime.
    """
    if len(func_code) <= MAX_CHAR_LENGTH:
        return [func_code]
    chunks = []
    buffer = []
    buffered = 0
    for line in func_code.splitlines(keepends=True):
        if buffer and buffered + len(line) > MAX_CHAR_LENGTH:
            # Flush the current chunk and start a new one with this line.
            chunks.append(''.join(buffer))
            buffer = [line]
            buffered = len(line)
        else:
            buffer.append(line)
            buffered += len(line)
    if buffer:
        chunks.append(''.join(buffer))
    return chunks
import os
from openai import OpenAI

# NOTE(review): an API key was previously hard-coded here; any key committed
# to source control is compromised and must be rotated. Read it from the
# environment instead (set QWEN_API_KEY before running).
client = OpenAI(
    base_url=os.getenv("QWEN_API_BASE", "https://dashscope.aliyuncs.com/compatible-mode/v1"),
    api_key=os.getenv("QWEN_API_KEY", "")
)
# Prompt template for reviewing one code chunk against one rule.
# Format placeholders: {rule_description}, {function_name}, {chunk_index},
# {total_chunks}, {code_chunk}. The literal braces of the JSON example are
# escaped as {{ }} so str.format leaves them intact. The template is runtime
# data (sent to the LLM) and is kept byte-identical here.
RULE_BASED_PROMPT = """你是一名 C 语言安全审计专家。请根据以下特定编码或安全规则,严格审查提供的函数代码片段:
【审查规则】
{rule_description}
【代码上下文】
- 当前为函数 "{function_name}" 的第 {chunk_index}/{total_chunks} 片段
- 若函数被分片,请综合所有片段判断;若仅提供一片,则基于此片判断
【审查要求】
- 如果此代码片段**明确违反**上述规则,请指出:违规位置、风险说明、修复建议;
- 如果**未发现违规**,请回答:"本片段未发现违反该规则的问题"
- 如果因**代码不完整无法判断**(如只看到函数开头),请回答:"需结合完整函数判断"
- 回答必须简洁、基于事实用中文不超过50字。
【回答要求】
-请严格按以下 JSON 格式回答,不要包含任何额外文本、注释或 Markdown
{{
"has_issue": true 或 false,
"explanation": "简明解释为何存在或不存在该问题"
}}
【函数代码片段】
c
{code_chunk}
"""
def analyze_function_chunk_against_rule(
    client,
    func_name: str,
    code_chunk: str,
    rule_desc: str,
    chunk_index: int,
    total_chunks: int,
    model: str = "qwen-max"
) -> str:
    """Ask the LLM whether one code chunk violates one rule.

    Returns the raw (stripped) model reply, or an "[API Error] ..." string —
    this function deliberately never raises so one failed call cannot abort
    the whole scan.
    """
    filled_prompt = RULE_BASED_PROMPT.format(
        rule_description=rule_desc,
        function_name=func_name,
        chunk_index=chunk_index,
        total_chunks=total_chunks,
        code_chunk=code_chunk
    )
    try:
        completion = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": filled_prompt}],
            temperature=0.1,
            max_tokens=500
        )
        return completion.choices[0].message.content.strip()
    except Exception as e:
        # Best-effort: surface transport/API failures as text (matches the
        # JSON-parse fallback downstream).
        return f"[API Error] {str(e)}"
def analyze_function_against_rules(func_info: dict, rules: list, client, model: str = "qwen-max") -> dict:
    """Apply every enabled rule to one function, chunking long code first.

    The original duplicated the single-chunk and multi-chunk branches almost
    line for line; they are unified here. Behavior is preserved: a chunk's
    explanation is prefixed with 【片段N】 only when the function was actually
    split, and a function is flagged for a rule if ANY chunk is flagged.

    :param func_info: dict with at least 'name' and 'code'.
    :param rules: list of {'id', 'description'} rule dicts.
    :param client: OpenAI-compatible client.
    :param model: model name passed through to the API.
    :return: {rule_id: {'rule_description', 'has_issue', 'analysis'}}.
    """
    import json

    func_name = func_info['name']
    chunks = split_long_function(func_info['code'])
    total = len(chunks)
    rule_results = {}

    def _parse_reply(raw: str, prefix: str) -> dict:
        # Decode the model's JSON reply; degrade gracefully (has_issue=False)
        # when the model returned something unparseable.
        try:
            parsed = json.loads(raw.strip())
            return {
                'has_issue': bool(parsed.get('has_issue', False)),
                'explanation': f"{prefix}{parsed.get('explanation', '')}",
            }
        except (json.JSONDecodeError, AttributeError):
            return {
                'has_issue': False,
                'explanation': f"{prefix}[解析失败] 原始响应: {raw[:200]}...",
            }

    for rule in rules:
        rule_desc = rule['description']
        chunk_results = []
        for i, chunk in enumerate(chunks):
            raw_response = analyze_function_chunk_against_rule(
                client, func_name, chunk, rule_desc,
                chunk_index=i + 1, total_chunks=total, model=model
            )
            prefix = "" if total == 1 else f"【片段{i + 1}】"
            chunk_results.append(_parse_reply(raw_response, prefix))
        rule_results[rule['id']] = {
            'rule_description': rule_desc,
            # Any flagged chunk flags the whole function.
            'has_issue': any(item['has_issue'] for item in chunk_results),
            # Keep all per-chunk explanations for the report.
            'analysis': "\n".join(item['explanation'] for item in chunk_results),
        }
    return rule_results
def split_long_function(func_code: str, max_lines: int = 1600) -> list[str]:
    """Split a function's source into chunks of roughly *max_lines* lines,
    preferring to cut where the tracked {} depth returns to zero.

    BUG FIX: the original "defensive" `if not chunks` fallback was unreachable
    (the trailing `if current` always appended something), so when brace depth
    never returned to zero the whole function came back as one giant chunk.
    The fallback now hard-splits an oversized tail into fixed-size slices.

    :param func_code: full function source text.
    :param max_lines: soft per-chunk line limit (a chunk may run past it while
                      waiting for the enclosing block to close).
    :return: non-empty list of chunks whose concatenation equals *func_code*.
    """
    lines = func_code.splitlines(keepends=True)
    if len(lines) <= max_lines:
        return [func_code]
    chunks = []
    current = []
    brace_depth = 0
    for line in lines:
        current.append(line)
        # Rough brace tracking; ignores braces inside strings/comments.
        brace_depth += line.count('{') - line.count('}')
        if len(current) >= max_lines and brace_depth <= 0:
            # Cut at a block boundary.
            chunks.append(''.join(current))
            current = []
            brace_depth = 0
    if current:
        if len(current) > max_lines:
            # Brace tracking never found a safe boundary for this tail
            # (unbalanced braces): fall back to fixed-size slices.
            for i in range(0, len(current), max_lines):
                chunks.append(''.join(current[i:i + max_lines]))
        else:
            chunks.append(''.join(current))
    return chunks
def main(project_path: str, rules_excel: str, output_report: str = "rule_based_report.md"):
    """End-to-end pipeline: scan a C/C++ project, extract functions, review
    each against every enabled rule via the LLM, and emit Excel + Markdown
    reports.

    :param project_path: root folder of the C/C++ project to scan.
    :param rules_excel: Excel workbook holding the review rules.
    :param output_report: Markdown report path; the Excel report uses the
                          same base name with an .xlsx extension.
    """
    print("正在加载评审规则...")
    rules = load_review_rules(rules_excel)
    print(f"共加载 {len(rules)} 条启用的规则")
    print("正在读取 GBK 编码源文件...")
    print(f"正在扫描项目文件夹: {project_path}")
    c_files = find_c_files(project_path)
    print(f"找到 {len(c_files)} 个 C/C++ 文件")
    if not c_files:
        print("未找到任何 C/C++ 文件,请检查项目路径")
        return
    # Extract every function from every discovered file.
    all_functions = []
    for file_path in c_files:
        print(f"正在处理文件: {file_path}")
        functions = extract_functions_from_file(file_path)
        all_functions.extend(functions)
        print(f" 找到 {len(functions)} 个函数")
    print(f"共找到 {len(all_functions)} 个函数")
    if not all_functions:
        print("未找到任何函数,分析结束")
        return
    # NOTE(review): the original fell back to a hard-coded API key here; a key
    # committed to source control is compromised and must be rotated. The key
    # is now read strictly from the environment.
    client = OpenAI(
        base_url=os.getenv("QWEN_API_BASE", "https://dashscope.aliyuncs.com/compatible-mode/v1"),
        api_key=os.getenv("QWEN_API_KEY", "")
    )
    all_results = []
    for i, func in enumerate(all_functions, 1):
        print(f"\n分析函数 {i}/{len(all_functions)}: {func['name']} (来自 {func['file_name']})")
        rule_results = analyze_function_against_rules(func, rules, client)
        print(rule_results)
        all_results.append({
            'function': func,
            'rule_results': rule_results
        })
    # Flatten results into one row per (function, rule) for the Excel report.
    excel_rows = []
    for item in all_results:
        func_info = item['function']
        func_name = func_info['name']
        file_name = func_info['file_name']
        for rule_id, res in item['rule_results'].items():
            issue_flag = "是" if res.get('has_issue', False) else "否"
            excel_rows.append({
                "文件名": file_name,
                "函数名": func_name,
                "规则名": f"[{rule_id}] {res['rule_description']}",
                "是否缺陷": issue_flag,
                "解释规则": res['analysis']
            })
    # BUG FIX: str.replace('.md', '.xlsx') corrupted any path containing
    # ".md" mid-string; derive the Excel path from the extension instead.
    excel_output_path = os.path.splitext(output_report)[0] + '.xlsx'
    save_results_to_excel(excel_rows, excel_output_path)
    # Markdown report, grouped by source file.
    with open(output_report, 'w', encoding='utf-8') as f:
        f.write("# 基于规则的 C 函数静态分析报告\n\n")
        f.write(f"**项目路径**: `{project_path}`\n\n")
        f.write(f"**规则来源**: `{rules_excel}`\n\n")
        f.write(f"**分析文件数**: {len(c_files)} 个\n\n")
        f.write(f"**共分析函数**: {len(all_functions)} 个\n\n")
        f.write(f"**启用规则数**: {len(rules)} 条\n\n---\n\n")
        files_dict = {}
        for item in all_results:
            func = item['function']
            files_dict.setdefault(func['file_path'], []).append(item)
        for file_path, items in files_dict.items():
            f.write(f"## 文件: `{os.path.basename(file_path)}`\n\n")
            f.write(f"**完整路径**: {file_path}\n\n")
            for item in items:
                func = item['function']
                f.write(f"### 函数: `{func['name']}` (L{func['start_line']}-L{func['end_line']})\n\n")
                f.write("```c\n")
                # Truncate long bodies so the report stays readable.
                snippet = func['code'][:1500] + ("..." if len(func['code']) > 1500 else "")
                f.write(snippet)
                f.write("\n```\n\n")
                for rule_id, res in item['rule_results'].items():
                    f.write(f"#### 规则 [{rule_id}] {res['rule_description']}\n\n")
                    f.write(f"{res['analysis']}\n\n")
                f.write("---\n\n")
    print(f"\n分析完成")
    print(f"Excel 报告已保存至: {excel_output_path}")
    print(f"Markdown 报告已保存至: {output_report}")
if __name__ == "__main__":
    import os
    # NOTE(review): placeholder configuration — QWEN_API_KEY is set to an
    # empty string and project_path is empty; fill both in (or export the env
    # vars externally) before running, or the scan will find nothing and API
    # calls will be rejected.
    os.environ["QWEN_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    os.environ["QWEN_API_KEY"] = ""
    main(
        project_path="",
        rules_excel="审查规则.xlsx",
        output_report="audit_report.md"
    )