"""Skill selection, prompt construction and report assembly for DOCX analysis."""

from __future__ import annotations

import json
import re
from collections import Counter
from typing import Protocol

import requests

from app.config import ProviderConfig
from app.docx_parser import ParsedDocument
from app.report_generator import AnalysisReport
from app.skill_loader import Skill


class SupportsPost(Protocol):
    """Structural type for any HTTP client object that exposes ``post``."""

    def post(self, url: str, **kwargs): ...


# Domain keywords looked up verbatim (substring match) when tokenizing text.
IMPORTANT_TERMS = {
    "需求",
    "接口",
    "测试",
    "合格性",
    "追踪",
    "追溯",
    "配置",
    "质量",
    "部署",
    "安装",
    "验收",
    "设计",
    "资源",
    "风险",
    "计划",
    "说明",
    "文档",
    "CSCI",
    "GJB",
}

# ASCII identifiers of at least three characters, starting with a letter.
_ASCII_TOKEN = re.compile(r"[A-Za-z][A-Za-z0-9_-]{2,}")

# One or more leading list markers: "-", "* ", "+ ", "• ", or "<digits>" plus
# one of ". 、 ) )".  The negative lookahead keeps numbers such as "3.1"
# intact, because they are part of the text rather than list numbering.
_LIST_MARKER = re.compile(r"^(?:(?:-|[*+•]\s|\d+[.、))](?!\d))\s*)+")

_SUMMARY_LABEL = "总体结论"
_SUMMARY_FALLBACK = "模型已生成分析结果,需人工复核"
# Characters removed around a line to decide whether it is only the section
# title (list numbering, Markdown emphasis, colons, brackets).
_HEADING_DECORATION = " *_0123456789.、::()()一二三四五六七八九十"

_RECOMMENDATION_HEADINGS = ("整改建议", "修改建议")
_RECOMMENDATION_FALLBACK = "按模型分析结果逐项整改,并进行人工复核。"


def _tokens(text: str) -> list[str]:
    """Return the matching tokens of *text*.

    The result holds every ASCII identifier-like token (lower-cased, one entry
    per occurrence) followed by each member of ``IMPORTANT_TERMS`` that occurs
    in *text* (one entry per term, regardless of how often it occurs).
    """
    ascii_tokens = [match.lower() for match in _ASCII_TOKEN.findall(text)]
    domain_terms = [term for term in IMPORTANT_TERMS if term in text]
    return ascii_tokens + domain_terms


def select_relevant_skills(
    parsed: ParsedDocument, skills: list[Skill], max_skills: int = 6
) -> list[Skill]:
    """Pick the skills whose text shares the most tokens with the document.

    Each skill is scored by summing, over every token it shares with the
    document, that token's frequency in the document.  A document whose
    filename ends in ``.docx`` adds one point to every skill.  Skills with a
    positive score are ranked by descending score, ties broken by slug.

    Args:
        parsed: The parsed document (its text, headings and filename are read).
        skills: Candidate skills.
        max_skills: Maximum number of skills to return.

    Returns:
        Up to *max_skills* ranked skills, or the first *max_skills* candidates
        in their given order when no skill scored above zero.
    """
    heading_text = "\n".join(heading.text for heading in parsed.headings)
    document_tokens = Counter(_tokens(parsed.text + "\n" + heading_text))
    # The filename bonus does not depend on the skill, so compute it once.
    docx_bonus = 1 if parsed.filename.lower().endswith(".docx") else 0

    scored: list[tuple[int, Skill]] = []
    for skill in skills:
        skill_text = (
            f"{skill.slug}\n{skill.name}\n{skill.description}\n"
            f"{skill.use_when}\n{skill.content[:3000]}"
        )
        skill_tokens = set(_tokens(skill_text))
        overlap = sum(
            count
            for token, count in document_tokens.items()
            if token in skill_tokens
        )
        score = overlap + docx_bonus
        if score > 0:
            scored.append((score, skill))

    if not scored:
        return skills[:max_skills]

    scored.sort(key=lambda item: (-item[0], item[1].slug))
    return [skill for _, skill in scored[:max_skills]]


def build_analysis_prompt(parsed: ParsedDocument, skills: list[Skill]) -> str:
    """Assemble the compliance-review prompt sent to the model.

    The prompt embeds the filename, an outline of at most 80 headings, one
    section per skill (skill content capped at 6000 characters) and the first
    18000 characters of the document text.

    Args:
        parsed: The parsed document to review.
        skills: Skills whose rules the model should check against.

    Returns:
        The complete prompt text.
    """
    skill_sections = [
        f"## {skill.slug}\n名称: {skill.name}\n描述: {skill.description}\n"
        f"适用条件: {skill.use_when}\n规范内容:\n{skill.content[:6000]}"
        for skill in skills
    ]
    skills_block = "\n".join(skill_sections)
    document_outline = (
        "\n".join(
            f"- H{heading.level} {heading.text}"
            for heading in parsed.headings[:80]
        )
        or "未识别到标题。"
    )
    document_text = parsed.text[:18000]

    # NOTE(review): the line breaks of this template were restored from a
    # whitespace-collapsed copy of the file — confirm the intended layout.
    return f"""你是军用软件文档符合性审查助手。请依据给定 GJB438C/GJB2786 技能规范,分析上传 DOCX 是否符合规范。

请输出中文 Markdown,必须包含以下小节:
1. 总体结论
2. 符合项
3. 不符合项
4. 缺失章节或缺失证据
5. 整改建议
6. 需人工复核事项

要求:
- 每个问题尽量引用文档中的标题、关键词或证据摘要。
- 不要编造未在文档中出现的证据。
- 如果无法判断,标记为“需人工复核”。

# 文件
{parsed.filename}

# 文档目录
{document_outline}

# 待检查技能
{skills_block}

# 文档正文摘录
{document_text}
"""


class LLMClient:
    """Minimal client that posts a single-message chat completion request."""

    def __init__(
        self,
        provider: ProviderConfig,
        session: SupportsPost | None = None,
        timeout: int = 120,
    ) -> None:
        """Store the provider settings and the HTTP session to use.

        Args:
            provider: Provider settings (URL, model, key, sampling options).
            session: Object used to send the request; a new
                ``requests.Session`` is created when omitted.
            timeout: Value passed as ``timeout`` to ``session.post``.
        """
        self.provider = provider
        self.session = session or requests.Session()
        self.timeout = timeout

    def complete(self, prompt: str) -> str:
        """Send *prompt* as one user message and return the reply content.

        Args:
            prompt: Text placed in the single ``user`` message.

        Returns:
            The value at ``choices[0].message.content`` of the JSON response.

        Raises:
            Whatever ``response.raise_for_status()`` or ``response.json()``
            raise, and ``KeyError`` / ``IndexError`` when the response body
            lacks the ``choices[0].message.content`` path.
        """
        headers = {"Content-Type": "application/json"}
        api_key = self.provider.api_key
        # An empty key or the literal "EMPTY" means no Authorization header.
        if api_key and api_key != "EMPTY":
            headers["Authorization"] = f"Bearer {api_key}"

        payload = {
            "model": self.provider.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": self.provider.temperature,
            "max_tokens": self.provider.max_tokens,
        }
        response = self.session.post(
            self.provider.chat_completions_url,
            headers=headers,
            json=payload,
            timeout=self.timeout,
        )
        response.raise_for_status()
        data = response.json()
        return data["choices"][0]["message"]["content"]


def heuristic_analysis(
    parsed: ParsedDocument, skills: list[Skill]
) -> AnalysisReport:
    """Build a preliminary report from local keyword checks, without a model.

    Every required term is searched for in the document text and headings;
    a missing term yields a finding that needs rectification plus a matching
    recommendation.  A document without any headings adds one more finding.

    Args:
        parsed: The parsed document to check.
        skills: Skills recorded in the report as matched (slugs only).

    Returns:
        A report with provider ``local`` and model ``heuristic``.
    """
    headings_text = "\n".join(heading.text for heading in parsed.headings)
    full_text = parsed.text
    findings: list[dict[str, str]] = []
    recommendations: list[str] = []

    required_terms = ["范围", "引用文档", "需求", "合格性", "追踪", "接口"]
    for term in required_terms:
        present = term in full_text or term in headings_text
        findings.append(
            {
                "status": "符合" if present else "需整改",
                "item": f"检查关键内容:{term}",
                "evidence": (
                    "文档中已发现相关表述"
                    if present
                    else "未在解析文本中发现明确表述"
                ),
            }
        )
        if not present:
            recommendations.append(f"补充或明确“{term}”相关章节与证据。")

    if not parsed.headings:
        findings.append(
            {
                "status": "需整改",
                "item": "章节结构",
                "evidence": "未识别到 Word 标题样式",
            }
        )
        recommendations.append(
            "使用 Word 标题样式组织章节,便于目录和符合性审查。"
        )

    issue_count = sum(1 for item in findings if item["status"] != "符合")
    summary = "通过" if issue_count == 0 else "部分通过,需人工复核"
    raw_output = "未调用模型,已使用本地启发式规则生成初步分析。"

    return AnalysisReport(
        source_filename=parsed.filename,
        provider_name="local",
        model_name="heuristic",
        matched_skills=[skill.slug for skill in skills],
        summary=summary,
        findings=findings,
        recommendations=recommendations
        or ["保持现有章节结构,并由人工进行最终符合性确认。"],
        raw_model_output=raw_output,
    )


def report_from_model_output(
    parsed: ParsedDocument,
    skills: list[Skill],
    provider_name: str,
    model_name: str,
    output: str,
) -> AnalysisReport:
    """Wrap raw model *output* in an ``AnalysisReport``.

    The report carries a single finding holding the first 1200 characters of
    the output, plus the summary and recommendations extracted from it.

    Args:
        parsed: The analysed document (only its filename is read).
        skills: Skills recorded in the report as matched (slugs only).
        provider_name: Name of the provider that produced the output.
        model_name: Name of the model that produced the output.
        output: The model's full textual answer.

    Returns:
        The assembled report, with *output* kept in ``raw_model_output``.
    """
    findings = [
        {
            "status": "模型分析",
            "item": "完整分析结果",
            "evidence": output[:1200],
        }
    ]
    return AnalysisReport(
        source_filename=parsed.filename,
        provider_name=provider_name,
        model_name=model_name,
        matched_skills=[skill.slug for skill in skills],
        summary=_extract_summary(output),
        findings=findings,
        recommendations=_extract_recommendations(output),
        raw_model_output=output,
    )


def _extract_summary(output: str) -> str:
    """Return the overall conclusion found in the model *output*.

    A line that carries both the "总体结论" label and further text is returned
    as is (minus surrounding ``#``, spaces and colons).  When the label stands
    alone as a section title — possibly numbered or emphasised, e.g.
    ``## 1. 总体结论`` — the first non-blank line below it is returned instead,
    unless the next heading is reached first.

    Returns:
        The conclusion text, or a fixed fallback sentence when none is found.
    """
    lines = output.splitlines()
    for index, line in enumerate(lines):
        # Both the full-width and the ASCII colon are stripped.
        normalized = line.strip(" #::")
        if _SUMMARY_LABEL not in normalized:
            continue
        if normalized.strip(_HEADING_DECORATION) != _SUMMARY_LABEL:
            # Label and conclusion share the line.
            return normalized
        # Title-only line: the conclusion is in the section body.
        for follower in lines[index + 1:]:
            text = follower.strip()
            if not text:
                continue
            if text.startswith("#"):
                break  # next section reached, this one has no body
            candidate = text.strip("-*+•> ")
            if candidate:
                return candidate
    return _SUMMARY_FALLBACK


def _extract_recommendations(output: str) -> list[str]:
    """Collect the entries of the recommendations section of *output*.

    The section starts after the first line mentioning "整改建议" or
    "修改建议" and ends at the next line starting with ``#``.  Leading list
    markers (bullets, ``1.``, ``2、`` ...) are removed from each entry; digits
    that belong to the text itself, such as ``3.1节`` or ``2024年``, are kept.

    Returns:
        At most ten entries, or a single fixed fallback entry when the
        section is missing or empty.
    """
    recommendations: list[str] = []
    in_section = False
    for line in output.splitlines():
        stripped = line.strip()
        if any(heading in stripped for heading in _RECOMMENDATION_HEADINGS):
            in_section = True
            continue
        if not in_section:
            continue
        if stripped.startswith("#"):
            break
        entry = _LIST_MARKER.sub("", stripped)
        if entry:
            recommendations.append(entry)
    return recommendations[:10] or [_RECOMMENDATION_FALLBACK]


def serialize_prompt_debug(prompt: str) -> str:
    """Return a JSON document holding the first 2000 characters of *prompt*.

    Non-ASCII characters are written as is and the output is indented by two
    spaces.
    """
    preview = {"prompt_preview": prompt[:2000]}
    return json.dumps(preview, ensure_ascii=False, indent=2)