223 lines
7.3 KiB
Python
223 lines
7.3 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import re
|
||
from collections import Counter
|
||
from typing import Protocol
|
||
|
||
import requests
|
||
|
||
from app.config import ProviderConfig
|
||
from app.docx_parser import ParsedDocument
|
||
from app.report_generator import AnalysisReport
|
||
from app.skill_loader import Skill
|
||
|
||
|
||
class SupportsPost(Protocol):
|
||
def post(self, url: str, **kwargs): ...
|
||
|
||
|
||
IMPORTANT_TERMS = {
|
||
"需求",
|
||
"接口",
|
||
"测试",
|
||
"合格性",
|
||
"追踪",
|
||
"追溯",
|
||
"配置",
|
||
"质量",
|
||
"部署",
|
||
"安装",
|
||
"验收",
|
||
"设计",
|
||
"资源",
|
||
"风险",
|
||
"计划",
|
||
"说明",
|
||
"文档",
|
||
"CSCI",
|
||
"GJB",
|
||
}
|
||
|
||
|
||
def _tokens(text: str) -> list[str]:
|
||
ascii_tokens = re.findall(r"[A-Za-z][A-Za-z0-9_-]{2,}", text)
|
||
chinese_terms = [term for term in IMPORTANT_TERMS if term in text]
|
||
return [token.lower() for token in ascii_tokens] + chinese_terms
|
||
|
||
|
||
def select_relevant_skills(parsed: ParsedDocument, skills: list[Skill], max_skills: int = 6) -> list[Skill]:
|
||
document_tokens = Counter(_tokens(parsed.text + "\n" + "\n".join(h.text for h in parsed.headings)))
|
||
scored: list[tuple[int, Skill]] = []
|
||
|
||
for skill in skills:
|
||
skill_text = f"{skill.slug}\n{skill.name}\n{skill.description}\n{skill.use_when}\n{skill.content[:3000]}"
|
||
score = 0
|
||
skill_tokens = set(_tokens(skill_text))
|
||
for token, count in document_tokens.items():
|
||
if token in skill_tokens:
|
||
score += count
|
||
if parsed.filename.lower().endswith(".docx"):
|
||
score += 1
|
||
if score > 0:
|
||
scored.append((score, skill))
|
||
|
||
scored.sort(key=lambda item: (-item[0], item[1].slug))
|
||
if not scored:
|
||
return skills[:max_skills]
|
||
return [skill for _, skill in scored[:max_skills]]
|
||
|
||
|
||
def build_analysis_prompt(parsed: ParsedDocument, skills: list[Skill]) -> str:
|
||
skill_sections = []
|
||
for skill in skills:
|
||
skill_sections.append(
|
||
f"## {skill.slug}\n名称: {skill.name}\n描述: {skill.description}\n适用条件: {skill.use_when}\n规范内容:\n{skill.content[:6000]}"
|
||
)
|
||
|
||
document_outline = "\n".join(f"- H{heading.level} {heading.text}" for heading in parsed.headings[:80]) or "未识别到标题。"
|
||
document_text = parsed.text[:18000]
|
||
|
||
return f"""你是军用软件文档符合性审查助手。请依据给定 GJB438C/GJB2786 技能规范,分析上传 DOCX 是否符合规范。
|
||
|
||
请输出中文 Markdown,必须包含以下小节:
|
||
1. 总体结论
|
||
2. 符合项
|
||
3. 不符合项
|
||
4. 缺失章节或缺失证据
|
||
5. 整改建议
|
||
6. 需人工复核事项
|
||
|
||
要求:
|
||
- 每个问题尽量引用文档中的标题、关键词或证据摘要。
|
||
- 不要编造未在文档中出现的证据。
|
||
- 如果无法判断,标记为“需人工复核”。
|
||
|
||
# 文件
|
||
{parsed.filename}
|
||
|
||
# 文档目录
|
||
{document_outline}
|
||
|
||
# 待检查技能
|
||
{chr(10).join(skill_sections)}
|
||
|
||
# 文档正文摘录
|
||
{document_text}
|
||
"""
|
||
|
||
|
||
class LLMClient:
|
||
def __init__(self, provider: ProviderConfig, session: SupportsPost | None = None, timeout: int = 120) -> None:
|
||
self.provider = provider
|
||
self.session = session or requests.Session()
|
||
self.timeout = timeout
|
||
|
||
def complete(self, prompt: str) -> str:
|
||
headers = {"Content-Type": "application/json"}
|
||
if self.provider.api_key and self.provider.api_key != "EMPTY":
|
||
headers["Authorization"] = f"Bearer {self.provider.api_key}"
|
||
|
||
payload = {
|
||
"model": self.provider.model,
|
||
"messages": [{"role": "user", "content": prompt}],
|
||
"temperature": self.provider.temperature,
|
||
"max_tokens": self.provider.max_tokens,
|
||
}
|
||
response = self.session.post(
|
||
self.provider.chat_completions_url,
|
||
headers=headers,
|
||
json=payload,
|
||
timeout=self.timeout,
|
||
)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
return data["choices"][0]["message"]["content"]
|
||
|
||
|
||
def heuristic_analysis(parsed: ParsedDocument, skills: list[Skill]) -> AnalysisReport:
|
||
headings_text = "\n".join(h.text for h in parsed.headings)
|
||
full_text = parsed.text
|
||
findings: list[dict[str, str]] = []
|
||
recommendations: list[str] = []
|
||
|
||
required_terms = ["范围", "引用文档", "需求", "合格性", "追踪", "接口"]
|
||
for term in required_terms:
|
||
present = term in full_text or term in headings_text
|
||
findings.append(
|
||
{
|
||
"status": "符合" if present else "需整改",
|
||
"item": f"检查关键内容:{term}",
|
||
"evidence": "文档中已发现相关表述" if present else "未在解析文本中发现明确表述",
|
||
}
|
||
)
|
||
if not present:
|
||
recommendations.append(f"补充或明确“{term}”相关章节与证据。")
|
||
|
||
if not parsed.headings:
|
||
findings.append({"status": "需整改", "item": "章节结构", "evidence": "未识别到 Word 标题样式"})
|
||
recommendations.append("使用 Word 标题样式组织章节,便于目录和符合性审查。")
|
||
|
||
issue_count = sum(1 for item in findings if item["status"] != "符合")
|
||
summary = "通过" if issue_count == 0 else "部分通过,需人工复核"
|
||
raw_output = "未调用模型,已使用本地启发式规则生成初步分析。"
|
||
|
||
return AnalysisReport(
|
||
source_filename=parsed.filename,
|
||
provider_name="local",
|
||
model_name="heuristic",
|
||
matched_skills=[skill.slug for skill in skills],
|
||
summary=summary,
|
||
findings=findings,
|
||
recommendations=recommendations or ["保持现有章节结构,并由人工进行最终符合性确认。"],
|
||
raw_model_output=raw_output,
|
||
)
|
||
|
||
|
||
def report_from_model_output(
|
||
parsed: ParsedDocument,
|
||
skills: list[Skill],
|
||
provider_name: str,
|
||
model_name: str,
|
||
output: str,
|
||
) -> AnalysisReport:
|
||
findings = [{"status": "模型分析", "item": "完整分析结果", "evidence": output[:1200]}]
|
||
recommendations = _extract_recommendations(output)
|
||
return AnalysisReport(
|
||
source_filename=parsed.filename,
|
||
provider_name=provider_name,
|
||
model_name=model_name,
|
||
matched_skills=[skill.slug for skill in skills],
|
||
summary=_extract_summary(output),
|
||
findings=findings,
|
||
recommendations=recommendations,
|
||
raw_model_output=output,
|
||
)
|
||
|
||
|
||
def _extract_summary(output: str) -> str:
|
||
for line in output.splitlines():
|
||
normalized = line.strip(" #::")
|
||
if "总体结论" in normalized and len(normalized) > 4:
|
||
return normalized
|
||
return "模型已生成分析结果,需人工复核"
|
||
|
||
|
||
def _extract_recommendations(output: str) -> list[str]:
|
||
recommendations: list[str] = []
|
||
in_section = False
|
||
for line in output.splitlines():
|
||
stripped = line.strip()
|
||
if "整改建议" in stripped or "修改建议" in stripped:
|
||
in_section = True
|
||
continue
|
||
if in_section and stripped.startswith("#"):
|
||
break
|
||
if in_section and stripped.lstrip("-0123456789.、 "):
|
||
recommendations.append(stripped.lstrip("-0123456789.、 "))
|
||
return recommendations[:10] or ["按模型分析结果逐项整改,并进行人工复核。"]
|
||
|
||
|
||
def serialize_prompt_debug(prompt: str) -> str:
|
||
return json.dumps({"prompt_preview": prompt[:2000]}, ensure_ascii=False, indent=2)
|