finish app develop

This commit is contained in:
kuangji
2026-05-18 15:50:43 +08:00
parent 8f23a841f0
commit 17decab2fc
20 changed files with 2447 additions and 0 deletions

222
app/analyzer.py Normal file
View File

@@ -0,0 +1,222 @@
from __future__ import annotations
import json
import re
from collections import Counter
from typing import Protocol
import requests
from app.config import ProviderConfig
from app.docx_parser import ParsedDocument
from app.report_generator import AnalysisReport
from app.skill_loader import Skill
class SupportsPost(Protocol):
def post(self, url: str, **kwargs): ...
IMPORTANT_TERMS = {
"需求",
"接口",
"测试",
"合格性",
"追踪",
"追溯",
"配置",
"质量",
"部署",
"安装",
"验收",
"设计",
"资源",
"风险",
"计划",
"说明",
"文档",
"CSCI",
"GJB",
}
def _tokens(text: str) -> list[str]:
ascii_tokens = re.findall(r"[A-Za-z][A-Za-z0-9_-]{2,}", text)
chinese_terms = [term for term in IMPORTANT_TERMS if term in text]
return [token.lower() for token in ascii_tokens] + chinese_terms
def select_relevant_skills(parsed: ParsedDocument, skills: list[Skill], max_skills: int = 6) -> list[Skill]:
document_tokens = Counter(_tokens(parsed.text + "\n" + "\n".join(h.text for h in parsed.headings)))
scored: list[tuple[int, Skill]] = []
for skill in skills:
skill_text = f"{skill.slug}\n{skill.name}\n{skill.description}\n{skill.use_when}\n{skill.content[:3000]}"
score = 0
skill_tokens = set(_tokens(skill_text))
for token, count in document_tokens.items():
if token in skill_tokens:
score += count
if parsed.filename.lower().endswith(".docx"):
score += 1
if score > 0:
scored.append((score, skill))
scored.sort(key=lambda item: (-item[0], item[1].slug))
if not scored:
return skills[:max_skills]
return [skill for _, skill in scored[:max_skills]]
def build_analysis_prompt(parsed: ParsedDocument, skills: list[Skill]) -> str:
skill_sections = []
for skill in skills:
skill_sections.append(
f"## {skill.slug}\n名称: {skill.name}\n描述: {skill.description}\n适用条件: {skill.use_when}\n规范内容:\n{skill.content[:6000]}"
)
document_outline = "\n".join(f"- H{heading.level} {heading.text}" for heading in parsed.headings[:80]) or "未识别到标题。"
document_text = parsed.text[:18000]
return f"""你是军用软件文档符合性审查助手。请依据给定 GJB438C/GJB2786 技能规范,分析上传 DOCX 是否符合规范。
请输出中文 Markdown必须包含以下小节
1. 总体结论
2. 符合项
3. 不符合项
4. 缺失章节或缺失证据
5. 整改建议
6. 需人工复核事项
要求:
- 每个问题尽量引用文档中的标题、关键词或证据摘要。
- 不要编造未在文档中出现的证据。
- 如果无法判断,标记为“需人工复核”。
# 文件
{parsed.filename}
# 文档目录
{document_outline}
# 待检查技能
{chr(10).join(skill_sections)}
# 文档正文摘录
{document_text}
"""
class LLMClient:
def __init__(self, provider: ProviderConfig, session: SupportsPost | None = None, timeout: int = 120) -> None:
self.provider = provider
self.session = session or requests.Session()
self.timeout = timeout
def complete(self, prompt: str) -> str:
headers = {"Content-Type": "application/json"}
if self.provider.api_key and self.provider.api_key != "EMPTY":
headers["Authorization"] = f"Bearer {self.provider.api_key}"
payload = {
"model": self.provider.model,
"messages": [{"role": "user", "content": prompt}],
"temperature": self.provider.temperature,
"max_tokens": self.provider.max_tokens,
}
response = self.session.post(
self.provider.chat_completions_url,
headers=headers,
json=payload,
timeout=self.timeout,
)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"]
def heuristic_analysis(parsed: ParsedDocument, skills: list[Skill]) -> AnalysisReport:
headings_text = "\n".join(h.text for h in parsed.headings)
full_text = parsed.text
findings: list[dict[str, str]] = []
recommendations: list[str] = []
required_terms = ["范围", "引用文档", "需求", "合格性", "追踪", "接口"]
for term in required_terms:
present = term in full_text or term in headings_text
findings.append(
{
"status": "符合" if present else "需整改",
"item": f"检查关键内容:{term}",
"evidence": "文档中已发现相关表述" if present else "未在解析文本中发现明确表述",
}
)
if not present:
recommendations.append(f"补充或明确“{term}”相关章节与证据。")
if not parsed.headings:
findings.append({"status": "需整改", "item": "章节结构", "evidence": "未识别到 Word 标题样式"})
recommendations.append("使用 Word 标题样式组织章节,便于目录和符合性审查。")
issue_count = sum(1 for item in findings if item["status"] != "符合")
summary = "通过" if issue_count == 0 else "部分通过,需人工复核"
raw_output = "未调用模型,已使用本地启发式规则生成初步分析。"
return AnalysisReport(
source_filename=parsed.filename,
provider_name="local",
model_name="heuristic",
matched_skills=[skill.slug for skill in skills],
summary=summary,
findings=findings,
recommendations=recommendations or ["保持现有章节结构,并由人工进行最终符合性确认。"],
raw_model_output=raw_output,
)
def report_from_model_output(
parsed: ParsedDocument,
skills: list[Skill],
provider_name: str,
model_name: str,
output: str,
) -> AnalysisReport:
findings = [{"status": "模型分析", "item": "完整分析结果", "evidence": output[:1200]}]
recommendations = _extract_recommendations(output)
return AnalysisReport(
source_filename=parsed.filename,
provider_name=provider_name,
model_name=model_name,
matched_skills=[skill.slug for skill in skills],
summary=_extract_summary(output),
findings=findings,
recommendations=recommendations,
raw_model_output=output,
)
def _extract_summary(output: str) -> str:
for line in output.splitlines():
normalized = line.strip(" #:")
if "总体结论" in normalized and len(normalized) > 4:
return normalized
return "模型已生成分析结果,需人工复核"
def _extract_recommendations(output: str) -> list[str]:
recommendations: list[str] = []
in_section = False
for line in output.splitlines():
stripped = line.strip()
if "整改建议" in stripped or "修改建议" in stripped:
in_section = True
continue
if in_section and stripped.startswith("#"):
break
if in_section and stripped.lstrip("-0123456789.、 "):
recommendations.append(stripped.lstrip("-0123456789.、 "))
return recommendations[:10] or ["按模型分析结果逐项整改,并进行人工复核。"]
def serialize_prompt_debug(prompt: str) -> str:
return json.dumps({"prompt_preview": prompt[:2000]}, ensure_ascii=False, indent=2)