import os import sys import json import logging import shutil from pathlib import Path import faiss import numpy as np # 导入自定义模块 from graph_builder import build_rag_database_interactive from feature_retriever import FeatureRetriever from static_analyzer import run_static_analysis from issue_filter import run_issue_filtering # ====================== # 导入原有 config.py 配置 # ====================== from config import ( QWEN_API_KEY, QWEN_API_URL, PROJECT_ROOT, VECTOR_DB_PATH, METADATA_PATH, GRAPH_DATA_PATH, KNOWLEDGE_GRAPH_PATH, Default_PROJECT_ROOT, Default_VECTOR_DB_PATH, Default_METADATA_PATH, Default_GRAPH_DATA_PATH, Default_KNOWLEDGE_GRAPH_PATH, MAX_CODE_LENGTH, CPP_EXTENSIONS, IGNORE_DIRS, TOP_K, MAX_HOPS, MIN_SIMILARITY_THRESHOLD ) USER_PROJECT_ROOT = PROJECT_ROOT USER_VECTOR_DB_PATH = VECTOR_DB_PATH USER_METADATA_PATH = METADATA_PATH USER_GRAPH_DATA_PATH = GRAPH_DATA_PATH USER_KNOWLEDGE_GRAPH_PATH = KNOWLEDGE_GRAPH_PATH INTERNAL_DEFAULT_PROJECT_ROOT = Default_PROJECT_ROOT INTERNAL_DEFAULT_VECTOR_DB_PATH = Default_VECTOR_DB_PATH INTERNAL_DEFAULT_METADATA_PATH = Default_METADATA_PATH INTERNAL_DEFAULT_GRAPH_DATA_PATH = Default_GRAPH_DATA_PATH INTERNAL_DEFAULT_KNOWLEDGE_GRAPH_PATH = Default_KNOWLEDGE_GRAPH_PATH # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("satellite_rag_system.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 保存用户运行时配置的文件名 RUNTIME_CONFIG_FILE = "user_runtime_config.json" def load_runtime_config(): """加载用户运行时保存的配置""" if os.path.exists(RUNTIME_CONFIG_FILE): try: with open(RUNTIME_CONFIG_FILE, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.warning(f"加载运行时配置文件失败: {e}") return {} def save_runtime_config(config): """保存用户运行时配置""" try: with open(RUNTIME_CONFIG_FILE, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2, ensure_ascii=False) logger.info(f"运行时配置已保存至: {RUNTIME_CONFIG_FILE}") return True except Exception as e: logger.error(f"保存运行时配置失败: {e}") return False def get_final_path(config_key, user_input, config_user_path, internal_default_path, runtime_config): """ 统一的路径解析函数 优先级: 1.用户本次输入 > 2.已保存的运行时配置 > 3.config.py中的主路径变量 > 4.config.py中的Default_*后备路径 """ # 优先级1: 用户本次运行的输入 if user_input and str(user_input).strip(): final_path_str = str(user_input).strip() # 立即更新运行时配置 runtime_config[config_key] = final_path_str final_path = Path(final_path_str) # 优先级2: 之前保存的运行时配置 elif config_key in runtime_config and runtime_config[config_key]: final_path_str = runtime_config[config_key] final_path = Path(final_path_str) # 优先级3: config.py中预设的主路径变量 (映射为USER_*) elif config_user_path and str(config_user_path).strip(): final_path = Path(config_user_path) # 优先级4: config.py中预设的Default_*后备路径 (映射为INTERNAL_DEFAULT_*) else: final_path = Path(internal_default_path) # 如果内部默认路径是相对路径,尝试基于项目根目录解析 if not final_path.is_absolute() and config_key != 'PROJECT_ROOT': project_root = get_final_path( 'PROJECT_ROOT', '', USER_PROJECT_ROOT, INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config ) # 注意:避免创建不存在的路径,仅在 project_root 存在时拼接 if project_root.exists(): final_path = project_root / final_path else: # 如果 project_root 也不存在,回退到当前工作目录 final_path = Path.cwd() / final_path # 关键修复:确保不返回空路径 if not str(final_path).strip(): # 如果最终路径仍然是空的,使用当前目录的默认文件名 default_filenames = { 'VECTOR_DB_PATH': 'satellite_rag.faiss', 'METADATA_PATH': 'satellite_rag_metadata.json', 'GRAPH_DATA_PATH': 'code_knowledge_graph.json', 'PROJECT_ROOT': '.' } if config_key in default_filenames: final_path = Path(default_filenames[config_key]) else: final_path = Path('.') # 回退到当前目录 # 尝试解析为绝对路径,并规范化 try: return final_path.resolve() except: # 如果路径无效(如包含不存在的父目录),返回原始Path对象 return final_path def ensure_directory_exists(file_path): """确保文件所在的目录存在""" directory = os.path.dirname(file_path) if directory and not os.path.exists(directory): os.makedirs(directory, exist_ok=True) logger.info(f"创建目录: {directory}") return file_path def check_api_key(): """检查API Key配置""" print("=" * 60) print("检查API Key配置") print("=" * 60) if not QWEN_API_KEY or QWEN_API_KEY == "sk-your-api-key-here": print("⚠ 警告: API Key未配置或为默认值") print(f"当前API Key: {QWEN_API_KEY[:20]}..." if QWEN_API_KEY else "空") print(f"API URL: {QWEN_API_URL}") response = input("是否继续? (y/n): ").strip().lower() if response != 'y': print("请先配置 config.py 中的 QWEN_API_KEY 字段") sys.exit(1) else: print(f"✓ API Key已配置: {QWEN_API_KEY[:20]}...") print(f"✓ API URL: {QWEN_API_URL}") return True def check_knowledge_base_exists(runtime_config): """检查知识库是否已建设""" print("\n检查知识库文件...") # 使用统一的路径解析函数获取最终路径 vector_db_path = get_final_path( 'VECTOR_DB_PATH', '', USER_VECTOR_DB_PATH, INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config ) metadata_path = get_final_path( 'METADATA_PATH', '', USER_METADATA_PATH, INTERNAL_DEFAULT_METADATA_PATH, runtime_config ) required_files = [vector_db_path, metadata_path] existing_files = [] missing_files = [] for file_path in required_files: if os.path.exists(file_path): existing_files.append(file_path) else: missing_files.append(file_path) if existing_files: print(f"✓ 找到 {len(existing_files)} 个知识库文件:") for file in existing_files: print(f" - {file}") if missing_files: print(f"⚠ 缺失 {len(missing_files)} 个知识库文件:") for file in missing_files: print(f" - {file}") return len(missing_files) == 0, existing_files, missing_files, vector_db_path, metadata_path def manual_configure_knowledge_base(runtime_config): """手动配置知识库文件地址""" print("\n" + "=" * 60) print("手动配置知识库文件") print("=" * 60) config_changes = {} # 获取当前默认值用于显示 current_vector_db = get_final_path( 'VECTOR_DB_PATH', '', USER_VECTOR_DB_PATH, INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config ) current_metadata = get_final_path( 'METADATA_PATH', '', USER_METADATA_PATH, INTERNAL_DEFAULT_METADATA_PATH, runtime_config ) # 询问向量数据库路径 vector_input = input(f"请输入FAISS向量数据库文件路径 (当前: {current_vector_db}): ").strip() if vector_input: vector_path = Path(vector_input) config_changes['VECTOR_DB_PATH'] = str(vector_path) else: vector_path = current_vector_db # 询问元数据文件路径 metadata_input = input(f"请输入元数据文件路径 (当前: {current_metadata}): ").strip() if metadata_input: metadata_path = Path(metadata_input) config_changes['METADATA_PATH'] = str(metadata_path) else: metadata_path = current_metadata # 验证文件 print(f"\n验证配置的文件...") all_valid = True try: if os.path.exists(vector_path): index = faiss.read_index(str(vector_path)) dimension = index.d print(f"✓ FAISS索引: {vector_path} (维度: {dimension})") else: print(f"⚠ 文件不存在: {vector_path}") all_valid = False except Exception as e: print(f"⚠ 加载FAISS索引失败: {e}") all_valid = False try: if os.path.exists(metadata_path): with open(metadata_path, 'r', encoding='utf-8') as f: metadata = json.load(f) print(f"✓ 元数据文件: {metadata_path} (包含 {len(metadata)} 个函数)") else: print(f"⚠ 文件不存在: {metadata_path}") all_valid = False except Exception as e: print(f"⚠ 加载元数据失败: {e}") all_valid = False if config_changes and all_valid: # 更新运行时配置 runtime_config.update(config_changes) save_config = input("\n是否保存此配置供下次使用? (y/n): ").strip().lower() if save_config == 'y': if save_runtime_config(runtime_config): print(f"✓ 配置已保存到运行时文件: {RUNTIME_CONFIG_FILE}") return True elif not all_valid: print("\n配置验证失败,请检查文件路径。") return False else: print("\n使用现有配置,无更改。") return True def load_knowledge_base(runtime_config): """加载知识库文件""" print("\n" + "=" * 60) print("加载知识库") print("=" * 60) # 检查知识库 kb_exists, existing_files, missing_files, vector_db_path, metadata_path = check_knowledge_base_exists( runtime_config) if kb_exists: print("\n✓ 使用现有知识库配置") # 验证路径是否为空 if not str(vector_db_path).strip(): print("⚠ 错误: 向量数据库路径为空!") print("正在尝试使用默认值...") vector_db_path = Path("satellite_rag.faiss") if not str(metadata_path).strip(): print("⚠ 错误: 元数据文件路径为空!") print("正在尝试使用默认值...") metadata_path = Path("satellite_rag_metadata.json") return { 'vector_db_path': str(vector_db_path), 'metadata_path': str(metadata_path), 'graph_data_path': str(get_final_path( 'GRAPH_DATA_PATH', '', USER_GRAPH_DATA_PATH, INTERNAL_DEFAULT_GRAPH_DATA_PATH, runtime_config )) } else: print("\n知识库文件不完整,需要配置。") if manual_configure_knowledge_base(runtime_config): # 重新检查 kb_exists, existing_files, missing_files, vector_db_path, metadata_path = check_knowledge_base_exists( runtime_config) if kb_exists: return { 'vector_db_path': str(vector_db_path), 'metadata_path': str(metadata_path), 'graph_data_path': str(get_final_path( 'GRAPH_DATA_PATH', '', USER_GRAPH_DATA_PATH, INTERNAL_DEFAULT_GRAPH_DATA_PATH, runtime_config )) } return None def feature_retrieval_mode(runtime_config): """功能需求检索模式(主动引导路径输入版)""" print("\n" + "=" * 60) print("功能需求检索分析") print("=" * 60) # 第一步:主动引导用户配置或确认知识库路径 print("请配置知识库路径(直接按Enter键将使用括号内显示的默认路径):") # 获取当前根据优先级计算出的默认路径(用于提示用户) default_vector_path = get_final_path( 'VECTOR_DB_PATH', '', USER_VECTOR_DB_PATH, INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config ) default_metadata_path = get_final_path( 'METADATA_PATH', '', USER_METADATA_PATH, INTERNAL_DEFAULT_METADATA_PATH, runtime_config ) # 引导输入向量数据库路径 vector_prompt = f"请输入FAISS向量数据库文件路径 (默认: {default_vector_path}): " vector_input = input(vector_prompt).strip() # 引导输入元数据文件路径 metadata_prompt = f"请输入元数据文件路径 (默认: {default_metadata_path}): " metadata_input = input(metadata_prompt).strip() # 使用用户输入(或空输入)重新计算最终路径 # 注意:这里将用户输入传入 get_final_path,它会自动更新 runtime_config final_vector_path = get_final_path( 'VECTOR_DB_PATH', vector_input, USER_VECTOR_DB_PATH, INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config ) final_metadata_path = get_final_path( 'METADATA_PATH', metadata_input, USER_METADATA_PATH, INTERNAL_DEFAULT_METADATA_PATH, runtime_config ) # 检查文件是否存在 print(f"\n检查知识库文件...") files_exist = True if not os.path.exists(final_vector_path): print(f"⚠ 向量数据库文件不存在: {final_vector_path}") files_exist = False else: print(f"✓ 向量数据库: {final_vector_path}") if not os.path.exists(final_metadata_path): print(f"⚠ 元数据文件不存在: {final_metadata_path}") files_exist = False else: print(f"✓ 元数据文件: {final_metadata_path}") if not files_exist: print("\n知识库文件缺失。") choice = input("请选择: 1. 手动指定其他路径 2. 返回主菜单\n选择 (1-2): ").strip() if choice == "1": # 递归调用自身,重新引导配置 return feature_retrieval_mode(runtime_config) else: print("功能检索取消,返回主菜单。") return # 构建传递给检索器的配置 config = { 'vector_db_path': str(final_vector_path), 'metadata_path': str(final_metadata_path), 'graph_data_path': str(get_final_path( 'GRAPH_DATA_PATH', '', USER_GRAPH_DATA_PATH, INTERNAL_DEFAULT_GRAPH_DATA_PATH, runtime_config )) } # 询问用户是否保存此次配置供下次使用 if vector_input or metadata_input: # 只有当用户本次有输入时才询问保存 save_choice = input(f"\n是否将此次的路径配置保存,供下次程序启动时默认使用? (y/n): ").strip().lower() if save_choice == 'y': if save_runtime_config(runtime_config): print(f"✓ 配置已保存至: {RUNTIME_CONFIG_FILE}") else: print("⚠ 配置保存失败。") # 第二步:加载知识库并进入检索循环 print("\n" + "=" * 60) print("加载知识库并初始化检索器...") print("=" * 60) try: retriever = FeatureRetriever(config) if not retriever.load_knowledge_base(): print("加载知识库失败!可能文件格式不正确或已损坏。") retry = input("是否重新配置路径? (y/n): ").strip().lower() if retry == 'y': return feature_retrieval_mode(runtime_config) # 重新开始 else: print("功能检索取消,返回主菜单。") return print(f"知识库加载成功: {len(retriever.metadatas)} 个函数") print("\n请输入自然语言描述功能需求进行分析") print("输入 'quit' 或 'exit' 返回主菜单") while True: query = input("\n功能需求: ").strip() if query.lower() in ["quit", "exit"]: break if not query: continue # 执行分析 result = retriever.analyze_with_multiple_constraints(query) # 显示结果 print(f"\n分析结果:") print(f"实现状态: {'✅ 已实现' if result['implemented'] else '❌ 未实现'}") print(f"综合评分: {result.get('total_score', 0):.3f}") print(f"判断理由: {result.get('reason', '无理由')}") if result.get("most_relevant_function"): rel_func = result["most_relevant_function"] print(f"最相关函数: {rel_func.get('name')}") except Exception as e: print(f"分析失败: {e}") logger.error(f"功能检索失败: {e}", exc_info=True) # 发生异常时,也提供重新配置的选项 retry = input("\n发生错误,是否重新配置知识库路径? (y/n): ").strip().lower() if retry == 'y': feature_retrieval_mode(runtime_config) def issue_filter_mode(runtime_config): """问题单过滤模式(修正知识库传递问题)""" print("\n" + "=" * 60) print("问题单过滤模式") print("=" * 60) # 第一步:首先引导用户配置知识库路径(先输入,不输入则默认) print("第一步:配置知识库路径") # 获取当前默认的知识库路径 default_vector_path = get_final_path( 'VECTOR_DB_PATH', '', USER_VECTOR_DB_PATH, INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config ) default_metadata_path = get_final_path( 'METADATA_PATH', '', USER_METADATA_PATH, INTERNAL_DEFAULT_METADATA_PATH, runtime_config ) # 引导用户输入向量数据库路径 vector_prompt = f"请输入向量数据库文件路径 (FAISS索引文件)\n(直接按Enter使用当前默认值: {default_vector_path}): " vector_input = input(vector_prompt).strip() # 使用用户输入重新计算最终路径 final_vector_path = get_final_path( 'VECTOR_DB_PATH', vector_input, USER_VECTOR_DB_PATH, INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config ) if not vector_input: print(f"使用默认向量数据库: {final_vector_path}") # 引导用户输入元数据文件路径 metadata_prompt = f"请输入元数据文件路径\n(直接按Enter使用当前默认值: {default_metadata_path}): " metadata_input = input(metadata_prompt).strip() # 使用用户输入重新计算最终路径 final_metadata_path = get_final_path( 'METADATA_PATH', metadata_input, USER_METADATA_PATH, INTERNAL_DEFAULT_METADATA_PATH, runtime_config ) if not metadata_input: print(f"使用默认元数据文件: {final_metadata_path}") # 检查知识库文件是否存在 print(f"\n检查知识库文件...") files_exist = True if not os.path.exists(final_vector_path): print(f"⚠ 向量数据库文件不存在: {final_vector_path}") files_exist = False else: print(f"✓ 向量数据库: {final_vector_path}") if not os.path.exists(final_metadata_path): print(f"⚠ 元数据文件不存在: {final_metadata_path}") files_exist = False else: print(f"✓ 元数据文件: {final_metadata_path}") if not files_exist: print("\n知识库文件缺失,无法进行问题单过滤。") print("请先使用主菜单选项1创建知识库,或确保文件路径正确。") return # 构建知识库配置字典 kb_config = { 'vector_db_path': str(final_vector_path), 'metadata_path': str(final_metadata_path), 'graph_data_path': str(get_final_path( 'GRAPH_DATA_PATH', '', USER_GRAPH_DATA_PATH, INTERNAL_DEFAULT_GRAPH_DATA_PATH, runtime_config )) } # 询问是否保存此次配置 if vector_input or metadata_input: save_choice = input(f"\n是否将此知识库配置保存供下次使用? (y/n): ").strip().lower() if save_choice == 'y': if save_runtime_config(runtime_config): print(f"✓ 配置已保存至运行时配置文件") print("\n✓ 知识库配置完成") print("\n第二步:配置问题单过滤参数") # 第二步:配置问题单过滤的其他参数 default_input_excel = "科代问题单样例.xlsx" # 首先检查默认文件是否存在 if not os.path.exists(default_input_excel): print(f"⚠ 注意: 默认输入文件 '{default_input_excel}' 不存在") input_prompt = f"请输入待过滤的问题单Excel文件路径\n(直接按Enter使用默认值: {default_input_excel}): " input_excel = input(input_prompt).strip() # 用户不输入时使用默认值 if not input_excel: input_excel = default_input_excel print(f"使用默认输入文件: {input_excel}") # 路径检查和修正逻辑... # [保持您原有的路径检查逻辑,但这里简化展示] if not os.path.exists(input_excel): print(f"文件不存在: {input_excel}") print("问题单过滤取消。") return # 输出文件路径 default_output_json = "filtered_defects.json" output_prompt = f"请输入输出JSON文件路径\n(直接按Enter使用默认值: {default_output_json}): " output_json = input(output_prompt).strip() if not output_json: output_json = default_output_json print(f"使用默认输出文件: {output_json}") # 项目根目录路径 default_project_root = get_final_path( 'PROJECT_ROOT', '', USER_PROJECT_ROOT, INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config ) project_prompt = f"请输入问题单中代码文件所对应的项目根目录路径\n(直接按Enter使用当前默认值: {default_project_root}): " project_input = input(project_prompt).strip() final_project_root = get_final_path( 'PROJECT_ROOT', project_input, USER_PROJECT_ROOT, INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config ) if not project_input: print(f"使用默认项目路径: {final_project_root}") if not os.path.isdir(final_project_root): print(f"项目路径无效: {final_project_root}") print("问题单过滤取消。") return # 保存项目路径配置(如果用户有输入) if project_input: runtime_config['PROJECT_ROOT'] = str(final_project_root) save_runtime_config(runtime_config) print("\n" + "=" * 60) print("开始问题单过滤分析...") print("=" * 60) print(f"输入文件: {input_excel}") print(f"输出文件: {output_json}") print(f"项目路径: {final_project_root}") print("=" * 60) try: # 关键:确保传递知识库路径参数 success = run_issue_filtering( input_xlsx_path=input_excel, output_json_path=output_json, project_path_override=str(final_project_root), # 明确传递知识库路径 kb_vector_path=str(final_vector_path), # 确保这个变量存在且正确 kb_metadata_path=str(final_metadata_path) # 确保这个变量存在且正确 ) if success: print(f"\n" + "=" * 60) print("✓ 问题单过滤完成!") print(f"结果已保存至: {output_json}") print("=" + "=" * 60) # 结果查看逻辑 while True: view_option = input( "\n请选择操作:\n1. 查看统计摘要\n2. 打开结果文件\n3. 返回主菜单\n选择 (1-3): ").strip() if view_option == "1": try: with open(output_json, 'r', encoding='utf-8') as f: data = json.load(f) if not data: print("结果文件为空。") continue true_positives = sum(1 for r in data if r["analysis_result"].get("is_real_defect") is True) false_positives = sum(1 for r in data if r["analysis_result"].get("is_real_defect") is False) unknown = len(data) - true_positives - false_positives high_urgency = sum(1 for r in data if r["analysis_result"].get("urgency_score", 0) >= 70) medium_urgency = sum(1 for r in data if 40 <= r["analysis_result"].get("urgency_score", 0) < 70) low_urgency = sum(1 for r in data if 0 < r["analysis_result"].get("urgency_score", 0) < 40) total_affected = sum(len(r["analysis_result"].get("affected_functions", [])) for r in data) defects_with_affected = sum(1 for r in data if r["analysis_result"].get("affected_functions")) print(f"\n统计摘要:") print(f" 总缺陷数: {len(data)}") print(f" 真实缺陷: {true_positives} 条 ({true_positives / len(data) * 100:.1f}%)") print(f" 误报缺陷: {false_positives} 条 ({false_positives / len(data) * 100:.1f}%)") print(f" 未知/错误: {unknown} 条 ({unknown / len(data) * 100:.1f}%)") print(f"\n紧急程度分布:") print(f" 高紧急(≥70): {high_urgency} 条") print(f" 中紧急(40-69): {medium_urgency} 条") print(f" 低紧急(1-39): {low_urgency} 条") print(f" 零分(非缺陷): {false_positives} 条") print(f"\n影响域分析:") print(f" {defects_with_affected} 个缺陷影响了 {total_affected} 个函数") sheet_counts = {} for item in data: sheet_name = item.get("sheet_name", "unknown") sheet_counts[sheet_name] = sheet_counts.get(sheet_name, 0) + 1 if sheet_counts: print(f"\n工作表分布:") for sheet, count in sheet_counts.items(): print(f" {sheet}: {count} 条") except Exception as e: print(f"读取结果文件失败: {e}") elif view_option == "2": try: import subprocess import platform with open(output_json, 'r', encoding='utf-8') as f: data = json.load(f) if not data: print("结果文件为空。") continue print(f"\n结果文件包含 {len(data)} 条记录。") view_method = input( "查看方式:\n1. 完整JSON文件\n2. 仅真实缺陷\n3. 高紧急缺陷(≥70分)\n选择 (1-3): ").strip() if view_method == "1": if platform.system() == "Windows": os.startfile(output_json) elif platform.system() == "Darwin": subprocess.run(["open", output_json]) else: subprocess.run(["xdg-open", output_json]) print(f"已使用默认程序打开: {output_json}") elif view_method in ["2", "3"]: if view_method == "2": filtered_data = [item for item in data if item["analysis_result"].get("is_real_defect") is True] print(f"\n真实缺陷 ({len(filtered_data)} 条):") else: filtered_data = [item for item in data if item["analysis_result"].get("urgency_score", 0) >= 70] print(f"\n高紧急缺陷 ({len(filtered_data)} 条):") for i, item in enumerate(filtered_data[:10], 1): print(f"\n{i}. 工作表: {item.get('sheet_name')}, 行: {item.get('row_index')}") print( f" 文件: {os.path.basename(item.get('file_path', ''))}:{item.get('line_number')}") print(f" 描述: {item.get('defect_description', '')[:80]}...") print(f" 真实缺陷: {item['analysis_result'].get('is_real_defect')}") print(f" 紧急分数: {item['analysis_result'].get('urgency_score', 0)}") if item['analysis_result'].get('affected_functions'): print( f" 影响函数: {', '.join(item['analysis_result'].get('affected_functions', []))}") if len(filtered_data) > 10: print(f"\n... 还有 {len(filtered_data) - 10} 条未显示") save_filtered = input(f"\n是否将筛选结果保存为新文件? (y/n): ").strip().lower() if save_filtered == 'y': filtered_file = output_json.replace('.json', '_filtered.json') with open(filtered_file, 'w', encoding='utf-8') as f: json.dump(filtered_data, f, indent=2, ensure_ascii=False) print(f"筛选结果已保存至: {filtered_file}") except Exception as e: print(f"打开或处理结果文件失败: {e}") elif view_option == "3": break else: print("无效选择,请重新输入") else: print("\n" + "=" * 60) print("✗ 问题单过滤过程失败。") print("=" + "=" * 60) except Exception as e: print(f"\n问题单过滤执行失败: {e}") import traceback traceback.print_exc() def static_analysis_mode(runtime_config): """静态分析模式(遵循"先输入,不输入则默认"原则)""" print("\n" + "=" * 60) print("静态分析模式") print("=" * 60) # 1. 首先检查并配置知识库(静态分析前必须指定知识库) print("第一步:配置知识库") # --- 修改开始:使用与项目路径相同的交互逻辑 --- # 使用get_final_path计算当前默认的知识库路径 default_vector_path = get_final_path( 'VECTOR_DB_PATH', '', USER_VECTOR_DB_PATH, INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config ) default_metadata_path = get_final_path( 'METADATA_PATH', '', USER_METADATA_PATH, INTERNAL_DEFAULT_METADATA_PATH, runtime_config ) # 引导输入向量数据库路径 vector_prompt = f"请输入FAISS向量数据库文件路径\n(直接按Enter使用当前默认值: {default_vector_path}): " vector_input = input(vector_prompt).strip() # 引导输入元数据文件路径 metadata_prompt = f"请输入元数据文件路径\n(直接按Enter使用当前默认值: {default_metadata_path}): " metadata_input = input(metadata_prompt).strip() # 使用用户输入(或空输入)重新计算最终路径 final_vector_path = get_final_path( 'VECTOR_DB_PATH', vector_input, USER_VECTOR_DB_PATH, INTERNAL_DEFAULT_VECTOR_DB_PATH, runtime_config ) final_metadata_path = get_final_path( 'METADATA_PATH', metadata_input, USER_METADATA_PATH, INTERNAL_DEFAULT_METADATA_PATH, runtime_config ) # 检查文件是否存在 print(f"\n检查知识库文件...") files_exist = True if not os.path.exists(final_vector_path): print(f"⚠ 向量数据库文件不存在: {final_vector_path}") files_exist = False else: print(f"✓ 向量数据库: {final_vector_path}") if not os.path.exists(final_metadata_path): print(f"⚠ 元数据文件不存在: {final_metadata_path}") files_exist = False else: print(f"✓ 元数据文件: {final_metadata_path}") if not files_exist: print("\n知识库文件缺失。") choice = input("请选择: 1. 手动指定其他路径 2. 返回主菜单\n选择 (1-2): ").strip() if choice == "1": # 递归调用自身,重新引导配置 return static_analysis_mode(runtime_config) else: print("静态分析取消,返回主菜单。") return # 构建知识库配置字典 config = { 'vector_db_path': str(final_vector_path), 'metadata_path': str(final_metadata_path), 'graph_data_path': str(get_final_path( 'GRAPH_DATA_PATH', '', USER_GRAPH_DATA_PATH, INTERNAL_DEFAULT_GRAPH_DATA_PATH, runtime_config )) } # 询问是否保存此次配置(如果用户本次有输入) if vector_input or metadata_input: save_choice = input(f"\n是否将此次的知识库配置保存,供下次程序启动时默认使用? (y/n): ").strip().lower() if save_choice == 'y': if save_runtime_config(runtime_config): print(f"✓ 配置已保存至: {RUNTIME_CONFIG_FILE}") else: print("⚠ 配置保存失败。") # --- 修改结束 --- print("✓ 知识库配置成功") print("\n请配置静态分析参数:") # 2. 获取要分析的项目路径 - 先显示默认值,再提示输入 # 使用get_final_path计算当前默认项目路径 default_project_root = get_final_path( 'PROJECT_ROOT', '', USER_PROJECT_ROOT, INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config ) project_prompt = f"请输入要分析的C/C++项目根目录路径\n(直接按Enter使用当前默认值: {default_project_root}): " project_input = input(project_prompt).strip() # ...(后续代码保持不变)... # 使用用户输入重新计算最终路径 project_path = get_final_path( 'PROJECT_ROOT', project_input, USER_PROJECT_ROOT, INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config ) # 如果用户没有输入,使用默认值 if not project_input: print(f"使用默认项目路径: {project_path}") # 路径验证 if not project_path.exists(): print(f"⚠ 路径不存在: {project_path}") retry = input("是否重新输入? (y/n): ").strip().lower() if retry != 'y': print("静态分析取消。") return # 重新获取输入 while True: new_input = input("请输入正确的项目根目录路径: ").strip() if not new_input: print("路径不能为空") continue new_path = get_final_path( 'PROJECT_ROOT', new_input, USER_PROJECT_ROOT, INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config ) if not new_path.exists(): print(f"路径不存在: {new_path}") continue if not new_path.is_dir(): print(f"路径不是目录: {new_path}") continue project_path = new_path project_input = new_input break if not project_path.is_dir(): print(f"⚠ 路径不是目录: {project_path}") retry = input("是否重新输入? (y/n): ").strip().lower() if retry != 'y': print("静态分析取消。") return # 重新获取输入 while True: new_input = input("请输入正确的项目根目录路径: ").strip() if not new_input: print("路径不能为空") continue new_path = get_final_path( 'PROJECT_ROOT', new_input, USER_PROJECT_ROOT, INTERNAL_DEFAULT_PROJECT_ROOT, runtime_config ) if not new_path.is_dir(): print(f"路径不是目录: {new_path}") continue project_path = new_path project_input = new_input break # 3. 获取规则文件路径 - 先显示默认值,再提示输入 default_rules = "审查规则.xlsx" # 首先检查默认文件是否存在 if not os.path.exists(default_rules): print(f"⚠ 注意: 默认规则文件 '{default_rules}' 不存在") rules_prompt = f"请输入审查规则Excel文件路径\n(直接按Enter使用默认值: {default_rules}): " rules_input = input(rules_prompt).strip() # 用户不输入时使用默认值 if not rules_input: rules_excel = default_rules print(f"使用默认规则文件: {rules_excel}") else: rules_excel = rules_input # 路径验证 rules_path = Path(rules_excel) if not rules_path.exists(): print(f"⚠ 规则文件不存在: {rules_path}") # 如果用户输入了但文件不存在,询问是否重新输入 if rules_excel != default_rules: # 用户输入了自定义路径 retry = input("文件不存在,是否重新输入路径? (y/n): ").strip().lower() if retry == 'y': # 重新获取输入 while True: new_input = input("请输入正确的规则文件路径: ").strip() if not new_input: print("路径不能为空") continue new_path = Path(new_input) if not new_path.exists(): print(f"文件不存在: {new_path}") continue rules_excel = new_input rules_path = new_path break else: print("静态分析取消。") return else: # 用户使用的是默认路径 print(f"默认规则文件 {default_rules} 不存在。") manual_input = input("是否手动输入文件路径? (y/n): ").strip().lower() if manual_input != 'y': print("静态分析取消。") return while True: new_input = input("请输入正确的规则文件路径: ").strip() if not new_input: print("路径不能为空") continue new_path = Path(new_input) if not new_path.exists(): print(f"文件不存在: {new_path}") continue rules_excel = new_input rules_path = new_path break # 4. 获取输出报告名称 - 先显示默认值,再提示输入 default_output = "audit_report" output_prompt = f"请输入输出报告文件的基础名称 (不包含扩展名)\n(直接按Enter使用默认值: {default_output}): " output_input = input(output_prompt).strip() # 用户不输入时使用默认值 if not output_input: output_base = default_output print(f"使用默认输出报告名称: {output_base}") else: output_base = output_input print("\n" + "=" * 60) print("开始静态分析...") print("=" * 60) print(f"项目路径: {project_path}") print(f"规则文件: {rules_path}") print(f"输出报告: {output_base}") print("=" * 60) try: success = run_static_analysis( project_path=str(project_path), rules_excel=str(rules_path), output_report=output_base ) if success: print("\n" + "=" * 60) print("✓ 静态分析执行成功!") print("=" + "=" * 60) # 保存更新后的运行时配置(如果用户有输入项目路径) if project_input: runtime_config['PROJECT_ROOT'] = str(project_path) save_runtime_config(runtime_config) print(f"✓ 项目路径已保存到运行时配置") # 提供查看结果的选项 result_files = [ f"{output_base}.html", f"{output_base}.pdf", f"{output_base}.json" ] existing_files = [] for file in result_files: if os.path.exists(file): existing_files.append(file) if existing_files: print(f"\n生成的报告文件:") for file in existing_files: print(f" - {file}") view_option = input(f"\n是否打开HTML报告文件? (y/n): ").strip().lower() if view_option == 'y' and os.path.exists(f"{output_base}.html"): try: import subprocess import platform html_file = f"{output_base}.html" if platform.system() == "Windows": os.startfile(html_file) elif platform.system() == "Darwin": subprocess.run(["open", html_file]) else: subprocess.run(["xdg-open", html_file]) print(f"已使用默认程序打开: {html_file}") except Exception as e: print(f"打开报告文件失败: {e}") else: print("\n" + "=" * 60) print("✗ 静态分析执行过程中遇到问题。") print("=" + "=" * 60) except Exception as e: print(f"\n静态分析执行失败: {e}") import traceback traceback.print_exc() def main_menu(): """主菜单界面""" print("\n" + "=" * 60) print("卫星星务软件代码RAG知识库系统") print("=" * 60) print("版本: 3.0") print("功能: 支持代码知识库构建、功能检索、问题单过滤、静态分析") print("=" * 60) # 加载运行时配置 runtime_config = load_runtime_config() if runtime_config: print(f"已加载运行时配置 ({len(runtime_config)} 项)") # 检查API Key if not check_api_key(): return while True: print("\n" + "=" * 60) print("主菜单") print("=" * 60) print("1. 创建项目知识库") print("2. 功能需求检索") print("3. 问题单过滤") print("4. 静态分析") print("5. 管理配置文件") print("6. 退出系统") print("=" * 60) choice = input("\n请选择功能 (1-6): ").strip() if choice == "1": print("\n" + "=" * 60) print("创建项目知识库") print("=" * 60) print("此功能将分析代码项目,构建知识图谱和向量数据库") confirm = input("\n确认开始知识库构建? (y/n): ").strip().lower() if confirm == 'y': try: result = build_rag_database_interactive() if result: print("\n知识库构建完成!") except Exception as e: print(f"知识库构建失败: {e}") logger.error(f"知识库构建失败: {e}", exc_info=True) elif choice == "2": print("\n进入功能需求检索模式...") kb_exists, _, _, _, _ = check_knowledge_base_exists(runtime_config) if not kb_exists: print("\n⚠ 尚未建设项目知识库!") response = input("是否先创建或配置知识库? (y/n): ").strip().lower() if response == 'y': if not manual_configure_knowledge_base(runtime_config): continue else: continue feature_retrieval_mode(runtime_config) elif choice == "3": print("\n进入问题单过滤模式...") kb_exists, _, _, _, _ = check_knowledge_base_exists(runtime_config) if not kb_exists: print("\n⚠ 尚未建设项目知识库!") response = input("是否配置已有知识库文件? (y/n): ").strip().lower() if response == 'y': if not manual_configure_knowledge_base(runtime_config): continue else: print("请先使用选项1创建知识库") continue issue_filter_mode(runtime_config) elif choice == "4": print("\n进入静态分析模式...") static_analysis_mode(runtime_config) elif choice == "5": print("\n" + "=" * 60) print("配置文件管理") print("=" * 60) print("1. 查看当前运行时配置") print("2. 清除运行时配置") print("3. 返回主菜单") config_choice = input("\n请选择操作 (1-3): ").strip() if config_choice == "1": print(f"\n当前运行时配置 ({RUNTIME_CONFIG_FILE}):") if runtime_config: for key, value in runtime_config.items(): print(f" {key}: {value}") else: print(" 无运行时配置") print(f"\n配置文件 (config.py) 中的主路径变量:") print(f" PROJECT_ROOT: {PROJECT_ROOT}") print(f" VECTOR_DB_PATH: {VECTOR_DB_PATH}") print(f" METADATA_PATH: {METADATA_PATH}") print(f" GRAPH_DATA_PATH: {GRAPH_DATA_PATH}") print(f" KNOWLEDGE_GRAPH_PATH: {KNOWLEDGE_GRAPH_PATH}") print(f"\n配置文件 (config.py) 中的默认后备路径 (Default_*):") print(f" Default_PROJECT_ROOT: {Default_PROJECT_ROOT}") print(f" Default_VECTOR_DB_PATH: {Default_VECTOR_DB_PATH}") print(f" Default_METADATA_PATH: {Default_METADATA_PATH}") input("\n按Enter键继续...") elif config_choice == "2": if os.path.exists(RUNTIME_CONFIG_FILE): confirm = input(f"确定要删除运行时配置文件 {RUNTIME_CONFIG_FILE} 吗? (y/n): ").strip().lower() if confirm == 'y': os.remove(RUNTIME_CONFIG_FILE) runtime_config.clear() print("运行时配置文件已删除。") else: print("运行时配置文件不存在。") elif config_choice == "3": continue else: print("无效选择") elif choice == "6": print("\n感谢使用,再见!") # 退出前保存配置 if runtime_config: save_runtime_config(runtime_config) break else: print("无效的选择,请重新输入") if __name__ == "__main__": try: main_menu() except KeyboardInterrupt: print("\n\n程序被用户中断。") except Exception as e: print(f"\n程序执行出错: {e}") import traceback traceback.print_exc() logger.error(f"主程序异常: {e}", exc_info=True)