Files
StaticAnalysisCensor/StaticAnalysisCensor.py
2026-02-05 16:32:48 +08:00

565 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
import re
import time
import pandas as pd
from typing import List, Dict, Optional, Tuple
import numpy as np
import faiss
import openai
from posthog import project_root
from tree_sitter import Language, Parser
import tree_sitter_cpp
from openai import OpenAI
# DashScope OpenAI-compatible endpoint configuration.
# SECURITY NOTE(review): the API key used to be hard-coded here. It is now read
# from the environment first, falling back to the original literal so existing
# deployments keep working — rotate the key and delete the fallback ASAP.
DASHSCOPE_API_KEY = os.environ.get(
    "DASHSCOPE_API_KEY", "sk-b51bf9fea0b3447896ce6d3a150f4a67"
)
BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
# Client object (replaces the legacy openai.api_key / openai.api_base globals).
client = OpenAI(
    api_key=DASHSCOPE_API_KEY,
    base_url=BASE_URL
)
EMBEDDING_MODEL_NAME = "text-embedding-v4"
# Use forward slashes throughout: the originals mixed in "\s" / "\D"
# backslashes, which are invalid escape sequences (SyntaxWarning on 3.12+).
KB_INDEX_PATH = "C:/Users/surface/Desktop/vector-knowledge/satellite_rag.faiss"
KB_META_PATH = "C:/Users/surface/Desktop/vector-knowledge/satellite_rag_metadata.json"
CPP_LANGUAGE = Language(tree_sitter_cpp.language())
parser = Parser()
parser.language = CPP_LANGUAGE
# LLM used for the true/false-positive judgement (qwen-agent or direct DashScope).
LLM_MODEL_NAME = "qwen-max"
# Input / output locations.
INPUT_XLSX = ""
OUTPUT_JSON = "filtered_defects.json"
Project_path = ""
# Load the FAISS knowledge base at import time (fail fast if it is missing).
print("Loading FAISS index...")
index = faiss.read_index(KB_INDEX_PATH)
with open(KB_META_PATH, "r", encoding="utf-8") as f:
    kb_meta = json.load(f)
# Validate dimensions (text-embedding-v4 produces 1024-dim vectors).
# Explicit raises instead of `assert` so the checks survive `python -O`.
if index.d != 1024:
    raise ValueError(f"FAISS 维度应为 1024但实际为 {index.d}。请确认由 text-embedding-v4 构建!")
if len(kb_meta) != index.ntotal:
    raise ValueError("meta.json 条目数与 FAISS 向量数不一致!")
print(f"Knowledge base loaded: {len(kb_meta)} entries, dim={index.d}")
# ============================
# 新增:通过 OpenAI 兼容 API 获取 Embedding
# ============================
def embed_text(text: str) -> np.ndarray:
    """Embed *text* via the DashScope OpenAI-compatible embeddings endpoint.

    Returns a float32 array of shape (1, 1024). On any API failure a zero
    vector of the same shape is returned, so callers never see an exception.
    """
    try:
        api_result = client.embeddings.create(
            model=EMBEDDING_MODEL_NAME,  # "text-embedding-v4"
            input=text,
        )
        # New-style SDK responses are Pydantic models, not dicts:
        # access the vector via .data[0].embedding.
        vector = api_result.data[0].embedding
        return np.asarray(vector, dtype=np.float32).reshape(1, -1)
    except Exception as e:
        print(f"Embedding API error: {e}")
        return np.zeros((1, 1024), dtype=np.float32)
def get_function_context(file_path: str, line_number: int) -> Optional[Tuple[str, str]]:
    """Locate the C/C++ function containing *line_number* in *file_path*.

    Reads the file with best-effort encoding detection, parses it with
    tree-sitter, and returns a ``(function_name, function_source)`` tuple.
    Returns ``None`` when the file cannot be read or the line is not inside
    any function definition.
    """
    import chardet  # local import; charset_normalizer would also work
    try:
        with open(file_path, 'rb') as f:
            raw_data = f.read()
        detected = chardet.detect(raw_data)
        encoding = detected['encoding']
        if encoding is None:
            encoding = 'utf-8'
        # Normalize the detector output: plain ASCII is a UTF-8 subset, and
        # the GB* family is folded into gb18030 (superset of gbk/gb2312).
        if encoding.lower() in ('ascii', 'utf-8', 'utf-8-sig'):
            encoding = 'utf-8'
        elif 'gb' in encoding.lower():
            encoding = 'gb18030'
        else:
            encoding = 'utf-8'  # default fallback
        code = raw_data.decode(encoding, errors='replace')
        print("successfully decode the code text with " + encoding)
    except Exception as e:
        print(f"Failed to read {file_path}: {e}")
        return None
    # tree-sitter operates on UTF-8 bytes. Keep the encoded form around:
    # node.start_byte / node.end_byte index into these BYTES, not into the
    # decoded str. (Bug fix: the original sliced `code` — a str — with byte
    # offsets, which corrupts extraction as soon as the file contains any
    # multi-byte characters, e.g. Chinese comments.)
    source_bytes = code.encode('utf-8')
    tree = parser.parse(source_bytes)

    def find_function_node(node):
        # Depth-first search for a function_definition whose line span
        # covers the requested line.
        if node.type == "function_definition":
            start_line = node.start_point[0] + 1
            end_line = node.end_point[0] + 1
            if start_line <= line_number <= end_line:
                func_name_node = node.child_by_field_name("declarator")
                if func_name_node:
                    # Strip the parameter list, then take the last token so
                    # "static int foo(..." yields "foo".
                    name_part = func_name_node.text.decode("utf-8").split("(")[0].strip()
                    if not name_part:
                        # Guard: an empty declarator (parse-error node) would
                        # otherwise raise IndexError on split()[-1].
                        return None
                    name = name_part.split()[-1]
                    # Slice the UTF-8 bytes with the byte offsets, then decode.
                    func_code = source_bytes[node.start_byte: node.end_byte].decode(
                        "utf-8", errors="replace"
                    )
                    return name, func_code
        for child in node.children:
            res = find_function_node(child)
            if res:
                return res
        return None
    return find_function_node(tree.root_node)
def get_fallback_context(file_path: str, line_number: int, window: int = 10) -> str:
    """Return a +/- *window*-line snippet around *line_number* as raw context.

    Used when the defect line is not inside any recognizable function
    (globals, macros, ...). Returns an explanatory comment string when the
    file cannot be read.
    """
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as fh:
            all_lines = fh.readlines()
    except Exception:
        return f"// Failed to read file around line {line_number}"
    lo = max(0, line_number - 1 - window)
    hi = min(len(all_lines), line_number - 1 + window + 1)
    snippet = "".join(all_lines[lo:hi])
    return f"// Context around line {line_number} (non-function):\n{snippet}"
def retrieve_knowledge(query_text: str, top_k: int = 1) -> List[Dict]:
    """Return up to *top_k* knowledge-base metadata entries nearest to the query."""
    query_vec = embed_text(query_text)
    _, neighbor_ids = index.search(query_vec, top_k)
    # Out-of-range ids (FAISS pads with -1 when fewer hits exist) are dropped.
    return [kb_meta[i] for i in neighbor_ids[0] if 0 <= i < len(kb_meta)]
def retrieve_related_summaries(main_func_info: Dict, max_related: int = 3) -> Dict[str, str]:
    """Fetch one-line summaries for the callers/callees of the main function.

    Looks at the "called_by" / "calls" lists in *main_func_info* (at most
    *max_related* entries each) and resolves each entry via the knowledge base.
    """
    def fetch_summary(func_name, file_path):
        hits = retrieve_knowledge(f"{func_name} in {file_path}", top_k=1)
        if hits:
            top = hits[0]
            return f"{top['function_name']} in {top['file_path']}: {top.get('summary', 'No summary')}"
        return f"{func_name} in {file_path}: Summary not found"

    summaries = {"called_by": [], "calls": []}
    for relation in ("called_by", "calls"):
        for entry in main_func_info.get(relation, [])[:max_related]:
            # Entries must be dicts carrying both a function name and a file.
            if isinstance(entry, dict) and "function" in entry and "file" in entry:
                summaries[relation].append(fetch_summary(entry["function"], entry["file"]))
    return summaries
def get_urgency_score_for_A(defect_desc: str, reason: str) -> int:
    """Score how urgently a confirmed high-risk defect must be fixed.

    Asks the LLM for an integer in [0, 100]; falls back to 50 when the call
    fails or no integer can be parsed out of the reply.
    """
    prompt = f"""你是一位资深 C/C++ 静态分析专家和航天嵌入式系统安全工程师。
以下是一个已被判定为高风险缺陷的问题,请根据其**严重性、可触发概率、后果影响(如崩溃、数据损坏、安全漏洞等)**
给出一个 0 到 100 的紧急修复分数urgency score
- 100 分:必然触发、导致系统崩溃或严重安全漏洞(如缓冲区溢出、除零、空指针解引用在关键路径)
- 7090 分:高概率触发,影响核心功能
- 4060 分:可能触发,影响次要功能
- 030 分:极难触发,或后果轻微
缺陷描述:
{defect_desc}
分析理由:
{reason}
请仅输出一个整数0 到 100 之间),不要包含任何其他文字。"""
    try:
        reply = client.chat.completions.create(
            model=LLM_MODEL_NAME,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0,
            max_tokens=5,  # a single number fits easily
        )
        text = reply.choices[0].message.content.strip()
        # Pull out the first run of digits, clamped to the [0, 100] range.
        digits = re.search(r'\d+', text)
        if digits is None:
            print(f"无法从模型响应中提取数字,使用默认值 50。原始响应: '{text}'")
            return 50
        return max(0, min(100, int(digits.group())))
    except Exception as e:
        print(f"获取 urgency_score 出错,使用默认值 50: {e}")
        return 50
# ============================
# 新增:影响域分析功能
# ============================
def retrieve_relevant_functions(query_text: str, top_k: int = 5) -> List[str]:
    """Retrieve the context_text of the *top_k* KB functions most relevant to the query."""
    try:
        query_vec = embed_text(query_text)
        _, neighbor_ids = index.search(query_vec, top_k)
        contexts = []
        for kb_idx in neighbor_ids[0]:
            if not (0 <= kb_idx < len(kb_meta)):
                continue
            meta = kb_meta[kb_idx]
            calls = ', '.join(meta.get('calls', [])) if meta.get('calls') else ''
            called_by = ', '.join(meta.get('called_by', [])) if meta.get('called_by') else ''
            includes = ', '.join(meta.get('includes', [])) if meta.get('includes') else ''
            # Rebuild context_text exactly as it was laid out when the KB was built.
            contexts.append(
                "【实体类型】函数\n"
                f"【函数名】{meta.get('function_name', 'N/A')}\n"
                f"【所在文件】{os.path.basename(meta.get('file_path', 'N/A'))}\n"
                f"【功能摘要】{meta.get('summary', '')}\n"
                f"【调用的函数】{calls}\n"
                f"【被以下函数调用】{called_by}\n"
                f"【包含的头文件】{includes}\n"
                f"{'-' * 40}"
            )
        return contexts
    except Exception as e:
        print(f"知识库检索出错: {e}")
        return []
def get_functions_to_modify_with_knowledge(
    defect_desc: str,
    reason: str,
    file_path: str,
    line_number: int
) -> List[str]:
    """Ask the LLM which functions a defect impacts, using KB-retrieved context.

    Returns a list of bare C identifiers, or an empty list when the model
    reports no impact or the call fails.
    """
    # Query text combines defect keywords with the basename of the file.
    query_text = f"{defect_desc}\n{reason}\n文件: {os.path.basename(file_path)}"
    retrieved_contexts = retrieve_relevant_functions(query_text, top_k=5)
    knowledge_context_block = "\n".join(retrieved_contexts) if retrieved_contexts else "无相关函数知识库条目。"
    prompt = f"""你是一位资深 C/C++ 航天嵌入式软件工程师。请根据以下缺陷信息和**知识库检索到的函数上下文**,分析此缺陷影响了哪些功能实现和组件的工作。
【缺陷描述】
{defect_desc}
【分析理由】
{reason}
【缺陷位置】
文件: {file_path}
行号: {line_number}
【知识库检索结果】
{knowledge_context_block}
请严格按以下格式输出(示例):
ObtSunVecI
InitAttEnv
(若无,直接返回空)"""
    try:
        response = client.chat.completions.create(
            model=LLM_MODEL_NAME,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0,
            max_tokens=200
        )
        answer = response.choices[0].message.content.strip()
        # Bug fix: the negative-marker list contained empty strings; since
        # "" in answer is always True, the function unconditionally returned
        # [] and the parsing loop below was dead code. Use real markers.
        if not answer or any(w in answer for w in ["无", "空", "没有", "未找到"]):
            return []
        functions = []
        for raw_line in answer.splitlines():
            candidate = raw_line.strip()
            # Accept only lines that start like a C identifier, then cut the
            # name at the first non-identifier character.
            if candidate and (candidate[0].isalpha() or candidate[0] == '_'):
                identifier = re.split(r'[^a-zA-Z0-9_]', candidate)[0]
                if identifier:
                    functions.append(identifier)
        return functions
    except Exception as e:
        print(f"获取 functions_to_modify_with_knowledge 出错: {e}")
        return []
def build_enhanced_prompt(
    defect_desc: str,
    file_path: str,
    line_number: int,
    main_context: str,
    main_knowledge: Optional[Dict],
    related_summaries: Optional[Dict],
    is_in_function: bool,
) -> str:
    """Assemble the true/false-positive judgement prompt for the LLM.

    Layers the defect description, its location, the code context, and —
    when available — knowledge-base information about the enclosing function
    and its callers/callees. Ends with the strict JSON output schema.
    """
    prompt = f"""你是一名资深 航天软件测试专家、C语言测试专家请根据以下信息判断给出的缺陷告警是否为真实缺陷True Positive注意只关注当下源代码和知识库中的信息以确认该缺陷是否是真正的逻辑硬伤对于可能导致潜在问题的非直接缺陷以及单纯的编码不规范问题进行忽视。
【缺陷描述】
{defect_desc}
【缺陷位置】
文件:{file_path}
行号:{line_number}
【代码上下文】
{main_context}
"""
    if is_in_function and main_knowledge:
        prompt += f"""【主函数知识库信息】
- 函数名: {main_knowledge.get('function_name', 'N/A')}
- 文件: {main_knowledge.get('file_path', 'N/A')}
- 功能摘要: {main_knowledge.get('summary', 'N/A')}
- 包含头文件: {', '.join(main_knowledge.get('includes', [])) or 'None'}
"""
        if related_summaries:
            # .get() guards against a partially-populated summaries dict.
            if related_summaries.get("called_by"):
                prompt += "\n【调用此函数的关键函数摘要】\n" + "\n".join(related_summaries["called_by"])
            if related_summaries.get("calls"):
                prompt += "\n\n【此函数调用的关键函数摘要】\n" + "\n".join(related_summaries["calls"])
    else:
        prompt += "注意:该缺陷位于非函数上下文(如全局变量、宏定义等),请谨慎判断。\n"
    # Consistency fix: the schema previously asked for "risk_zone" while every
    # fallback dict in this module emits "risk_points"; unified on "risk_points".
    prompt += """
请严格按以下 JSON 格式输出,不要包含其他内容:
{
"is_real_defect": true 或 false,
"reason": "简要说明原因",
"risk_points": ["影响域分析"],
"suggestion": "修复建议"
}
"""
    return prompt
def is_pure_style_issue(defect_desc: str) -> bool:
    """Quickly classify a finding as a pure coding-style/convention issue.

    Pure style issues can skip the expensive source analysis and KB retrieval.
    Returns True when the finding should be discarded as style-only; False
    (including on any API/parse failure) means it may involve real logic and
    must go through the full pipeline.
    """
    style_prompt = f"""你是一名资深 C 语言航天软件测试专家。请判断以下静态分析工具报告的缺陷描述是否**仅涉及编码风格、格式、命名规范等非功能性问题**,而不涉及任何逻辑错误、内存安全、数值计算、状态机、控制流等实质性风险。
【缺陷描述】
{defect_desc}
请严格按以下 JSON 格式输出:
{{ "is_pure_style": true 或 false, "reason": "简要说明" }}"""
    try:
        completion = client.chat.completions.create(
            model=LLM_MODEL_NAME,
            messages=[{"role": "user", "content": style_prompt}],
            temperature=0.0,
            max_tokens=128
        )
        reply = completion.choices[0].message.content
        # The model may wrap its JSON in a ``` fence; unwrap when present.
        fenced = re.search(r"```(?:json)?\s*({.*?})\s*```", reply, re.DOTALL)
        payload = fenced.group(1) if fenced else reply
        verdict = json.loads(payload)
        return bool(verdict.get("is_pure_style", False))
    except Exception as e:
        print(f"Style filter LLM call failed: {e}. Treating as NOT pure style (proceed to full analysis).")
        return False  # conservative: fall through to the full analysis
def analyze_defect(defect_desc: str, file_path: str, line_number: int) -> Dict:
    """Run the full pipeline for one defect: context -> KB retrieval -> LLM verdict.

    Returns the LLM's parsed JSON verdict augmented with "urgency_score" and
    "affected_functions"; on any failure returns a fallback dict with
    is_real_defect=None.
    """
    # Step 1: tree-sitter function context at the defect line.
    func_info = get_function_context(file_path, line_number)
    if func_info:
        func_name, func_code = func_info
        main_context = f"// Function: {func_name}\n{func_code}"
        is_in_function = True
        # Step 2: knowledge-base entry for the enclosing function.
        query = f"{func_name} in {file_path}"
        main_knowledge_hits = retrieve_knowledge(query, top_k=1)
        main_knowledge = main_knowledge_hits[0] if main_knowledge_hits else None
        # Step 3: summaries of callers/callees.
        related_summaries = retrieve_related_summaries(main_knowledge) if main_knowledge else None
    else:
        # Fallback: raw line window (defect outside any function body).
        main_context = get_fallback_context(file_path, line_number)
        is_in_function = False
        main_knowledge = None
        related_summaries = None
    # Step 4: build the enriched prompt.
    prompt = build_enhanced_prompt(
        defect_desc=defect_desc,
        file_path=file_path,
        line_number=line_number,
        main_context=main_context,
        main_knowledge=main_knowledge,
        related_summaries=related_summaries,
        is_in_function=is_in_function
    )
    # Step 5: LLM call and JSON extraction.
    try:
        completion = client.chat.completions.create(
            model=LLM_MODEL_NAME,  # "qwen-max"
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0,
            max_tokens=512
        )
        # New SDK: attribute access, not dict subscripting.
        content = completion.choices[0].message.content
        json_match = re.search(r"```(?:json)?\s*({.*?})\s*```", content, re.DOTALL)
        result = json.loads(json_match.group(1) if json_match else content)
        # Step 6: urgency score only for confirmed (true-positive) defects.
        if result.get("is_real_defect") is True:
            urgency_score = get_urgency_score_for_A(defect_desc, result.get("reason", ""))
            result["urgency_score"] = urgency_score
            # Step 7: impact-domain analysis only for high-urgency defects.
            if urgency_score > 70:
                result["affected_functions"] = get_functions_to_modify_with_knowledge(
                    defect_desc, result.get("reason", ""), file_path, line_number
                )
            else:
                # Bug fix: this used to be "" (a str); keep the field a list
                # for type consistency with every other code path and the
                # downstream len()/truthiness statistics.
                result["affected_functions"] = []
        else:
            result["urgency_score"] = 0       # not a real defect -> score 0
            result["affected_functions"] = []  # and no impacted functions
        return result
    except Exception as e:
        return {
            "is_real_defect": None,
            "reason": f"LLM call failed: {str(e)}",
            "risk_points": [],
            "suggestion": "大模型调用失败",
            "urgency_score": 0,        # score 0 on failure
            "affected_functions": []   # no impacted functions on failure
        }
def process_defects_from_excel(input_xlsx: str, output_json: str):
    """Filter an Excel sheet of static-analysis findings and write JSON results.

    Expects file path in column K (index 10), line number in column L (11)
    and defect description in column M (12). Each row goes through a fast
    style filter and, if it survives, the full `analyze_defect` pipeline.
    Prints summary statistics at the end.
    """
    print(f"Loading defects from {input_xlsx}...")
    df = pd.read_excel(input_xlsx, engine="openpyxl")
    if df.shape[1] < 13:
        raise ValueError("Excel 至少需要 M 列第13列")
    results = []
    for idx, row in df.iterrows():
        try:
            raw_file = row.iloc[10]     # column K: file path relative to project
            line_str = row.iloc[11]     # column L: line number
            defect_desc = row.iloc[12]  # column M: defect description
            # Bug fix: test for NaN BEFORE string concatenation. The original
            # built Project_path + "/" + row.iloc[10] first, so a NaN cell
            # raised TypeError and landed in the error branch instead of the
            # intended clean skip below.
            if pd.isna(raw_file) or pd.isna(defect_desc):
                print(f"Skip row {idx + 2}: missing file or description")
                continue
            file_path = (Project_path + "/" + str(raw_file)).strip()
            defect_desc = str(defect_desc).strip()
            try:
                line_number = int(float(line_str))
            except (ValueError, TypeError):
                print(f"Invalid line number at row {idx + 2}: {line_str}")
                continue
            print(f"Processing row {idx + 2}: {file_path}:{line_number}")
            # Fast pre-filter: pure coding-style findings skip full analysis.
            if is_pure_style_issue(defect_desc):
                print(f" → Skipped (pure style issue): {defect_desc[:60]}...")
                analysis = {
                    "is_real_defect": False,
                    "reason": "该问题仅为编码风格或规范问题,无实际逻辑风险。",
                    "risk_points": [],
                    "suggestion": "可忽略此类静态分析告警,或通过代码格式化工具统一处理。",
                    "urgency_score": 0,        # style issues score 0
                    "affected_functions": []   # and impact no functions
                }
            else:
                # Full analysis pipeline.
                analysis = analyze_defect(defect_desc, file_path, line_number)
            results.append({
                "row_index": idx + 2,
                "file_path": file_path,
                "line_number": line_number,
                "defect_description": defect_desc,
                "analysis_result": analysis
            })
            # Throttle to stay under DashScope's QPM limit (free tier).
            time.sleep(0.1)
        except Exception as e:
            print(f"Error processing row {idx + 2}: {e}")
            results.append({
                "row_index": idx + 2,
                "file_path": str(row.iloc[10]) if not pd.isna(row.iloc[10]) else "",
                "line_number": str(row.iloc[11]) if not pd.isna(row.iloc[11]) else "",
                "defect_description": str(row.iloc[12]) if not pd.isna(row.iloc[12]) else "",
                "analysis_result": {
                    "is_real_defect": None,
                    "reason": f"Unexpected error: {str(e)}",
                    "risk_points": [],
                    "suggestion": "处理过程中发生异常",
                    "urgency_score": 0,       # score 0 on error
                    "affected_functions": []
                }
            })
    with open(output_json, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\nCompleted! Results saved to {output_json}")
    # Verdict statistics.
    true_positives = sum(1 for r in results if r["analysis_result"].get("is_real_defect") is True)
    false_positives = sum(1 for r in results if r["analysis_result"].get("is_real_defect") is False)
    unknown = len(results) - true_positives - false_positives
    # Urgency distribution.
    high_urgency = sum(1 for r in results if r["analysis_result"].get("urgency_score", 0) >= 70)
    medium_urgency = sum(1 for r in results if 40 <= r["analysis_result"].get("urgency_score", 0) < 70)
    low_urgency = sum(1 for r in results if 0 < r["analysis_result"].get("urgency_score", 0) < 40)
    # Impact-domain statistics.
    total_affected_functions = sum(len(r["analysis_result"].get("affected_functions", [])) for r in results)
    defects_with_affected_functions = sum(1 for r in results if r["analysis_result"].get("affected_functions"))
    print(f"统计:真实缺陷 {true_positives} 条,误报 {false_positives} 条,未知 {unknown}")
    print(f"紧急程度分布:高紧急({high_urgency}条) 中紧急({medium_urgency}条) 低紧急({low_urgency}条)")
    print(f"影响域分析:{defects_with_affected_functions} 个缺陷影响了 {total_affected_functions} 个函数")
# ============================
# 主程序
# ============================
# Script entry point: run the full Excel -> JSON defect-filtering pipeline
# using the module-level INPUT_XLSX / OUTPUT_JSON settings.
if __name__ == "__main__":
    process_defects_from_excel(INPUT_XLSX, OUTPUT_JSON)