Files
TBgen_App/autoline/energy_allocator.py
2026-03-30 16:46:48 +08:00

788 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Description : Energy Allocation Layer (Layer 4)
- Adaptive Resource Scheduling
- Dynamic energy distribution based on function point importance
Author : CGA Enhancement Project
Time : 2026/03/11
"""
import logging
from typing import List, Dict, Optional, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
logger = logging.getLogger(__name__)
# ============================================================================
# 数据结构定义
# ============================================================================
class EnergyState(Enum):
"""能量状态枚举"""
ACTIVE = "active" # 活跃,有剩余能量
DEPLETED = "depleted" # 能量耗尽
COMPLETED = "completed" # 已完成覆盖
SUSPENDED = "suspended" # 暂停(连续失败过多)
@dataclass
class EnergyAllocation:
"""
能量分配记录
Attributes:
function_point: 功能点名称
importance: 重要性评分 (0.0 - 1.0)
allocated: 分配的总能量
consumed: 已消耗的能量
remaining: 剩余能量
consecutive_failures: 连续失败次数
state: 当前能量状态
"""
function_point: str
importance: float
allocated: float = 0.0
consumed: float = 0.0
remaining: float = 0.0
consecutive_failures: int = 0
state: EnergyState = EnergyState.ACTIVE
total_attempts: int = 0
successful_attempts: int = 0
@dataclass
class GenerationResult:
"""
生成结果记录
Attributes:
function_point: 目标功能点
success: 是否成功覆盖
coverage_delta: 覆盖率变化
energy_cost: 消耗的能量
code_generated: 生成的代码
quality_score: 代码质量分数
"""
function_point: str
success: bool
coverage_delta: float = 0.0
energy_cost: float = 1.0
code_generated: str = ""
quality_score: float = 0.0
# ============================================================================
# 能量初始化器
# ============================================================================
class EnergyInitializer:
"""
能量初始化器
根据总能量预算和功能点重要性评分,初始化各功能点的能量分配
"""
# 默认配置
DEFAULT_TOTAL_ENERGY = 10.0 # 默认总能量(对应最大迭代次数)
MIN_ENERGY_PER_FP = 1.0 # 每个功能点最小能量
ENERGY_BUFFER_RATIO = 0.1 # 能量缓冲比例(保留用于重分配)
def __init__(self,
total_energy: float = None,
min_energy: float = None,
buffer_ratio: float = None):
"""
Args:
total_energy: 总能量预算(默认为 max_iter
min_energy: 每个功能点最小能量
buffer_ratio: 能量缓冲比例
"""
self.total_energy = total_energy or self.DEFAULT_TOTAL_ENERGY
self.min_energy = min_energy or self.MIN_ENERGY_PER_FP
self.buffer_ratio = buffer_ratio or self.ENERGY_BUFFER_RATIO
def initialize(self,
function_points: List[Dict],
max_iterations: int = None) -> Dict[str, EnergyAllocation]:
"""
初始化能量分配
Args:
function_points: 功能点列表,每个元素包含 name, importance, covered 等
max_iterations: 最大迭代次数(用于设置总能量)
Returns:
功能点名称 -> 能量分配记录 的字典
"""
# 如果提供了最大迭代次数,使用它作为总能量
if max_iterations:
self.total_energy = float(max_iterations)
# 过滤出未覆盖的功能点
uncovered_fps = [fp for fp in function_points if not fp.get('covered', False)]
if not uncovered_fps:
logger.info("All function points are covered. No energy allocation needed.")
return {}
# 计算总重要性
total_importance = sum(fp.get('importance', 0.5) for fp in uncovered_fps)
# 预留缓冲能量
buffer_energy = self.total_energy * self.buffer_ratio
available_energy = self.total_energy - buffer_energy
# 按重要性比例分配能量
allocations = {}
for fp in uncovered_fps:
name = fp.get('name', 'unknown')
importance = fp.get('importance', 0.5)
# 按比例计算分配能量,但不少于最小值
if total_importance > 0:
proportional_energy = (importance / total_importance) * available_energy
else:
proportional_energy = available_energy / len(uncovered_fps)
allocated = max(self.min_energy, proportional_energy)
allocations[name] = EnergyAllocation(
function_point=name,
importance=importance,
allocated=allocated,
consumed=0.0,
remaining=allocated,
consecutive_failures=0,
state=EnergyState.ACTIVE,
total_attempts=0,
successful_attempts=0
)
# 记录分配情况
logger.info(f"Energy initialized: total={self.total_energy:.1f}, "
f"allocated={sum(a.allocated for a in allocations.values()):.1f}, "
f"buffer={buffer_energy:.1f}, "
f"targets={len(allocations)}")
return allocations
# ============================================================================
# 目标选择器
# ============================================================================
class TargetSelector:
"""
目标选择器
选择下一个需要生成测试的目标功能点
采用优先级策略:重要性 × (剩余能量/分配能量)
"""
# 连续失败阈值
MAX_CONSECUTIVE_FAILURES = 3
def __init__(self, allocations: Dict[str, EnergyAllocation]):
"""
Args:
allocations: 能量分配字典
"""
self.allocations = allocations
def select_next_target(self) -> Optional[EnergyAllocation]:
"""
选择下一个目标功能点
优先级计算importance × (remaining / allocated) × (1 / (1 + consecutive_failures))
Returns:
选中的能量分配记录,如果没有可用目标则返回 None
"""
# 筛选候选:未覆盖、有剩余能量、非暂停状态
candidates = [
alloc for alloc in self.allocations.values()
if alloc.state == EnergyState.ACTIVE
and alloc.remaining > 0
]
if not candidates:
logger.info("No active targets with remaining energy.")
return None
# 计算优先级并排序
def calculate_priority(alloc: EnergyAllocation) -> float:
# 重要性权重
importance_weight = alloc.importance
# 能量剩余比例
energy_ratio = alloc.remaining / alloc.allocated if alloc.allocated > 0 else 0
# 失败惩罚因子
failure_penalty = 1.0 / (1.0 + alloc.consecutive_failures * 0.5)
# 综合优先级
priority = importance_weight * energy_ratio * failure_penalty
return priority
candidates.sort(key=calculate_priority, reverse=True)
selected = candidates[0]
logger.debug(f"Selected target: {selected.function_point} "
f"(importance={selected.importance:.2f}, "
f"remaining={selected.remaining:.1f}, "
f"failures={selected.consecutive_failures})")
return selected
def get_candidates_count(self) -> int:
"""获取候选目标数量"""
return len([a for a in self.allocations.values()
if a.state == EnergyState.ACTIVE and a.remaining > 0])
def get_top_candidates(self, n: int = 3) -> List[EnergyAllocation]:
"""获取优先级最高的 N 个候选目标"""
candidates = [
alloc for alloc in self.allocations.values()
if alloc.state == EnergyState.ACTIVE and alloc.remaining > 0
]
def calculate_priority(alloc: EnergyAllocation) -> float:
importance_weight = alloc.importance
energy_ratio = alloc.remaining / alloc.allocated if alloc.allocated > 0 else 0
failure_penalty = 1.0 / (1.0 + alloc.consecutive_failures * 0.5)
return importance_weight * energy_ratio * failure_penalty
candidates.sort(key=calculate_priority, reverse=True)
return candidates[:n]
# ============================================================================
# 能量消耗跟踪器
# ============================================================================
class EnergyConsumptionTracker:
"""
能量消耗跟踪器
跟踪每次生成尝试的能量消耗,根据结果更新状态
"""
# 能量衰减因子(连续失败时)
ENERGY_DECAY_FACTOR = 0.7
def __init__(self, allocations: Dict[str, EnergyAllocation]):
"""
Args:
allocations: 能量分配字典
"""
self.allocations = allocations
self.history: List[GenerationResult] = []
def record_generation(self, result: GenerationResult) -> Dict[str, Any]:
"""
记录一次生成尝试
Args:
result: 生成结果
Returns:
更新后的状态信息
"""
self.history.append(result)
fp_name = result.function_point
if fp_name not in self.allocations:
logger.warning(f"Unknown function point: {fp_name}")
return {'status': 'unknown', 'message': 'Unknown function point'}
alloc = self.allocations[fp_name]
alloc.total_attempts += 1
# 消耗能量
energy_cost = result.energy_cost
alloc.consumed += energy_cost
alloc.remaining = max(0, alloc.remaining - energy_cost)
if result.success:
# 成功:重置失败计数,标记完成
alloc.consecutive_failures = 0
alloc.successful_attempts += 1
alloc.state = EnergyState.COMPLETED
logger.info(f"[SUCCESS] Target covered: {fp_name} (attempts={alloc.total_attempts}, "
f"energy_used={alloc.consumed:.1f})")
return {
'status': 'completed',
'function_point': fp_name,
'attempts': alloc.total_attempts,
'energy_used': alloc.consumed
}
else:
# 失败:增加失败计数
alloc.consecutive_failures += 1
# 检查是否需要降低能量或暂停
if alloc.consecutive_failures >= 3:
# 能量减半
old_remaining = alloc.remaining
alloc.remaining *= self.ENERGY_DECAY_FACTOR
logger.warning(f"Consecutive failures for {fp_name}: {alloc.consecutive_failures}. "
f"Energy reduced: {old_remaining:.1f} -> {alloc.remaining:.1f}")
# 如果剩余能量过低,暂停
if alloc.remaining < 0.5:
alloc.state = EnergyState.SUSPENDED
logger.warning(f"Target suspended due to low energy: {fp_name}")
return {
'status': 'suspended',
'function_point': fp_name,
'consecutive_failures': alloc.consecutive_failures,
'remaining_energy': alloc.remaining
}
# 检查能量是否耗尽
if alloc.remaining <= 0:
alloc.state = EnergyState.DEPLETED
logger.warning(f"Target depleted: {fp_name}")
return {
'status': 'depleted',
'function_point': fp_name,
'total_attempts': alloc.total_attempts
}
return {
'status': 'failed',
'function_point': fp_name,
'consecutive_failures': alloc.consecutive_failures,
'remaining_energy': alloc.remaining
}
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
total = len(self.history)
successful = sum(1 for r in self.history if r.success)
energy_by_fp = {}
for result in self.history:
fp = result.function_point
if fp not in energy_by_fp:
energy_by_fp[fp] = {'consumed': 0, 'attempts': 0, 'success': False}
energy_by_fp[fp]['consumed'] += result.energy_cost
energy_by_fp[fp]['attempts'] += 1
if result.success:
energy_by_fp[fp]['success'] = True
return {
'total_attempts': total,
'successful_attempts': successful,
'success_rate': successful / total if total > 0 else 0,
'energy_by_function_point': energy_by_fp
}
# ============================================================================
# 能量重分配器
# ============================================================================
class EnergyRedistributor:
"""
能量重分配器
当某个功能点被覆盖后,将其剩余能量重新分配给其他未覆盖功能点
"""
def __init__(self, allocations: Dict[str, EnergyAllocation]):
"""
Args:
allocations: 能量分配字典
"""
self.allocations = allocations
def redistribute(self, completed_fp: str) -> Dict[str, float]:
"""
重分配已完成功能点的剩余能量
Args:
completed_fp: 已完成的功能点名称
Returns:
重分配详情 {target_fp: gained_energy}
"""
if completed_fp not in self.allocations:
return {}
completed_alloc = self.allocations[completed_fp]
# 回收剩余能量
recovered_energy = completed_alloc.remaining
if recovered_energy <= 0:
logger.debug(f"No remaining energy to recover from {completed_fp}")
return {}
# 找出活跃的未完成目标
active_targets = [
alloc for alloc in self.allocations.values()
if alloc.state == EnergyState.ACTIVE and alloc.function_point != completed_fp
]
if not active_targets:
logger.info(f"No active targets to redistribute energy to.")
return {}
# 按重要性比例分配
total_importance = sum(a.importance for a in active_targets)
redistribution = {}
for alloc in active_targets:
if total_importance > 0:
gain = (alloc.importance / total_importance) * recovered_energy
else:
gain = recovered_energy / len(active_targets)
alloc.allocated += gain
alloc.remaining += gain
redistribution[alloc.function_point] = gain
# 清零已完成目标的剩余能量
completed_alloc.remaining = 0
logger.info(f"Redistributed {recovered_energy:.1f} energy from {completed_fp} "
f"to {len(redistribution)} targets")
return redistribution
def redistribute_all(self) -> Dict[str, Dict[str, float]]:
"""
重分配所有已完成/暂停目标的剩余能量
Returns:
完整的重分配详情
"""
all_redistributions = {}
# 收集所有可回收能量
completed_fps = [
name for name, alloc in self.allocations.items()
if alloc.state in [EnergyState.COMPLETED, EnergyState.SUSPENDED]
and alloc.remaining > 0
]
for fp in completed_fps:
redistribution = self.redistribute(fp)
if redistribution:
all_redistributions[fp] = redistribution
return all_redistributions
def revive_suspended(self, min_energy: float = 1.0) -> List[str]:
"""
尝试复活暂停的目标(如果有足够的回收能量)
Args:
min_energy: 复活所需的最小能量
Returns:
复活的目标列表
"""
revived = []
# 计算可用能量(来自已完成目标)
available_energy = sum(
alloc.remaining for alloc in self.allocations.values()
if alloc.state == EnergyState.COMPLETED and alloc.remaining > 0
)
# 找出暂停的目标
suspended = [
alloc for alloc in self.allocations.values()
if alloc.state == EnergyState.SUSPENDED
]
for alloc in suspended:
if available_energy >= min_energy:
# 复活
alloc.state = EnergyState.ACTIVE
alloc.remaining = min_energy
alloc.allocated += min_energy
alloc.consecutive_failures = 0
available_energy -= min_energy
revived.append(alloc.function_point)
logger.info(f"Revived suspended target: {alloc.function_point}")
return revived
# ============================================================================
# 能量分配器(主入口)
# ============================================================================
class EnergyAllocator:
"""
能量分配器 - 第4层主入口
整合所有子模块,提供统一的能量管理接口
"""
def __init__(self,
max_iterations: int = 5,
total_energy: float = None):
"""
Args:
max_iterations: 最大迭代次数
total_energy: 总能量预算(默认使用 max_iterations
"""
self.max_iterations = max_iterations
self.total_energy = total_energy or float(max_iterations)
# 子模块
self.initializer = EnergyInitializer(total_energy=self.total_energy)
self.allocations: Dict[str, EnergyAllocation] = {}
self.selector: Optional[TargetSelector] = None
self.tracker: Optional[EnergyConsumptionTracker] = None
self.redistributor: Optional[EnergyRedistributor] = None
# 状态
self.initialized = False
self.current_target: Optional[EnergyAllocation] = None
def initialize(self, function_points: List[Dict]) -> Dict[str, Any]:
"""
初始化能量分配
Args:
function_points: 功能点列表
Returns:
初始化结果摘要
"""
self.allocations = self.initializer.initialize(
function_points,
max_iterations=self.max_iterations
)
self.selector = TargetSelector(self.allocations)
self.tracker = EnergyConsumptionTracker(self.allocations)
self.redistributor = EnergyRedistributor(self.allocations)
self.initialized = True
return {
'total_energy': self.total_energy,
'targets': len(self.allocations),
'allocation_details': {
name: {
'importance': alloc.importance,
'allocated': alloc.allocated,
'state': alloc.state.value
}
for name, alloc in self.allocations.items()
}
}
def select_next_target(self) -> Optional[str]:
"""
选择下一个生成目标
Returns:
目标功能点名称,如果没有可用目标则返回 None
"""
if not self.initialized:
logger.warning("Energy allocator not initialized.")
return None
self.current_target = self.selector.select_next_target()
return self.current_target.function_point if self.current_target else None
def record_generation(self,
success: bool,
coverage_delta: float = 0.0,
energy_cost: float = 1.0,
quality_score: float = 0.0) -> Dict[str, Any]:
"""
记录一次生成尝试
Args:
success: 是否成功覆盖目标
coverage_delta: 覆盖率变化
energy_cost: 消耗的能量
quality_score: 代码质量分数
Returns:
更新结果
"""
if not self.current_target:
return {'status': 'error', 'message': 'No current target'}
result = GenerationResult(
function_point=self.current_target.function_point,
success=success,
coverage_delta=coverage_delta,
energy_cost=energy_cost,
quality_score=quality_score
)
update_result = self.tracker.record_generation(result)
# 如果成功,触发重分配
if success:
self.redistributor.redistribute(self.current_target.function_point)
return update_result
def get_status(self) -> Dict[str, Any]:
"""获取当前状态"""
if not self.initialized:
return {'initialized': False}
active_count = sum(1 for a in self.allocations.values()
if a.state == EnergyState.ACTIVE and a.remaining > 0)
completed_count = sum(1 for a in self.allocations.values()
if a.state == EnergyState.COMPLETED)
return {
'initialized': True,
'total_energy': self.total_energy,
'total_targets': len(self.allocations),
'active_targets': active_count,
'completed_targets': completed_count,
'current_target': self.current_target.function_point if self.current_target else None,
'statistics': self.tracker.get_statistics() if self.tracker else None
}
def get_target_context(self, target_name: str = None) -> str:
"""
获取目标功能的上下文信息(用于 Prompt
Args:
target_name: 目标名称(默认使用当前目标)
Returns:
上下文字符串
"""
if not target_name and self.current_target:
target_name = self.current_target.function_point
if not target_name or target_name not in self.allocations:
return ""
alloc = self.allocations[target_name]
context = []
context.append(f"[TARGET: {target_name}]")
context.append(f"Importance: {alloc.importance:.2f}")
context.append(f"Remaining Energy: {alloc.remaining:.1f} / {alloc.allocated:.1f}")
context.append(f"Previous Attempts: {alloc.total_attempts}")
if alloc.consecutive_failures > 0:
context.append(f"Warning: {alloc.consecutive_failures} consecutive failures")
context.append("Consider a different approach or sequence")
return "\n".join(context)
def mark_targets_completed(self, function_names: List[str]) -> Dict[str, str]:
"""
将已确认覆盖的功能点直接标记为完成。
这用于基线同步或一次迭代中命中多个功能点的情况,
避免仅依赖当前 target 的涨分信号来判断完成状态。
"""
if not self.initialized:
return {}
updates = {}
for name in function_names:
if name not in self.allocations:
continue
alloc = self.allocations[name]
if alloc.state == EnergyState.COMPLETED:
updates[name] = "already_completed"
continue
alloc.state = EnergyState.COMPLETED
alloc.consecutive_failures = 0
alloc.remaining = 0.0
updates[name] = "completed"
self.redistributor.redistribute(name)
return updates
def generate_report(self) -> str:
"""生成能量分配报告"""
if not self.initialized:
return "Energy allocator not initialized."
lines = []
lines.append("=" * 60)
lines.append("ENERGY ALLOCATION REPORT")
lines.append("=" * 60)
lines.append(f"Total Energy: {self.total_energy:.1f}")
lines.append(f"Max Iterations: {self.max_iterations}")
lines.append("")
lines.append("FUNCTION POINT STATUS:")
lines.append("-" * 60)
for name, alloc in sorted(self.allocations.items(),
key=lambda x: x[1].importance, reverse=True):
status_icon = {
EnergyState.ACTIVE: "🔄",
EnergyState.COMPLETED: "",
EnergyState.DEPLETED: "",
EnergyState.SUSPENDED: "⏸️"
}.get(alloc.state, "")
efficiency = (alloc.successful_attempts / alloc.total_attempts * 100
if alloc.total_attempts > 0 else 0)
lines.append(f"{status_icon} {name}")
lines.append(f" Importance: {alloc.importance:.2f} | "
f"Energy: {alloc.remaining:.1f}/{alloc.allocated:.1f} | "
f"Efficiency: {efficiency:.0f}%")
lines.append(f" Attempts: {alloc.total_attempts} | "
f"Consecutive Failures: {alloc.consecutive_failures}")
lines.append("")
lines.append("SUMMARY:")
lines.append("-" * 60)
stats = self.tracker.get_statistics()
lines.append(f"Total Attempts: {stats['total_attempts']}")
lines.append(f"Successful: {stats['successful_attempts']}")
lines.append(f"Success Rate: {stats['success_rate']*100:.1f}%")
completed = sum(1 for a in self.allocations.values()
if a.state == EnergyState.COMPLETED)
lines.append(f"Targets Covered: {completed} / {len(self.allocations)}")
lines.append("=" * 60)
return "\n".join(lines)
# ============================================================================
# 便捷函数
# ============================================================================
def create_energy_allocator(function_points: List[Dict],
max_iterations: int = 5) -> EnergyAllocator:
"""
便捷函数:创建并初始化能量分配器
Args:
function_points: 功能点列表
max_iterations: 最大迭代次数
Returns:
初始化完成的能量分配器
"""
allocator = EnergyAllocator(max_iterations=max_iterations)
allocator.initialize(function_points)
return allocator