2026-05-18 15:50:43 +08:00
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
|
import re
|
|
|
|
|
|
from collections import Counter
|
|
|
|
|
|
from typing import Protocol
|
|
|
|
|
|
|
|
|
|
|
|
import requests
|
|
|
|
|
|
|
|
|
|
|
|
from app.config import ProviderConfig
|
|
|
|
|
|
from app.docx_parser import ParsedDocument
|
|
|
|
|
|
from app.report_generator import AnalysisReport
|
|
|
|
|
|
from app.skill_loader import Skill
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SupportsPost(Protocol):
    """Structural type for HTTP client objects that expose a ``post`` method.

    Any object with a compatible ``post`` can be supplied wherever this
    protocol is used as an annotation; no inheritance is required.
    """

    def post(self, url: str, **kwargs):
        """Issue a POST request to *url*, accepting arbitrary keyword options."""
        ...
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Domain keywords (Chinese terms plus the "CSCI"/"GJB" acronyms) that are
# looked up verbatim in text, since the ASCII token regex cannot find the
# Chinese ones.
IMPORTANT_TERMS = {
    "需求", "接口", "测试", "合格性", "追踪", "追溯",
    "配置", "质量", "部署", "安装", "验收", "设计",
    "资源", "风险", "计划", "说明", "文档",
    "CSCI", "GJB",
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _tokens(text: str) -> list[str]:
    """Split *text* into the tokens used for skill relevance scoring.

    Two kinds of tokens are produced:

    * ASCII words: a letter followed by at least two letters, digits,
      underscores or hyphens. One lower-cased entry is emitted per occurrence.
    * Entries of ``IMPORTANT_TERMS`` that occur anywhere in *text* as a
      substring. Each present term is emitted once, in its original case.

    Args:
        text: Arbitrary text to tokenize.

    Returns:
        The lower-cased ASCII tokens in order of appearance, followed by the
        matched important terms in sorted order.
    """
    ascii_tokens = [word.lower() for word in re.findall(r"[A-Za-z][A-Za-z0-9_-]{2,}", text)]
    # IMPORTANT_TERMS is a set, whose iteration order for strings can change
    # between interpreter runs; sorting keeps the returned list reproducible.
    matched_terms = sorted(term for term in IMPORTANT_TERMS if term in text)
    return ascii_tokens + matched_terms
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def select_relevant_skills(parsed: ParsedDocument, skills: list[Skill], max_skills: int = 6) -> list[Skill]:
    """Rank *skills* by token overlap with the document and keep the best ones.

    The document's body text and heading texts are tokenized and counted. A
    skill's score is the sum of the document counts of every token that also
    appears in the skill's slug, name, description, ``use_when`` and the first
    3000 characters of its content. One extra point is added when the document
    filename ends with ``.docx`` (case-insensitive).

    Args:
        parsed: The parsed document (``text``, ``headings`` and ``filename``
            are read).
        skills: Candidate skills.
        max_skills: Upper bound on the number of skills returned.

    Returns:
        Skills with a positive score, highest score first and ties broken by
        slug, truncated to *max_skills*. When no skill scores above zero the
        first *max_skills* entries of *skills* are returned instead.
    """
    heading_block = "\n".join(heading.text for heading in parsed.headings)
    term_counts = Counter(_tokens(parsed.text + "\n" + heading_block))

    ranked: list[tuple[int, Skill]] = []
    for candidate in skills:
        profile = f"{candidate.slug}\n{candidate.name}\n{candidate.description}\n{candidate.use_when}\n{candidate.content[:3000]}"
        vocabulary = set(_tokens(profile))
        relevance = sum(count for token, count in term_counts.items() if token in vocabulary)
        if parsed.filename.lower().endswith(".docx"):
            relevance += 1
        if relevance > 0:
            ranked.append((relevance, candidate))

    if not ranked:
        return skills[:max_skills]

    # Negated score gives descending order; slug makes ties deterministic.
    ranked.sort(key=lambda pair: (-pair[0], pair[1].slug))
    return [candidate for _, candidate in ranked[:max_skills]]
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-05-19 13:22:25 +08:00
|
|
|
|
def normalize_selected_skill_slugs(selected_slugs: list[str] | None, skills: list[Skill]) -> list[Skill]:
    """Resolve user-selected slugs to ``Skill`` objects, falling back to all skills.

    Args:
        selected_slugs: Slugs chosen by the caller; ``None`` or an empty list
            means "no explicit selection".
        skills: All available skills.

    Returns:
        The skills whose slug appears in *selected_slugs*, in selection order
        with repeated slugs collapsed to their first occurrence. Unknown slugs
        are ignored. If there is no selection, or none of the slugs is known,
        *skills* itself is returned.
    """
    if not selected_slugs:
        return skills

    by_slug = {skill.slug: skill for skill in skills}
    # dict.fromkeys drops repeated slugs while preserving first-seen order, so
    # a skill requested twice is not returned (and later rendered) twice.
    picked = [by_slug[slug] for slug in dict.fromkeys(selected_slugs) if slug in by_slug]
    return picked or skills
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-05-18 15:50:43 +08:00
|
|
|
|
def build_analysis_prompt(parsed: ParsedDocument, skills: list[Skill]) -> str:
    """Render the compliance-review prompt sent to the language model.

    The prompt consists of fixed instructions followed by the filename, a
    heading outline (at most 80 headings), one section per skill (content
    capped at 6000 characters each) and the first 18000 characters of the
    document text.

    Args:
        parsed: Parsed document providing ``filename``, ``headings`` and
            ``text``.
        skills: Skills whose specifications are embedded in the prompt.

    Returns:
        The complete prompt, terminated by a newline.
    """
    skills_block = "\n".join(
        f"## {skill.slug}\n名称: {skill.name}\n描述: {skill.description}\n适用条件: {skill.use_when}\n规范内容:\n{skill.content[:6000]}"
        for skill in skills
    )
    outline_lines = [f"- H{heading.level} {heading.text}" for heading in parsed.headings[:80]]
    outline_block = "\n".join(outline_lines) if outline_lines else "未识别到标题。"
    body_excerpt = parsed.text[:18000]

    # Sections are separated by exactly one blank line in the final prompt.
    sections = (
        "你是军用软件文档符合性审查助手。请依据给定 GJB438C/GJB2786 技能规范,分析上传 DOCX 是否符合规范。",
        "请输出中文 Markdown,必须包含以下小节:\n"
        "1. 总体结论\n"
        "2. 符合项\n"
        "3. 不符合项\n"
        "4. 缺失章节或缺失证据\n"
        "5. 整改建议\n"
        "6. 需人工复核事项",
        "要求:\n"
        "- 每个问题尽量引用文档中的标题、关键词或证据摘要。\n"
        "- 不要编造未在文档中出现的证据。\n"
        "- 如果无法判断,标记为“需人工复核”。",
        f"# 文件\n{parsed.filename}",
        f"# 文档目录\n{outline_block}",
        f"# 待检查技能\n{skills_block}",
        f"# 文档正文摘录\n{body_excerpt}",
    )
    return "\n\n".join(sections) + "\n"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LLMClient:
    """Small client for a chat-completions style HTTP endpoint.

    The endpoint URL, model name, sampling settings and API key are read from
    the supplied provider configuration.
    """

    def __init__(self, provider: ProviderConfig, session: SupportsPost | None = None, timeout: int = 120) -> None:
        """Store the configuration and the HTTP session to use.

        Args:
            provider: Provider settings (``api_key``, ``model``,
                ``temperature``, ``max_tokens`` and ``chat_completions_url``
                are read by ``complete``).
            session: Object with a ``post`` method. A new ``requests.Session``
                is created when omitted.
            timeout: Value forwarded as ``timeout`` to every ``post`` call.
        """
        self.provider = provider
        # Test against None explicitly: an injected session that evaluates as
        # falsy (for example one defining __len__ or __bool__) must still be
        # used rather than silently replaced by a real network session.
        self.session = requests.Session() if session is None else session
        self.timeout = timeout

    def complete(self, prompt: str) -> str:
        """Send *prompt* as a single user message and return the reply text.

        Args:
            prompt: Content of the user message.

        Returns:
            ``choices[0].message.content`` from the JSON response.

        Raises:
            KeyError, IndexError, TypeError: If the response JSON does not have
                the expected ``choices[0].message.content`` structure.
            Exception: Whatever the session's ``post`` or the response's
                ``raise_for_status`` / ``json`` raise is propagated unchanged.
        """
        request_headers = {"Content-Type": "application/json"}
        api_key = self.provider.api_key
        # The literal "EMPTY" is treated like a missing key: no auth header.
        if api_key and api_key != "EMPTY":
            request_headers["Authorization"] = f"Bearer {api_key}"

        body = {
            "model": self.provider.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": self.provider.temperature,
            "max_tokens": self.provider.max_tokens,
        }
        response = self.session.post(
            self.provider.chat_completions_url,
            headers=request_headers,
            json=body,
            timeout=self.timeout,
        )
        response.raise_for_status()
        data = response.json()
        return data["choices"][0]["message"]["content"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def heuristic_analysis(parsed: ParsedDocument, skills: list[Skill]) -> AnalysisReport:
    """Build a preliminary report from local keyword checks, without any model call.

    For each of six required keywords a finding is recorded stating whether
    the keyword occurs in the document text or in any heading text. A missing
    keyword also yields a recommendation. An additional finding and
    recommendation are added when the document has no headings.

    Args:
        parsed: Parsed document (``text``, ``headings`` and ``filename`` are
            read).
        skills: Skills reported as ``matched_skills``; they do not influence
            the checks.

    Returns:
        An ``AnalysisReport`` with provider ``"local"`` and model
        ``"heuristic"``.
    """
    heading_blob = "\n".join(heading.text for heading in parsed.headings)
    body_text = parsed.text
    checks: list[dict[str, str]] = []
    advice: list[str] = []

    for keyword in ("范围", "引用文档", "需求", "合格性", "追踪", "接口"):
        found = keyword in body_text or keyword in heading_blob
        if found:
            status, evidence = "符合", "文档中已发现相关表述"
        else:
            status, evidence = "需整改", "未在解析文本中发现明确表述"
        checks.append({"status": status, "item": f"检查关键内容:{keyword}", "evidence": evidence})
        if not found:
            advice.append(f"补充或明确“{keyword}”相关章节与证据。")

    if not parsed.headings:
        checks.append({"status": "需整改", "item": "章节结构", "evidence": "未识别到 Word 标题样式"})
        advice.append("使用 Word 标题样式组织章节,便于目录和符合性审查。")

    # The document only "passes" when every recorded check is compliant.
    all_compliant = all(entry["status"] == "符合" for entry in checks)
    if all_compliant:
        verdict = "通过"
    else:
        verdict = "部分通过,需人工复核"

    return AnalysisReport(
        source_filename=parsed.filename,
        provider_name="local",
        model_name="heuristic",
        matched_skills=[skill.slug for skill in skills],
        summary=verdict,
        findings=checks,
        recommendations=advice or ["保持现有章节结构,并由人工进行最终符合性确认。"],
        raw_model_output="未调用模型,已使用本地启发式规则生成初步分析。",
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def report_from_model_output(
    parsed: ParsedDocument,
    skills: list[Skill],
    provider_name: str,
    model_name: str,
    output: str,
) -> AnalysisReport:
    """Wrap raw model *output* in an ``AnalysisReport``.

    Args:
        parsed: Parsed document; only ``filename`` is read.
        skills: Skills that were given to the model; their slugs are recorded.
        provider_name: Name of the provider that produced *output*.
        model_name: Name of the model that produced *output*.
        output: Full text returned by the model.

    Returns:
        A report whose single finding carries the first 1200 characters of
        *output*, with summary and recommendations extracted from *output* and
        the untruncated text kept in ``raw_model_output``.
    """
    model_finding = {
        "status": "模型分析",
        "item": "完整分析结果",
        "evidence": output[:1200],
    }
    slugs = [skill.slug for skill in skills]
    return AnalysisReport(
        source_filename=parsed.filename,
        provider_name=provider_name,
        model_name=model_name,
        matched_skills=slugs,
        summary=_extract_summary(output),
        findings=[model_finding],
        recommendations=_extract_recommendations(output),
        raw_model_output=output,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_summary(output: str) -> str:
|
|
|
|
|
|
for line in output.splitlines():
|
|
|
|
|
|
normalized = line.strip(" #::")
|
|
|
|
|
|
if "总体结论" in normalized and len(normalized) > 4:
|
|
|
|
|
|
return normalized
|
|
|
|
|
|
return "模型已生成分析结果,需人工复核"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_recommendations(output: str) -> list[str]:
|
|
|
|
|
|
recommendations: list[str] = []
|
|
|
|
|
|
in_section = False
|
|
|
|
|
|
for line in output.splitlines():
|
|
|
|
|
|
stripped = line.strip()
|
|
|
|
|
|
if "整改建议" in stripped or "修改建议" in stripped:
|
|
|
|
|
|
in_section = True
|
|
|
|
|
|
continue
|
|
|
|
|
|
if in_section and stripped.startswith("#"):
|
|
|
|
|
|
break
|
|
|
|
|
|
if in_section and stripped.lstrip("-0123456789.、 "):
|
|
|
|
|
|
recommendations.append(stripped.lstrip("-0123456789.、 "))
|
|
|
|
|
|
return recommendations[:10] or ["按模型分析结果逐项整改,并进行人工复核。"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def serialize_prompt_debug(prompt: str) -> str:
    """Return a pretty-printed JSON document previewing *prompt*.

    Args:
        prompt: Full prompt text.

    Returns:
        JSON text with a single ``prompt_preview`` key holding the first 2000
        characters of *prompt*; non-ASCII characters are written unescaped.
    """
    preview = prompt[:2000]
    payload = {"prompt_preview": preview}
    return json.dumps(payload, indent=2, ensure_ascii=False)
|