Files

1203 lines
47 KiB
Python
Raw Permalink Normal View History

import os
import sys
import json
import logging
import shutil
from pathlib import Path
import faiss
import numpy as np
# 导入自定义模块
from graph_builder import build_rag_database_interactive
from feature_retriever import FeatureRetriever
from static_analyzer import run_static_analysis
from issue_filter import run_issue_filtering
# ======================
# 导入原有 config.py 配置
# ======================
from config import (
QWEN_API_KEY, QWEN_API_URL,
PROJECT_ROOT, VECTOR_DB_PATH, METADATA_PATH, GRAPH_DATA_PATH, KNOWLEDGE_GRAPH_PATH,
Default_PROJECT_ROOT, Default_VECTOR_DB_PATH, Default_METADATA_PATH,
Default_GRAPH_DATA_PATH, Default_KNOWLEDGE_GRAPH_PATH,
MAX_CODE_LENGTH, CPP_EXTENSIONS, IGNORE_DIRS,
TOP_K, MAX_HOPS, MIN_SIMILARITY_THRESHOLD
)
USER_PROJECT_ROOT = PROJECT_ROOT
USER_VECTOR_DB_PATH = VECTOR_DB_PATH
USER_METADATA_PATH = METADATA_PATH
USER_GRAPH_DATA_PATH = GRAPH_DATA_PATH
USER_KNOWLEDGE_GRAPH_PATH = KNOWLEDGE_GRAPH_PATH
INTERNAL_DEFAULT_PROJECT_ROOT = Default_PROJECT_ROOT
INTERNAL_DEFAULT_VECTOR_DB_PATH = Default_VECTOR_DB_PATH
INTERNAL_DEFAULT_METADATA_PATH = Default_METADATA_PATH
INTERNAL_DEFAULT_GRAPH_DATA_PATH = Default_GRAPH_DATA_PATH
INTERNAL_DEFAULT_KNOWLEDGE_GRAPH_PATH = Default_KNOWLEDGE_GRAPH_PATH
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("satellite_rag_system.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 保存用户运行时配置的文件名
RUNTIME_CONFIG_FILE = "user_runtime_config.json"
def load_runtime_config():
"""加载用户运行时保存的配置"""
if os.path.exists(RUNTIME_CONFIG_FILE):
try:
with open(RUNTIME_CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.warning(f"加载运行时配置文件失败: {e}")
return {}
def save_runtime_config(config):
"""保存用户运行时配置"""
try:
with open(RUNTIME_CONFIG_FILE, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
logger.info(f"运行时配置已保存至: {RUNTIME_CONFIG_FILE}")
return True
except Exception as e:
logger.error(f"保存运行时配置失败: {e}")
return False
def get_final_path(config_key, user_input, config_user_path, internal_default_path, runtime_config):
"""
统一的路径解析函数
优先级: 1.用户本次输入 > 2.已保存的运行时配置 > 3.config.py中的主路径变量 > 4.config.py中的Default_*后备路径
"""
# 优先级1: 用户本次运行的输入
if user_input and str(user_input).strip():
final_path_str = str(user_input).strip()
# 立即更新运行时配置
runtime_config[config_key] = final_path_str
final_path = Path(final_path_str)
# 优先级2: 之前保存的运行时配置
elif config_key in runtime_config and runtime_config[config_key]:
final_path_str = runtime_config[config_key]
final_path = Path(final_path_str)
# 优先级3: config.py中预设的主路径变量 (映射为USER_*)
elif config_user_path and str(config_user_path).strip():
final_path = Path(config_user_path)
# 优先级4: config.py中预设的Default_*后备路径 (映射为INTERNAL_DEFAULT_*)
else:
final_path = Path(internal_default_path)
# 如果内部默认路径是相对路径,尝试基于项目根目录解析
if not final_path.is_absolute() and config_key != 'PROJECT_ROOT':
project_root = get_final_path(
'PROJECT_ROOT', '', USER_PROJECT_ROOT,
INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config
)
# 注意:避免创建不存在的路径,仅在 project_root 存在时拼接
if project_root.exists():
final_path = project_root / final_path
else:
# 如果 project_root 也不存在,回退到当前工作目录
final_path = Path.cwd() / final_path
# 关键修复:确保不返回空路径
if not str(final_path).strip():
# 如果最终路径仍然是空的,使用当前目录的默认文件名
default_filenames = {
'VECTOR_DB_PATH': 'satellite_rag.faiss',
'METADATA_PATH': 'satellite_rag_metadata.json',
'GRAPH_DATA_PATH': 'code_knowledge_graph.json',
'PROJECT_ROOT': '.'
}
if config_key in default_filenames:
final_path = Path(default_filenames[config_key])
else:
final_path = Path('.') # 回退到当前目录
# 尝试解析为绝对路径,并规范化
try:
return final_path.resolve()
except:
# 如果路径无效如包含不存在的父目录返回原始Path对象
return final_path
def ensure_directory_exists(file_path):
"""确保文件所在的目录存在"""
directory = os.path.dirname(file_path)
if directory and not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
logger.info(f"创建目录: {directory}")
return file_path
def check_api_key():
"""检查API Key配置"""
print("=" * 60)
print("检查API Key配置")
print("=" * 60)
if not QWEN_API_KEY or QWEN_API_KEY == "sk-your-api-key-here":
print("⚠ 警告: API Key未配置或为默认值")
print(f"当前API Key: {QWEN_API_KEY[:20]}..." if QWEN_API_KEY else "")
print(f"API URL: {QWEN_API_URL}")
response = input("是否继续? (y/n): ").strip().lower()
if response != 'y':
print("请先配置 config.py 中的 QWEN_API_KEY 字段")
sys.exit(1)
else:
print(f"✓ API Key已配置: {QWEN_API_KEY[:20]}...")
print(f"✓ API URL: {QWEN_API_URL}")
return True
def check_knowledge_base_exists(runtime_config):
"""检查知识库是否已建设"""
print("\n检查知识库文件...")
# 使用统一的路径解析函数获取最终路径
vector_db_path = get_final_path(
'VECTOR_DB_PATH', '', USER_VECTOR_DB_PATH,
INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config
)
metadata_path = get_final_path(
'METADATA_PATH', '', USER_METADATA_PATH,
INTERNAL_DEFAULT_METADATA_PATH, runtime_config
)
required_files = [vector_db_path, metadata_path]
existing_files = []
missing_files = []
for file_path in required_files:
if os.path.exists(file_path):
existing_files.append(file_path)
else:
missing_files.append(file_path)
if existing_files:
print(f"✓ 找到 {len(existing_files)} 个知识库文件:")
for file in existing_files:
print(f" - {file}")
if missing_files:
print(f"⚠ 缺失 {len(missing_files)} 个知识库文件:")
for file in missing_files:
print(f" - {file}")
return len(missing_files) == 0, existing_files, missing_files, vector_db_path, metadata_path
def manual_configure_knowledge_base(runtime_config):
"""手动配置知识库文件地址"""
print("\n" + "=" * 60)
print("手动配置知识库文件")
print("=" * 60)
config_changes = {}
# 获取当前默认值用于显示
current_vector_db = get_final_path(
'VECTOR_DB_PATH', '', USER_VECTOR_DB_PATH,
INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config
)
current_metadata = get_final_path(
'METADATA_PATH', '', USER_METADATA_PATH,
INTERNAL_DEFAULT_METADATA_PATH, runtime_config
)
# 询问向量数据库路径
vector_input = input(f"请输入FAISS向量数据库文件路径 (当前: {current_vector_db}): ").strip()
if vector_input:
vector_path = Path(vector_input)
config_changes['VECTOR_DB_PATH'] = str(vector_path)
else:
vector_path = current_vector_db
# 询问元数据文件路径
metadata_input = input(f"请输入元数据文件路径 (当前: {current_metadata}): ").strip()
if metadata_input:
metadata_path = Path(metadata_input)
config_changes['METADATA_PATH'] = str(metadata_path)
else:
metadata_path = current_metadata
# 验证文件
print(f"\n验证配置的文件...")
all_valid = True
try:
if os.path.exists(vector_path):
index = faiss.read_index(str(vector_path))
dimension = index.d
print(f"✓ FAISS索引: {vector_path} (维度: {dimension})")
else:
print(f"⚠ 文件不存在: {vector_path}")
all_valid = False
except Exception as e:
print(f"⚠ 加载FAISS索引失败: {e}")
all_valid = False
try:
if os.path.exists(metadata_path):
with open(metadata_path, 'r', encoding='utf-8') as f:
metadata = json.load(f)
print(f"✓ 元数据文件: {metadata_path} (包含 {len(metadata)} 个函数)")
else:
print(f"⚠ 文件不存在: {metadata_path}")
all_valid = False
except Exception as e:
print(f"⚠ 加载元数据失败: {e}")
all_valid = False
if config_changes and all_valid:
# 更新运行时配置
runtime_config.update(config_changes)
save_config = input("\n是否保存此配置供下次使用? (y/n): ").strip().lower()
if save_config == 'y':
if save_runtime_config(runtime_config):
print(f"✓ 配置已保存到运行时文件: {RUNTIME_CONFIG_FILE}")
return True
elif not all_valid:
print("\n配置验证失败,请检查文件路径。")
return False
else:
print("\n使用现有配置,无更改。")
return True
def load_knowledge_base(runtime_config):
"""加载知识库文件"""
print("\n" + "=" * 60)
print("加载知识库")
print("=" * 60)
# 检查知识库
kb_exists, existing_files, missing_files, vector_db_path, metadata_path = check_knowledge_base_exists(
runtime_config)
if kb_exists:
print("\n✓ 使用现有知识库配置")
# 验证路径是否为空
if not str(vector_db_path).strip():
print("⚠ 错误: 向量数据库路径为空!")
print("正在尝试使用默认值...")
vector_db_path = Path("satellite_rag.faiss")
if not str(metadata_path).strip():
print("⚠ 错误: 元数据文件路径为空!")
print("正在尝试使用默认值...")
metadata_path = Path("satellite_rag_metadata.json")
return {
'vector_db_path': str(vector_db_path),
'metadata_path': str(metadata_path),
'graph_data_path': str(get_final_path(
'GRAPH_DATA_PATH', '', USER_GRAPH_DATA_PATH,
INTERNAL_DEFAULT_GRAPH_DATA_PATH, runtime_config
))
}
else:
print("\n知识库文件不完整,需要配置。")
if manual_configure_knowledge_base(runtime_config):
# 重新检查
kb_exists, existing_files, missing_files, vector_db_path, metadata_path = check_knowledge_base_exists(
runtime_config)
if kb_exists:
return {
'vector_db_path': str(vector_db_path),
'metadata_path': str(metadata_path),
'graph_data_path': str(get_final_path(
'GRAPH_DATA_PATH', '', USER_GRAPH_DATA_PATH,
INTERNAL_DEFAULT_GRAPH_DATA_PATH, runtime_config
))
}
return None
def feature_retrieval_mode(runtime_config):
"""功能需求检索模式(主动引导路径输入版)"""
print("\n" + "=" * 60)
print("功能需求检索分析")
print("=" * 60)
# 第一步:主动引导用户配置或确认知识库路径
print("请配置知识库路径直接按Enter键将使用括号内显示的默认路径")
# 获取当前根据优先级计算出的默认路径(用于提示用户)
default_vector_path = get_final_path(
'VECTOR_DB_PATH', '', USER_VECTOR_DB_PATH,
INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config
)
default_metadata_path = get_final_path(
'METADATA_PATH', '', USER_METADATA_PATH,
INTERNAL_DEFAULT_METADATA_PATH, runtime_config
)
# 引导输入向量数据库路径
vector_prompt = f"请输入FAISS向量数据库文件路径 (默认: {default_vector_path}): "
vector_input = input(vector_prompt).strip()
# 引导输入元数据文件路径
metadata_prompt = f"请输入元数据文件路径 (默认: {default_metadata_path}): "
metadata_input = input(metadata_prompt).strip()
# 使用用户输入(或空输入)重新计算最终路径
# 注意:这里将用户输入传入 get_final_path它会自动更新 runtime_config
final_vector_path = get_final_path(
'VECTOR_DB_PATH', vector_input, USER_VECTOR_DB_PATH,
INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config
)
final_metadata_path = get_final_path(
'METADATA_PATH', metadata_input, USER_METADATA_PATH,
INTERNAL_DEFAULT_METADATA_PATH, runtime_config
)
# 检查文件是否存在
print(f"\n检查知识库文件...")
files_exist = True
if not os.path.exists(final_vector_path):
print(f"⚠ 向量数据库文件不存在: {final_vector_path}")
files_exist = False
else:
print(f"✓ 向量数据库: {final_vector_path}")
if not os.path.exists(final_metadata_path):
print(f"⚠ 元数据文件不存在: {final_metadata_path}")
files_exist = False
else:
print(f"✓ 元数据文件: {final_metadata_path}")
if not files_exist:
print("\n知识库文件缺失。")
choice = input("请选择: 1. 手动指定其他路径 2. 返回主菜单\n选择 (1-2): ").strip()
if choice == "1":
# 递归调用自身,重新引导配置
return feature_retrieval_mode(runtime_config)
else:
print("功能检索取消,返回主菜单。")
return
# 构建传递给检索器的配置
config = {
'vector_db_path': str(final_vector_path),
'metadata_path': str(final_metadata_path),
'graph_data_path': str(get_final_path(
'GRAPH_DATA_PATH', '', USER_GRAPH_DATA_PATH,
INTERNAL_DEFAULT_GRAPH_DATA_PATH, runtime_config
))
}
# 询问用户是否保存此次配置供下次使用
if vector_input or metadata_input: # 只有当用户本次有输入时才询问保存
save_choice = input(f"\n是否将此次的路径配置保存,供下次程序启动时默认使用? (y/n): ").strip().lower()
if save_choice == 'y':
if save_runtime_config(runtime_config):
print(f"✓ 配置已保存至: {RUNTIME_CONFIG_FILE}")
else:
print("⚠ 配置保存失败。")
# 第二步:加载知识库并进入检索循环
print("\n" + "=" * 60)
print("加载知识库并初始化检索器...")
print("=" * 60)
try:
retriever = FeatureRetriever(config)
if not retriever.load_knowledge_base():
print("加载知识库失败!可能文件格式不正确或已损坏。")
retry = input("是否重新配置路径? (y/n): ").strip().lower()
if retry == 'y':
return feature_retrieval_mode(runtime_config) # 重新开始
else:
print("功能检索取消,返回主菜单。")
return
print(f"知识库加载成功: {len(retriever.metadatas)} 个函数")
print("\n请输入自然语言描述功能需求进行分析")
print("输入 'quit''exit' 返回主菜单")
while True:
query = input("\n功能需求: ").strip()
if query.lower() in ["quit", "exit"]:
break
if not query:
continue
# 执行分析
result = retriever.analyze_with_multiple_constraints(query)
# 显示结果
print(f"\n分析结果:")
print(f"实现状态: {'✅ 已实现' if result['implemented'] else '❌ 未实现'}")
print(f"综合评分: {result.get('total_score', 0):.3f}")
print(f"判断理由: {result.get('reason', '无理由')}")
if result.get("most_relevant_function"):
rel_func = result["most_relevant_function"]
print(f"最相关函数: {rel_func.get('name')}")
except Exception as e:
print(f"分析失败: {e}")
logger.error(f"功能检索失败: {e}", exc_info=True)
# 发生异常时,也提供重新配置的选项
retry = input("\n发生错误,是否重新配置知识库路径? (y/n): ").strip().lower()
if retry == 'y':
feature_retrieval_mode(runtime_config)
def issue_filter_mode(runtime_config):
"""问题单过滤模式(修正知识库传递问题)"""
print("\n" + "=" * 60)
print("问题单过滤模式")
print("=" * 60)
# 第一步:首先引导用户配置知识库路径(先输入,不输入则默认)
print("第一步:配置知识库路径")
# 获取当前默认的知识库路径
default_vector_path = get_final_path(
'VECTOR_DB_PATH', '', USER_VECTOR_DB_PATH,
INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config
)
default_metadata_path = get_final_path(
'METADATA_PATH', '', USER_METADATA_PATH,
INTERNAL_DEFAULT_METADATA_PATH, runtime_config
)
# 引导用户输入向量数据库路径
vector_prompt = f"请输入向量数据库文件路径 (FAISS索引文件)\n(直接按Enter使用当前默认值: {default_vector_path}): "
vector_input = input(vector_prompt).strip()
# 使用用户输入重新计算最终路径
final_vector_path = get_final_path(
'VECTOR_DB_PATH', vector_input, USER_VECTOR_DB_PATH,
INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config
)
if not vector_input:
print(f"使用默认向量数据库: {final_vector_path}")
# 引导用户输入元数据文件路径
metadata_prompt = f"请输入元数据文件路径\n(直接按Enter使用当前默认值: {default_metadata_path}): "
metadata_input = input(metadata_prompt).strip()
# 使用用户输入重新计算最终路径
final_metadata_path = get_final_path(
'METADATA_PATH', metadata_input, USER_METADATA_PATH,
INTERNAL_DEFAULT_METADATA_PATH, runtime_config
)
if not metadata_input:
print(f"使用默认元数据文件: {final_metadata_path}")
# 检查知识库文件是否存在
print(f"\n检查知识库文件...")
files_exist = True
if not os.path.exists(final_vector_path):
print(f"⚠ 向量数据库文件不存在: {final_vector_path}")
files_exist = False
else:
print(f"✓ 向量数据库: {final_vector_path}")
if not os.path.exists(final_metadata_path):
print(f"⚠ 元数据文件不存在: {final_metadata_path}")
files_exist = False
else:
print(f"✓ 元数据文件: {final_metadata_path}")
if not files_exist:
print("\n知识库文件缺失,无法进行问题单过滤。")
print("请先使用主菜单选项1创建知识库或确保文件路径正确。")
return
# 构建知识库配置字典
kb_config = {
'vector_db_path': str(final_vector_path),
'metadata_path': str(final_metadata_path),
'graph_data_path': str(get_final_path(
'GRAPH_DATA_PATH', '', USER_GRAPH_DATA_PATH,
INTERNAL_DEFAULT_GRAPH_DATA_PATH, runtime_config
))
}
# 询问是否保存此次配置
if vector_input or metadata_input:
save_choice = input(f"\n是否将此知识库配置保存供下次使用? (y/n): ").strip().lower()
if save_choice == 'y':
if save_runtime_config(runtime_config):
print(f"✓ 配置已保存至运行时配置文件")
print("\n✓ 知识库配置完成")
print("\n第二步:配置问题单过滤参数")
# 第二步:配置问题单过滤的其他参数
default_input_excel = "科代问题单样例.xlsx"
# 首先检查默认文件是否存在
if not os.path.exists(default_input_excel):
print(f"⚠ 注意: 默认输入文件 '{default_input_excel}' 不存在")
input_prompt = f"请输入待过滤的问题单Excel文件路径\n(直接按Enter使用默认值: {default_input_excel}): "
input_excel = input(input_prompt).strip()
# 用户不输入时使用默认值
if not input_excel:
input_excel = default_input_excel
print(f"使用默认输入文件: {input_excel}")
# 路径检查和修正逻辑...
# [保持您原有的路径检查逻辑,但这里简化展示]
if not os.path.exists(input_excel):
print(f"文件不存在: {input_excel}")
print("问题单过滤取消。")
return
# 输出文件路径
default_output_json = "filtered_defects.json"
output_prompt = f"请输入输出JSON文件路径\n(直接按Enter使用默认值: {default_output_json}): "
output_json = input(output_prompt).strip()
if not output_json:
output_json = default_output_json
print(f"使用默认输出文件: {output_json}")
# 项目根目录路径
default_project_root = get_final_path(
'PROJECT_ROOT', '', USER_PROJECT_ROOT,
INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config
)
project_prompt = f"请输入问题单中代码文件所对应的项目根目录路径\n(直接按Enter使用当前默认值: {default_project_root}): "
project_input = input(project_prompt).strip()
final_project_root = get_final_path(
'PROJECT_ROOT', project_input, USER_PROJECT_ROOT,
INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config
)
if not project_input:
print(f"使用默认项目路径: {final_project_root}")
if not os.path.isdir(final_project_root):
print(f"项目路径无效: {final_project_root}")
print("问题单过滤取消。")
return
# 保存项目路径配置(如果用户有输入)
if project_input:
runtime_config['PROJECT_ROOT'] = str(final_project_root)
save_runtime_config(runtime_config)
print("\n" + "=" * 60)
print("开始问题单过滤分析...")
print("=" * 60)
print(f"输入文件: {input_excel}")
print(f"输出文件: {output_json}")
print(f"项目路径: {final_project_root}")
print("=" * 60)
try:
# 关键:确保传递知识库路径参数
success = run_issue_filtering(
input_xlsx_path=input_excel,
output_json_path=output_json,
project_path_override=str(final_project_root),
# 明确传递知识库路径
kb_vector_path=str(final_vector_path), # 确保这个变量存在且正确
kb_metadata_path=str(final_metadata_path) # 确保这个变量存在且正确
)
if success:
print(f"\n" + "=" * 60)
print("✓ 问题单过滤完成!")
print(f"结果已保存至: {output_json}")
print("=" + "=" * 60)
# 结果查看逻辑
while True:
view_option = input(
"\n请选择操作:\n1. 查看统计摘要\n2. 打开结果文件\n3. 返回主菜单\n选择 (1-3): ").strip()
if view_option == "1":
try:
with open(output_json, 'r', encoding='utf-8') as f:
data = json.load(f)
if not data:
print("结果文件为空。")
continue
true_positives = sum(1 for r in data if r["analysis_result"].get("is_real_defect") is True)
false_positives = sum(1 for r in data if r["analysis_result"].get("is_real_defect") is False)
unknown = len(data) - true_positives - false_positives
high_urgency = sum(1 for r in data if r["analysis_result"].get("urgency_score", 0) >= 70)
medium_urgency = sum(1 for r in data if 40 <= r["analysis_result"].get("urgency_score", 0) < 70)
low_urgency = sum(1 for r in data if 0 < r["analysis_result"].get("urgency_score", 0) < 40)
total_affected = sum(len(r["analysis_result"].get("affected_functions", [])) for r in data)
defects_with_affected = sum(1 for r in data if r["analysis_result"].get("affected_functions"))
print(f"\n统计摘要:")
print(f" 总缺陷数: {len(data)}")
print(f" 真实缺陷: {true_positives} 条 ({true_positives / len(data) * 100:.1f}%)")
print(f" 误报缺陷: {false_positives} 条 ({false_positives / len(data) * 100:.1f}%)")
print(f" 未知/错误: {unknown} 条 ({unknown / len(data) * 100:.1f}%)")
print(f"\n紧急程度分布:")
print(f" 高紧急(≥70): {high_urgency}")
print(f" 中紧急(40-69): {medium_urgency}")
print(f" 低紧急(1-39): {low_urgency}")
print(f" 零分(非缺陷): {false_positives}")
print(f"\n影响域分析:")
print(f" {defects_with_affected} 个缺陷影响了 {total_affected} 个函数")
sheet_counts = {}
for item in data:
sheet_name = item.get("sheet_name", "unknown")
sheet_counts[sheet_name] = sheet_counts.get(sheet_name, 0) + 1
if sheet_counts:
print(f"\n工作表分布:")
for sheet, count in sheet_counts.items():
print(f" {sheet}: {count}")
except Exception as e:
print(f"读取结果文件失败: {e}")
elif view_option == "2":
try:
import subprocess
import platform
with open(output_json, 'r', encoding='utf-8') as f:
data = json.load(f)
if not data:
print("结果文件为空。")
continue
print(f"\n结果文件包含 {len(data)} 条记录。")
view_method = input(
"查看方式:\n1. 完整JSON文件\n2. 仅真实缺陷\n3. 高紧急缺陷(≥70分)\n选择 (1-3): ").strip()
if view_method == "1":
if platform.system() == "Windows":
os.startfile(output_json)
elif platform.system() == "Darwin":
subprocess.run(["open", output_json])
else:
subprocess.run(["xdg-open", output_json])
print(f"已使用默认程序打开: {output_json}")
elif view_method in ["2", "3"]:
if view_method == "2":
filtered_data = [item for item in data if
item["analysis_result"].get("is_real_defect") is True]
print(f"\n真实缺陷 ({len(filtered_data)} 条):")
else:
filtered_data = [item for item in data if
item["analysis_result"].get("urgency_score", 0) >= 70]
print(f"\n高紧急缺陷 ({len(filtered_data)} 条):")
for i, item in enumerate(filtered_data[:10], 1):
print(f"\n{i}. 工作表: {item.get('sheet_name')}, 行: {item.get('row_index')}")
print(
f" 文件: {os.path.basename(item.get('file_path', ''))}:{item.get('line_number')}")
print(f" 描述: {item.get('defect_description', '')[:80]}...")
print(f" 真实缺陷: {item['analysis_result'].get('is_real_defect')}")
print(f" 紧急分数: {item['analysis_result'].get('urgency_score', 0)}")
if item['analysis_result'].get('affected_functions'):
print(
f" 影响函数: {', '.join(item['analysis_result'].get('affected_functions', []))}")
if len(filtered_data) > 10:
print(f"\n... 还有 {len(filtered_data) - 10} 条未显示")
save_filtered = input(f"\n是否将筛选结果保存为新文件? (y/n): ").strip().lower()
if save_filtered == 'y':
filtered_file = output_json.replace('.json', '_filtered.json')
with open(filtered_file, 'w', encoding='utf-8') as f:
json.dump(filtered_data, f, indent=2, ensure_ascii=False)
print(f"筛选结果已保存至: {filtered_file}")
except Exception as e:
print(f"打开或处理结果文件失败: {e}")
elif view_option == "3":
break
else:
print("无效选择,请重新输入")
else:
print("\n" + "=" * 60)
print("✗ 问题单过滤过程失败。")
print("=" + "=" * 60)
except Exception as e:
print(f"\n问题单过滤执行失败: {e}")
import traceback
traceback.print_exc()
def static_analysis_mode(runtime_config):
"""静态分析模式(遵循"先输入,不输入则默认"原则)"""
print("\n" + "=" * 60)
print("静态分析模式")
print("=" * 60)
# 1. 首先检查并配置知识库(静态分析前必须指定知识库)
print("第一步:配置知识库")
# --- 修改开始:使用与项目路径相同的交互逻辑 ---
# 使用get_final_path计算当前默认的知识库路径
default_vector_path = get_final_path(
'VECTOR_DB_PATH', '', USER_VECTOR_DB_PATH,
INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config
)
default_metadata_path = get_final_path(
'METADATA_PATH', '', USER_METADATA_PATH,
INTERNAL_DEFAULT_METADATA_PATH, runtime_config
)
# 引导输入向量数据库路径
vector_prompt = f"请输入FAISS向量数据库文件路径\n(直接按Enter使用当前默认值: {default_vector_path}): "
vector_input = input(vector_prompt).strip()
# 引导输入元数据文件路径
metadata_prompt = f"请输入元数据文件路径\n(直接按Enter使用当前默认值: {default_metadata_path}): "
metadata_input = input(metadata_prompt).strip()
# 使用用户输入(或空输入)重新计算最终路径
final_vector_path = get_final_path(
'VECTOR_DB_PATH', vector_input, USER_VECTOR_DB_PATH,
INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config
)
final_metadata_path = get_final_path(
'METADATA_PATH', metadata_input, USER_METADATA_PATH,
INTERNAL_DEFAULT_METADATA_PATH, runtime_config
)
# 检查文件是否存在
print(f"\n检查知识库文件...")
files_exist = True
if not os.path.exists(final_vector_path):
print(f"⚠ 向量数据库文件不存在: {final_vector_path}")
files_exist = False
else:
print(f"✓ 向量数据库: {final_vector_path}")
if not os.path.exists(final_metadata_path):
print(f"⚠ 元数据文件不存在: {final_metadata_path}")
files_exist = False
else:
print(f"✓ 元数据文件: {final_metadata_path}")
if not files_exist:
print("\n知识库文件缺失。")
choice = input("请选择: 1. 手动指定其他路径 2. 返回主菜单\n选择 (1-2): ").strip()
if choice == "1":
# 递归调用自身,重新引导配置
return static_analysis_mode(runtime_config)
else:
print("静态分析取消,返回主菜单。")
return
# 构建知识库配置字典
config = {
'vector_db_path': str(final_vector_path),
'metadata_path': str(final_metadata_path),
'graph_data_path': str(get_final_path(
'GRAPH_DATA_PATH', '', USER_GRAPH_DATA_PATH,
INTERNAL_DEFAULT_GRAPH_DATA_PATH, runtime_config
))
}
# 询问是否保存此次配置(如果用户本次有输入)
if vector_input or metadata_input:
save_choice = input(f"\n是否将此次的知识库配置保存,供下次程序启动时默认使用? (y/n): ").strip().lower()
if save_choice == 'y':
if save_runtime_config(runtime_config):
print(f"✓ 配置已保存至: {RUNTIME_CONFIG_FILE}")
else:
print("⚠ 配置保存失败。")
# --- 修改结束 ---
print("✓ 知识库配置成功")
print("\n请配置静态分析参数:")
# 2. 获取要分析的项目路径 - 先显示默认值,再提示输入
# 使用get_final_path计算当前默认项目路径
default_project_root = get_final_path(
'PROJECT_ROOT', '', USER_PROJECT_ROOT,
INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config
)
project_prompt = f"请输入要分析的C/C++项目根目录路径\n(直接按Enter使用当前默认值: {default_project_root}): "
project_input = input(project_prompt).strip()
# ...(后续代码保持不变)...
# 使用用户输入重新计算最终路径
project_path = get_final_path(
'PROJECT_ROOT', project_input, USER_PROJECT_ROOT,
INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config
)
# 如果用户没有输入,使用默认值
if not project_input:
print(f"使用默认项目路径: {project_path}")
# 路径验证
if not project_path.exists():
print(f"⚠ 路径不存在: {project_path}")
retry = input("是否重新输入? (y/n): ").strip().lower()
if retry != 'y':
print("静态分析取消。")
return
# 重新获取输入
while True:
new_input = input("请输入正确的项目根目录路径: ").strip()
if not new_input:
print("路径不能为空")
continue
new_path = get_final_path(
'PROJECT_ROOT', new_input, USER_PROJECT_ROOT,
INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config
)
if not new_path.exists():
print(f"路径不存在: {new_path}")
continue
if not new_path.is_dir():
print(f"路径不是目录: {new_path}")
continue
project_path = new_path
project_input = new_input
break
if not project_path.is_dir():
print(f"⚠ 路径不是目录: {project_path}")
retry = input("是否重新输入? (y/n): ").strip().lower()
if retry != 'y':
print("静态分析取消。")
return
# 重新获取输入
while True:
new_input = input("请输入正确的项目根目录路径: ").strip()
if not new_input:
print("路径不能为空")
continue
new_path = get_final_path(
'PROJECT_ROOT', new_input, USER_PROJECT_ROOT,
INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config
)
if not new_path.is_dir():
print(f"路径不是目录: {new_path}")
continue
project_path = new_path
project_input = new_input
break
# 3. 获取规则文件路径 - 先显示默认值,再提示输入
default_rules = "审查规则.xlsx"
# 首先检查默认文件是否存在
if not os.path.exists(default_rules):
print(f"⚠ 注意: 默认规则文件 '{default_rules}' 不存在")
rules_prompt = f"请输入审查规则Excel文件路径\n(直接按Enter使用默认值: {default_rules}): "
rules_input = input(rules_prompt).strip()
# 用户不输入时使用默认值
if not rules_input:
rules_excel = default_rules
print(f"使用默认规则文件: {rules_excel}")
else:
rules_excel = rules_input
# 路径验证
rules_path = Path(rules_excel)
if not rules_path.exists():
print(f"⚠ 规则文件不存在: {rules_path}")
# 如果用户输入了但文件不存在,询问是否重新输入
if rules_excel != default_rules: # 用户输入了自定义路径
retry = input("文件不存在,是否重新输入路径? (y/n): ").strip().lower()
if retry == 'y':
# 重新获取输入
while True:
new_input = input("请输入正确的规则文件路径: ").strip()
if not new_input:
print("路径不能为空")
continue
new_path = Path(new_input)
if not new_path.exists():
print(f"文件不存在: {new_path}")
continue
rules_excel = new_input
rules_path = new_path
break
else:
print("静态分析取消。")
return
else: # 用户使用的是默认路径
print(f"默认规则文件 {default_rules} 不存在。")
manual_input = input("是否手动输入文件路径? (y/n): ").strip().lower()
if manual_input != 'y':
print("静态分析取消。")
return
while True:
new_input = input("请输入正确的规则文件路径: ").strip()
if not new_input:
print("路径不能为空")
continue
new_path = Path(new_input)
if not new_path.exists():
print(f"文件不存在: {new_path}")
continue
rules_excel = new_input
rules_path = new_path
break
# 4. 获取输出报告名称 - 先显示默认值,再提示输入
default_output = "audit_report"
output_prompt = f"请输入输出报告文件的基础名称 (不包含扩展名)\n(直接按Enter使用默认值: {default_output}): "
output_input = input(output_prompt).strip()
# 用户不输入时使用默认值
if not output_input:
output_base = default_output
print(f"使用默认输出报告名称: {output_base}")
else:
output_base = output_input
print("\n" + "=" * 60)
print("开始静态分析...")
print("=" * 60)
print(f"项目路径: {project_path}")
print(f"规则文件: {rules_path}")
print(f"输出报告: {output_base}")
print("=" * 60)
try:
success = run_static_analysis(
project_path=str(project_path),
rules_excel=str(rules_path),
output_report=output_base
)
if success:
print("\n" + "=" * 60)
print("✓ 静态分析执行成功!")
print("=" + "=" * 60)
# 保存更新后的运行时配置(如果用户有输入项目路径)
if project_input:
runtime_config['PROJECT_ROOT'] = str(project_path)
save_runtime_config(runtime_config)
print(f"✓ 项目路径已保存到运行时配置")
# 提供查看结果的选项
result_files = [
f"{output_base}.html",
f"{output_base}.pdf",
f"{output_base}.json"
]
existing_files = []
for file in result_files:
if os.path.exists(file):
existing_files.append(file)
if existing_files:
print(f"\n生成的报告文件:")
for file in existing_files:
print(f" - {file}")
view_option = input(f"\n是否打开HTML报告文件? (y/n): ").strip().lower()
if view_option == 'y' and os.path.exists(f"{output_base}.html"):
try:
import subprocess
import platform
html_file = f"{output_base}.html"
if platform.system() == "Windows":
os.startfile(html_file)
elif platform.system() == "Darwin":
subprocess.run(["open", html_file])
else:
subprocess.run(["xdg-open", html_file])
print(f"已使用默认程序打开: {html_file}")
except Exception as e:
print(f"打开报告文件失败: {e}")
else:
print("\n" + "=" * 60)
print("✗ 静态分析执行过程中遇到问题。")
print("=" + "=" * 60)
except Exception as e:
print(f"\n静态分析执行失败: {e}")
import traceback
traceback.print_exc()
def main_menu():
"""主菜单界面"""
print("\n" + "=" * 60)
print("卫星星务软件代码RAG知识库系统")
print("=" * 60)
print("版本: 3.0")
print("功能: 支持代码知识库构建、功能检索、问题单过滤、静态分析")
print("=" * 60)
# 加载运行时配置
runtime_config = load_runtime_config()
if runtime_config:
print(f"已加载运行时配置 ({len(runtime_config)} 项)")
# 检查API Key
if not check_api_key():
return
while True:
print("\n" + "=" * 60)
print("主菜单")
print("=" * 60)
print("1. 创建项目知识库")
print("2. 功能需求检索")
print("3. 问题单过滤")
print("4. 静态分析")
print("5. 管理配置文件")
print("6. 退出系统")
print("=" * 60)
choice = input("\n请选择功能 (1-6): ").strip()
if choice == "1":
print("\n" + "=" * 60)
print("创建项目知识库")
print("=" * 60)
print("此功能将分析代码项目,构建知识图谱和向量数据库")
confirm = input("\n确认开始知识库构建? (y/n): ").strip().lower()
if confirm == 'y':
try:
result = build_rag_database_interactive()
if result:
print("\n知识库构建完成!")
except Exception as e:
print(f"知识库构建失败: {e}")
logger.error(f"知识库构建失败: {e}", exc_info=True)
elif choice == "2":
print("\n进入功能需求检索模式...")
kb_exists, _, _, _, _ = check_knowledge_base_exists(runtime_config)
if not kb_exists:
print("\n⚠ 尚未建设项目知识库!")
response = input("是否先创建或配置知识库? (y/n): ").strip().lower()
if response == 'y':
if not manual_configure_knowledge_base(runtime_config):
continue
else:
continue
feature_retrieval_mode(runtime_config)
elif choice == "3":
print("\n进入问题单过滤模式...")
kb_exists, _, _, _, _ = check_knowledge_base_exists(runtime_config)
if not kb_exists:
print("\n⚠ 尚未建设项目知识库!")
response = input("是否配置已有知识库文件? (y/n): ").strip().lower()
if response == 'y':
if not manual_configure_knowledge_base(runtime_config):
continue
else:
print("请先使用选项1创建知识库")
continue
issue_filter_mode(runtime_config)
elif choice == "4":
print("\n进入静态分析模式...")
static_analysis_mode(runtime_config)
elif choice == "5":
print("\n" + "=" * 60)
print("配置文件管理")
print("=" * 60)
print("1. 查看当前运行时配置")
print("2. 清除运行时配置")
print("3. 返回主菜单")
config_choice = input("\n请选择操作 (1-3): ").strip()
if config_choice == "1":
print(f"\n当前运行时配置 ({RUNTIME_CONFIG_FILE}):")
if runtime_config:
for key, value in runtime_config.items():
print(f" {key}: {value}")
else:
print(" 无运行时配置")
print(f"\n配置文件 (config.py) 中的主路径变量:")
print(f" PROJECT_ROOT: {PROJECT_ROOT}")
print(f" VECTOR_DB_PATH: {VECTOR_DB_PATH}")
print(f" METADATA_PATH: {METADATA_PATH}")
print(f" GRAPH_DATA_PATH: {GRAPH_DATA_PATH}")
print(f" KNOWLEDGE_GRAPH_PATH: {KNOWLEDGE_GRAPH_PATH}")
print(f"\n配置文件 (config.py) 中的默认后备路径 (Default_*):")
print(f" Default_PROJECT_ROOT: {Default_PROJECT_ROOT}")
print(f" Default_VECTOR_DB_PATH: {Default_VECTOR_DB_PATH}")
print(f" Default_METADATA_PATH: {Default_METADATA_PATH}")
input("\n按Enter键继续...")
elif config_choice == "2":
if os.path.exists(RUNTIME_CONFIG_FILE):
confirm = input(f"确定要删除运行时配置文件 {RUNTIME_CONFIG_FILE} 吗? (y/n): ").strip().lower()
if confirm == 'y':
os.remove(RUNTIME_CONFIG_FILE)
runtime_config.clear()
print("运行时配置文件已删除。")
else:
print("运行时配置文件不存在。")
elif config_choice == "3":
continue
else:
print("无效选择")
elif choice == "6":
print("\n感谢使用,再见!")
# 退出前保存配置
if runtime_config:
save_runtime_config(runtime_config)
break
else:
print("无效的选择,请重新输入")
if __name__ == "__main__":
try:
main_menu()
except KeyboardInterrupt:
print("\n\n程序被用户中断。")
except Exception as e:
print(f"\n程序执行出错: {e}")
import traceback
traceback.print_exc()
logger.error(f"主程序异常: {e}", exc_info=True)