Files
linux_format_docs_check/app/analyzer.py
2026-05-19 13:22:25 +08:00

232 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import re
from collections import Counter
from typing import Protocol
import requests
from app.config import ProviderConfig
from app.docx_parser import ParsedDocument
from app.report_generator import AnalysisReport
from app.skill_loader import Skill
class SupportsPost(Protocol):
    """Structural type for any object exposing a ``post(url, **kwargs)`` method.

    ``LLMClient`` accepts an object of this shape as its ``session`` argument,
    so anything with a compatible ``post`` can stand in for a
    ``requests.Session``.
    """

    def post(self, url: str, **kwargs): ...
# Domain keywords that `_tokens` looks for by plain substring containment.
# Each term contributes at most one token per text, however often it occurs.
IMPORTANT_TERMS = {
    "需求",
    "接口",
    "测试",
    "合格性",
    "追踪",
    "追溯",
    "配置",
    "质量",
    "部署",
    "安装",
    "验收",
    "设计",
    "资源",
    "风险",
    "计划",
    "说明",
    "文档",
    "CSCI",
    "GJB",
}
def _tokens(text: str) -> list[str]:
    """Return the matching tokens found in *text*.

    The result holds every ASCII word of three or more characters (a letter
    followed by letters, digits, ``_`` or ``-``), lower-cased and in order of
    appearance, followed by each entry of ``IMPORTANT_TERMS`` that occurs
    anywhere in *text*. ASCII words repeat once per occurrence; an important
    term appears at most once.
    """
    collected: list[str] = []
    for word in re.findall(r"[A-Za-z][A-Za-z0-9_-]{2,}", text):
        collected.append(word.lower())
    # Substring test only: a term is reported once regardless of frequency.
    collected.extend(term for term in IMPORTANT_TERMS if term in text)
    return collected
def select_relevant_skills(parsed: ParsedDocument, skills: list[Skill], max_skills: int = 6) -> list[Skill]:
    """Rank *skills* by token overlap with the document and keep the best ones.

    A skill's score is the summed document frequency of every document token
    that also occurs in the skill's slug, name, description, ``use_when`` and
    the first 3000 characters of its content. One extra point is added when
    the document filename ends with ``.docx``. Skills with a zero score are
    dropped; the rest are ordered by descending score, ties broken by slug.

    Returns:
        At most *max_skills* skills. When no skill scores above zero, the
        first *max_skills* entries of *skills* are returned unchanged.
    """
    heading_lines = "\n".join(h.text for h in parsed.headings)
    doc_counts = Counter(_tokens(parsed.text + "\n" + heading_lines))

    ranked: list[tuple[int, Skill]] = []
    for candidate in skills:
        profile = (
            f"{candidate.slug}\n{candidate.name}\n{candidate.description}\n"
            f"{candidate.use_when}\n{candidate.content[:3000]}"
        )
        vocabulary = set(_tokens(profile))
        points = sum(freq for tok, freq in doc_counts.items() if tok in vocabulary)
        if parsed.filename.lower().endswith(".docx"):
            points += 1
        if points > 0:
            ranked.append((points, candidate))

    if not ranked:
        return skills[:max_skills]

    # Highest score first; the slug keeps the order stable among equal scores.
    ranked.sort(key=lambda pair: (-pair[0], pair[1].slug))
    return [candidate for _, candidate in ranked[:max_skills]]
def normalize_selected_skill_slugs(selected_slugs: list[str] | None, skills: list[Skill]) -> list[Skill]:
    """Resolve user-selected slugs to skill objects.

    Returns the skills whose slug appears in *selected_slugs*, in the order
    the slugs were given; unknown slugs are ignored. If the selection is
    empty, ``None``, or matches nothing, the full *skills* list is returned.
    """
    if selected_slugs:
        by_slug = {}
        for skill in skills:
            by_slug[skill.slug] = skill
        chosen = [by_slug[slug] for slug in selected_slugs if slug in by_slug]
        if chosen:
            return chosen
    return skills
def build_analysis_prompt(parsed: ParsedDocument, skills: list[Skill]) -> str:
    """Assemble the review prompt sent to the language model.

    The prompt contains fixed instructions, the document filename, an outline
    built from at most the first 80 headings, one section per skill (content
    capped at 6000 characters each) and the first 18000 characters of the
    document text.
    """
    skills_block = "\n".join(
        f"## {skill.slug}\n名称: {skill.name}\n描述: {skill.description}\n适用条件: {skill.use_when}\n规范内容:\n{skill.content[:6000]}"
        for skill in skills
    )

    outline_lines = [f"- H{heading.level} {heading.text}" for heading in parsed.headings[:80]]
    # An empty outline is replaced by a placeholder sentence.
    outline = "\n".join(outline_lines) or "未识别到标题。"

    excerpt = parsed.text[:18000]

    return f"""你是军用软件文档符合性审查助手。请依据给定 GJB438C/GJB2786 技能规范,分析上传 DOCX 是否符合规范。
请输出中文 Markdown必须包含以下小节
1. 总体结论
2. 符合项
3. 不符合项
4. 缺失章节或缺失证据
5. 整改建议
6. 需人工复核事项
要求:
- 每个问题尽量引用文档中的标题、关键词或证据摘要。
- 不要编造未在文档中出现的证据。
- 如果无法判断,标记为“需人工复核”。
# 文件
{parsed.filename}
# 文档目录
{outline}
# 待检查技能
{skills_block}
# 文档正文摘录
{excerpt}
"""
class LLMClient:
    """Small HTTP client for a chat-completions style endpoint.

    The request and response shapes used here (``messages`` in, the text at
    ``choices[0].message.content`` out) follow the common chat-completions
    JSON layout.
    """

    def __init__(self, provider: ProviderConfig, session: SupportsPost | None = None, timeout: int = 120) -> None:
        """Store the provider settings and the transport used for requests.

        Args:
            provider: Supplies ``api_key``, ``model``, ``temperature``,
                ``max_tokens`` and ``chat_completions_url``.
            session: Object with a ``post(url, **kwargs)`` method. A new
                ``requests.Session`` is created only when this is ``None``.
            timeout: Forwarded unchanged as the ``timeout`` keyword of
                ``session.post``.
        """
        self.provider = provider
        # Compare against None explicitly: an injected session that happens to
        # be falsy (e.g. it defines __len__ or __bool__) must not be replaced.
        self.session = session if session is not None else requests.Session()
        self.timeout = timeout

    def complete(self, prompt: str) -> str:
        """Send *prompt* as a single user message and return the reply text.

        Raises:
            Whatever ``response.raise_for_status()`` raises for a failed
            request, and ``KeyError``/``IndexError`` if the response JSON lacks
            ``choices[0].message.content``.
        """
        headers = {"Content-Type": "application/json"}
        # The literal "EMPTY" is treated like a missing key: no auth header.
        if self.provider.api_key and self.provider.api_key != "EMPTY":
            headers["Authorization"] = f"Bearer {self.provider.api_key}"
        payload = {
            "model": self.provider.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": self.provider.temperature,
            "max_tokens": self.provider.max_tokens,
        }
        response = self.session.post(
            self.provider.chat_completions_url,
            headers=headers,
            json=payload,
            timeout=self.timeout,
        )
        response.raise_for_status()
        data = response.json()
        return data["choices"][0]["message"]["content"]
def heuristic_analysis(parsed: ParsedDocument, skills: list[Skill]) -> AnalysisReport:
    """Build a preliminary report from local keyword checks, without a model.

    Each required term is searched for in the document text and in the
    heading texts; a missing term yields a finding that needs rectification
    plus a matching recommendation. A document with no headings gets one
    additional finding. The summary is a pass only when every finding is
    compliant.
    """
    heading_blob = "\n".join(heading.text for heading in parsed.headings)
    body = parsed.text

    findings: list[dict[str, str]] = []
    recommendations: list[str] = []

    for term in ("范围", "引用文档", "需求", "合格性", "追踪", "接口"):
        if term in body or term in heading_blob:
            status, evidence = "符合", "文档中已发现相关表述"
        else:
            status, evidence = "需整改", "未在解析文本中发现明确表述"
            recommendations.append(f"补充或明确“{term}”相关章节与证据。")
        findings.append({"status": status, "item": f"检查关键内容:{term}", "evidence": evidence})

    if not parsed.headings:
        findings.append({"status": "需整改", "item": "章节结构", "evidence": "未识别到 Word 标题样式"})
        recommendations.append("使用 Word 标题样式组织章节,便于目录和符合性审查。")

    has_issue = any(entry["status"] != "符合" for entry in findings)
    summary = "部分通过,需人工复核" if has_issue else "通过"

    if not recommendations:
        recommendations = ["保持现有章节结构,并由人工进行最终符合性确认。"]

    return AnalysisReport(
        source_filename=parsed.filename,
        provider_name="local",
        model_name="heuristic",
        matched_skills=[skill.slug for skill in skills],
        summary=summary,
        findings=findings,
        recommendations=recommendations,
        raw_model_output="未调用模型,已使用本地启发式规则生成初步分析。",
    )
def report_from_model_output(
    parsed: ParsedDocument,
    skills: list[Skill],
    provider_name: str,
    model_name: str,
    output: str,
) -> AnalysisReport:
    """Wrap raw model *output* in an ``AnalysisReport``.

    The report carries a single finding holding the first 1200 characters of
    the output, a summary and recommendations extracted from the output text,
    and the complete output as ``raw_model_output``.
    """
    summary_line = _extract_summary(output)
    advice = _extract_recommendations(output)
    single_finding = {
        "status": "模型分析",
        "item": "完整分析结果",
        "evidence": output[:1200],
    }
    slugs = [skill.slug for skill in skills]
    return AnalysisReport(
        source_filename=parsed.filename,
        provider_name=provider_name,
        model_name=model_name,
        matched_skills=slugs,
        summary=summary_line,
        findings=[single_finding],
        recommendations=advice,
        raw_model_output=output,
    )
def _extract_summary(output: str) -> str:
    """Pull the overall-conclusion line out of the model's Markdown output.

    A line that contains the conclusion label together with real text is
    returned as is (with surrounding spaces, ``#`` and ``:`` stripped). When
    the label stands alone as a heading, possibly numbered or emphasised
    (e.g. ``## 1. 总体结论``), the first non-empty content line after it is
    returned instead, so the summary is the conclusion and not the heading.

    Returns:
        The extracted summary, or a fixed fallback sentence when no
        conclusion can be found.
    """
    # Characters that may surround the label in a heading without being content:
    # Markdown markup, punctuation, Arabic and Chinese numbering.
    decoration = " \t#*:：.、()（）-0123456789一二三四五六七八九十"
    lines = output.splitlines()
    for index, line in enumerate(lines):
        normalized = line.strip(" #:")
        if "总体结论" not in normalized:
            continue
        remainder = normalized.replace("总体结论", "", 1).strip(decoration)
        if remainder:
            # Label and conclusion share one line.
            return normalized
        # Label-only heading: the conclusion is expected on the lines below,
        # up to the next Markdown heading.
        for following in lines[index + 1:]:
            candidate = following.strip()
            if candidate.startswith("#"):
                break
            candidate = candidate.lstrip("-*> ").rstrip("* ")
            if candidate:
                return candidate
    return "模型已生成分析结果,需人工复核"
def _extract_recommendations(output: str) -> list[str]:
    """Collect the items of the recommendations section from model output.

    Collection starts after the first line mentioning the section label and
    stops at the next line beginning with ``#``. From each item one leading
    bullet (``-``, or ``*``/``+``/``•`` followed by a space) and one ordinal
    marker such as ``1.``, ``2、`` or ``3)`` are removed. Digits that belong
    to the text itself (``3个接口…``, ``2.3 节…``) are kept.

    Returns:
        Up to ten recommendations, or a single generic recommendation when
        none were found.
    """
    # One optional bullet, then one optional ordinal. The lookahead keeps
    # dotted numbers like "2.3" from being mistaken for an ordinal "2.".
    list_marker = re.compile(r"^(?:-+\s*|[*+•]\s+)?(?:\d+\s*[.、)）](?!\d)\s*)?")
    recommendations: list[str] = []
    in_section = False
    for line in output.splitlines():
        stripped = line.strip()
        if "整改建议" in stripped or "修改建议" in stripped:
            in_section = True
            continue
        if not in_section:
            continue
        if stripped.startswith("#"):
            break
        # Skip blank lines and lines made only of markers (e.g. "---", "1.").
        if not stripped.lstrip("-0123456789.、 "):
            continue
        cleaned = list_marker.sub("", stripped, count=1).strip()
        if cleaned:
            recommendations.append(cleaned)
    return recommendations[:10] or ["按模型分析结果逐项整改,并进行人工复核。"]
def serialize_prompt_debug(prompt: str) -> str:
    """Return a pretty-printed JSON object holding a preview of *prompt*.

    The preview is the first 2000 characters, stored under the
    ``prompt_preview`` key; non-ASCII characters are written unescaped.
    """
    preview = {"prompt_preview": prompt[:2000]}
    return json.dumps(preview, indent=2, ensure_ascii=False)