# File-viewer metadata captured with this listing: 749 lines, 26 KiB, Python.
"""
Aggregate paper-style experiment runs into CSV summaries, statistics, and plots.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import math
|
|
import os
|
|
import random
|
|
import re
|
|
import statistics
|
|
import sys
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
|
if str(PROJECT_ROOT) not in sys.path:
|
|
sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
from experiments.paper_tasks import NEGATIVE_CASE_TASKS, POSITIVE_CASE_TASKS
|
|
|
|
|
|
DEFAULT_OUTPUT_ROOT = PROJECT_ROOT / "analysis" / "paper_runs"
|
|
|
|
|
|
def maybe_enable_matplotlib():
    """Return ``matplotlib.pyplot`` set up for the ``Agg`` backend, or ``None``.

    ``MPLCONFIGDIR`` is given a default value when the environment does not
    already define it, and the directory it names is created on a best-effort
    basis.

    Returns:
        The imported ``matplotlib.pyplot`` module, or ``None`` when importing
        or configuring matplotlib fails for any reason. Callers treat ``None``
        as "skip plotting".
    """
    os.environ.setdefault("MPLCONFIGDIR", "/tmp/mplconfig-correctbench")
    try:
        Path(os.environ["MPLCONFIGDIR"]).mkdir(parents=True, exist_ok=True)
    except OSError:
        # Plotting is optional, so an unusable config directory must not abort
        # the analysis. If matplotlib cannot cope with it either, the import
        # below fails and we fall through to returning None.
        pass

    try:
        import matplotlib

        # Select the backend before pyplot is imported.
        matplotlib.use("Agg")
        import matplotlib.pyplot as plt
    except Exception:
        # Deliberately broad: any import/backend failure just disables plots.
        return None
    return plt
|
|
|
|
|
|
def parse_eval2_ratio(value):
    """Turn a ``"<int>/<int>"`` string into the corresponding float quotient.

    Args:
        value: Candidate ratio text. Anything that is falsy, not a ``str``,
            or lacks a ``/`` is rejected.

    Returns:
        ``numerator / denominator`` as a float, or ``None`` when the input is
        rejected, either side is not an integer, or the denominator is zero.
    """
    usable = bool(value) and isinstance(value, str) and "/" in value
    if not usable:
        return None

    # Split on the first slash only; any further slash ends up in the
    # denominator text and makes int() fail below.
    numerator_text, _, denominator_text = value.partition("/")
    try:
        numerator = int(numerator_text)
        denominator = int(denominator_text)
        if denominator:
            return numerator / denominator
        return None
    except Exception:
        return None
|
|
|
|
|
|
def safe_mean(values):
    """Return the arithmetic mean of ``values``, or ``None`` when it is empty."""
    if not values:
        return None
    return statistics.mean(values)
|
|
|
|
|
|
def safe_std(values):
    """Return the sample standard deviation, or ``0.0`` for fewer than two values."""
    if len(values) < 2:
        # statistics.stdev needs at least two data points.
        return 0.0
    return statistics.stdev(values)
|
|
|
|
|
|
def _parse_quality_report(quality_report_path: Path) -> float | None:
    """Extract the semantic-coverage percentage from a quality report file.

    The file is searched for the first ``Semantic Coverage: <number>%`` entry.

    Args:
        quality_report_path: Path to ``quality_evaluation_report.txt``.

    Returns:
        The captured number as a float, or ``None`` when the file cannot be
        read, contains no such entry, or the captured text is not a valid
        number.
    """
    try:
        content = quality_report_path.read_text(encoding="utf-8", errors="ignore")
    except OSError:
        # Covers a missing file as well as unreadable paths and directories.
        return None

    match = re.search(r"Semantic Coverage:\s*([\d.]+)%", content)
    if match is None:
        return None
    try:
        return float(match.group(1))
    except ValueError:
        # The character class also accepts text such as "1.2.3".
        return None
|
|
|
|
|
|
def percentile(sorted_values, q):
    """Return the ``q`` quantile of pre-sorted data using linear interpolation.

    Args:
        sorted_values: Sequence already in ascending order.
        q: Quantile as a fraction; it is scaled by ``len(sorted_values) - 1``
            to obtain a (possibly fractional) index.

    Returns:
        ``None`` for an empty sequence, the sole element for a one-element
        sequence, otherwise the value interpolated between the two
        neighbouring elements.
    """
    if not sorted_values:
        return None

    last_index = len(sorted_values) - 1
    if last_index == 0:
        return sorted_values[0]

    position = q * last_index
    floor_index = int(math.floor(position))
    ceil_index = int(math.ceil(position))
    if floor_index == ceil_index:
        # The position lands exactly on an element; nothing to interpolate.
        return sorted_values[floor_index]

    fraction = position - floor_index
    return sorted_values[floor_index] * (1.0 - fraction) + sorted_values[ceil_index] * fraction
|
|
|
|
|
|
def bootstrap_mean_ci(values, samples=2000, confidence=0.95, seed=0):
    """Estimate the mean and a percentile-bootstrap interval around it.

    Args:
        values: Observations to resample (indexed by position).
        samples: Number of bootstrap resamples to draw.
        confidence: Interval coverage; the tails each get
            ``(1 - confidence) / 2``.
        seed: Seed for the private ``random.Random`` instance, so repeated
            calls with the same inputs give the same interval.

    Returns:
        ``(mean, lower, upper)``; all three are ``None`` when ``values`` is
        empty.
    """
    if not values:
        return (None, None, None)

    rng = random.Random(seed)
    size = len(values)
    # Each resample draws `size` indices with replacement, one randrange call
    # per drawn element.
    resampled_means = sorted(
        statistics.mean([values[rng.randrange(size)] for _ in range(size)])
        for _ in range(samples)
    )

    tail = (1.0 - confidence) / 2.0
    lower_bound = percentile(resampled_means, tail)
    upper_bound = percentile(resampled_means, 1.0 - tail)
    return (statistics.mean(values), lower_bound, upper_bound)
|
|
|
|
|
|
def rankdata_average(abs_values):
    """Assign 1-based ranks to ``abs_values``, averaging the ranks of ties.

    Args:
        abs_values: Values to rank.

    Returns:
        A list of float ranks aligned with the input order. Equal values all
        receive the mean of the rank positions they occupy.
    """
    ordered = sorted(enumerate(abs_values), key=lambda pair: pair[1])
    total = len(ordered)
    ranks = [0.0] * total

    start = 0
    while start < total:
        # Extend [start, stop) over the whole run of equal values.
        stop = start + 1
        while stop < total and ordered[stop][1] == ordered[start][1]:
            stop += 1
        # Mean of the 1-based positions start+1 .. stop.
        shared_rank = (start + 1 + stop) / 2.0
        for position in range(start, stop):
            ranks[ordered[position][0]] = shared_rank
        start = stop
    return ranks
|
|
|
|
|
|
def norm_sf(z):
    """Return the standard normal upper-tail probability ``P(Z > z)``."""
    scaled = z / math.sqrt(2.0)
    return math.erfc(scaled) * 0.5
|
|
|
|
|
|
def wilcoxon_signed_rank(differences):
    """Run a two-sided Wilcoxon signed-rank test on paired differences.

    Differences whose magnitude is at most ``1e-12`` are discarded. The
    remaining magnitudes are ranked with ties averaged.

    * Up to 25 non-zero differences: the p-value comes from enumerating the
      distribution of the rank sum over all sign assignments.
    * More than 25: a normal approximation with continuity correction and a
      tie-corrected variance is used.

    Args:
        differences: Iterable of numeric paired differences.

    Returns:
        A dict with keys ``n`` (non-zero differences), ``w_plus``,
        ``w_minus``, ``p_value`` and ``method`` (``"degenerate"``,
        ``"exact"`` or ``"normal_approx"``).
    """
    nonzero = [diff for diff in differences if abs(diff) > 1e-12]
    if not nonzero:
        return {
            "n": 0,
            "w_plus": 0.0,
            "w_minus": 0.0,
            "p_value": 1.0,
            "method": "degenerate",
        }

    ranks = rankdata_average([abs(diff) for diff in nonzero])
    w_plus = sum(rank for diff, rank in zip(nonzero, ranks) if diff > 0)
    w_minus = sum(rank for diff, rank in zip(nonzero, ranks) if diff < 0)
    n = len(nonzero)

    if n <= 25:
        # Averaged ranks are multiples of 0.5; doubling makes them integers so
        # subset sums can be used as exact dictionary keys.
        scaled_ranks = [int(round(rank * 2)) for rank in ranks]
        counts = {0: 1}
        for rank in scaled_ranks:
            next_counts = defaultdict(int)
            for subtotal, cnt in counts.items():
                next_counts[subtotal] += cnt         # rank gets a negative sign
                next_counts[subtotal + rank] += cnt  # rank gets a positive sign
            counts = next_counts
        observed_low = int(round(min(w_plus, w_minus) * 2))
        total_assignments = 2 ** n
        tail = sum(cnt for subtotal, cnt in counts.items() if subtotal <= observed_low)
        p_value = min(1.0, 2.0 * tail / total_assignments)
        method = "exact"
    else:
        mean_w = n * (n + 1) / 4.0
        var_w = n * (n + 1) * (2 * n + 1) / 24.0

        # Tie correction: each group of t tied magnitudes lowers the variance
        # by (t^3 - t) / 48. Tied values share one averaged rank and distinct
        # groups get distinct averages, so counting ranks counts group sizes.
        tie_sizes = defaultdict(int)
        for rank in ranks:
            tie_sizes[rank] += 1
        var_w -= sum(size ** 3 - size for size in tie_sizes.values()) / 48.0

        # Clamp at zero: when |W - mean| < 0.5 the corrected numerator would
        # turn negative, and taking its absolute value used to report p < 1
        # for a statistic sitting exactly on its expected value.
        z = max(0.0, abs(w_plus - mean_w) - 0.5) / math.sqrt(var_w)
        p_value = min(1.0, 2.0 * norm_sf(z))
        method = "normal_approx"

    return {
        "n": n,
        "w_plus": w_plus,
        "w_minus": w_minus,
        "p_value": p_value,
        "method": method,
    }
|
|
|
|
|
|
def parse_task_log(task_log_path: Path):
    """Reconstruct the coverage trace recorded in a task log.

    Each line is matched, in this order, against four patterns: baseline
    coverage, iteration header, coverage improvement, and final coverage. Only
    the first pattern that matches a line is applied.

    Args:
        task_log_path: Path to the task's log file.

    Returns:
        ``(trace, first_improvement_iter)``. ``trace`` is a list of
        ``(iteration, coverage)`` tuples: baseline entries use iteration 0,
        later entries use the most recent iteration header (or the current
        trace length when no header has been seen). ``first_improvement_iter``
        is the iteration in effect at the first improvement line, or ``None``.
        A missing file yields ``([], None)``.
    """
    coverage_points = []
    first_gain_iter = None
    if not task_log_path.exists():
        return coverage_points, first_gain_iter

    baseline_re = re.compile(r"Baseline Coverage: ([0-9.]+)%")
    iteration_re = re.compile(r"--- CGA Iter (\d+) /")
    improved_re = re.compile(r"Coverage Improved! \+([0-9.]+)% \(([0-9.]+)% -> ([0-9.]+)%\)")
    finished_re = re.compile(r"CGA Finished\. Final Coverage: ([0-9.]+)%")

    active_iter = None
    with task_log_path.open("r", encoding="utf-8", errors="ignore") as handle:
        for text in handle:
            found = baseline_re.search(text)
            if found:
                coverage_points.append((0, float(found.group(1))))
                continue

            found = iteration_re.search(text)
            if found:
                active_iter = int(found.group(1))
                continue

            found = improved_re.search(text)
            if found:
                if first_gain_iter is None:
                    first_gain_iter = active_iter
                step = len(coverage_points) if active_iter is None else active_iter
                # Group 3 is the coverage reached after the improvement.
                coverage_points.append((step, float(found.group(3))))
                continue

            found = finished_re.search(text)
            if not (found and coverage_points):
                continue
            final_value = float(found.group(1))
            # Only add the final value when it differs from the last point.
            if coverage_points[-1][1] != final_value:
                step = len(coverage_points) if active_iter is None else active_iter
                coverage_points.append((step, final_value))

    return coverage_points, first_gain_iter
|
|
|
|
|
|
def load_manifest(path: Path):
    """Read the UTF-8 JSON file at ``path`` and return the decoded content."""
    manifest_text = path.read_text(encoding="utf-8")
    return json.loads(manifest_text)
|
|
|
|
|
|
def collect_run_rows(manifest_rows):
    """Flatten the successful runs of a manifest into one row per task.

    A manifest entry is skipped when its ``returncode`` is missing or
    non-zero, when it has no ``run_dir``, when that directory does not exist,
    or when the directory lacks ``Chatbench_RunInfo.json``. Task entries
    without a non-empty string ``task_id`` are skipped too, because the task
    directory cannot be located without one.

    Args:
        manifest_rows: Iterable of dicts describing runs. Entries that are
            processed must provide ``experiment_name``, ``model``,
            ``condition`` and ``repeat``.

    Returns:
        ``(run_rows, per_task_trace)``. ``run_rows`` is a list of flat dicts
        (one per task per run). ``per_task_trace`` maps
        ``(model, condition, repeat, task_id)`` to the coverage trace parsed
        from that task's log.
    """
    # Sub-directory of the task directory that holds the quality report for
    # each known condition; other conditions have no report to parse.
    quality_subdirs = {"baseline": "CGA_baseline", "cga": "CGA"}

    run_rows = []
    per_task_trace = {}

    for run_meta in manifest_rows:
        if run_meta.get("returncode", 1) != 0:
            continue

        run_dir_text = run_meta.get("run_dir")
        if not run_dir_text:
            # Path("") is the current directory, which always exists; a
            # missing run_dir must not silently resolve to it.
            continue
        run_dir = Path(run_dir_text)
        if not run_dir.exists():
            continue

        run_info_path = run_dir / "Chatbench_RunInfo.json"
        if not run_info_path.exists():
            continue

        with run_info_path.open("r", encoding="utf-8") as f:
            data = json.load(f)

        condition = run_meta.get("condition", "")
        quality_subdir = quality_subdirs.get(condition)

        for task_info in data:
            task_id = task_info.get("task_id")
            if not isinstance(task_id, str) or not task_id:
                continue

            task_dir = run_dir / task_id
            task_log_path = task_dir / "task_log.log"
            coverage_trace, first_improvement_iter = parse_task_log(task_log_path)
            trace_key = (run_meta["model"], run_meta["condition"], run_meta["repeat"], task_id)
            per_task_trace[trace_key] = coverage_trace

            semantic_coverage = None
            if quality_subdir is not None:
                semantic_coverage = _parse_quality_report(
                    task_dir / quality_subdir / "quality_evaluation_report.txt"
                )

            # `or []` also covers keys that are present but null in the JSON;
            # map(str, ...) tolerates non-string list items.
            failed_mutants = task_info.get("Eval2_failed_mutant_idxes") or []
            op_record = task_info.get("op_record") or []

            run_rows.append({
                "experiment_name": run_meta["experiment_name"],
                "model": run_meta["model"],
                "condition": condition,
                "repeat": run_meta["repeat"],
                "run_dir": str(run_dir),
                "task_id": task_id,
                "coverage": task_info.get("coverage"),
                "semantic_coverage": semantic_coverage,
                "eval1_pass": task_info.get("Eval1_pass"),
                "eval2_pass": task_info.get("Eval2_pass"),
                "eval2_ratio": task_info.get("Eval2_ratio"),
                "eval2_ratio_float": parse_eval2_ratio(task_info.get("Eval2_ratio")),
                "eval2_failed_mutants": ",".join(map(str, failed_mutants)),
                "full_pass": task_info.get("full_pass"),
                "time_sec": task_info.get("time"),
                "token_cost": task_info.get("token_cost"),
                "first_improvement_iter": first_improvement_iter,
                "op_record": ",".join(map(str, op_record)),
                "task_log": str(task_log_path),
            })

    return run_rows, per_task_trace
|
|
|
|
|
|
def write_csv(path: Path, rows: list[dict], fieldnames: list[str]):
    """Write ``rows`` to a UTF-8 CSV file with a header row.

    Parent directories are created as needed. Only the columns named in
    ``fieldnames`` are written; a column missing from a row becomes an empty
    string and extra keys in a row are ignored.

    Args:
        path: Destination file.
        rows: Dicts to serialise, one per output line.
        fieldnames: Column names, in output order.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(
            {column: record.get(column, "") for column in fieldnames}
            for record in rows
        )
|
|
|
|
|
|
def summarize_task_condition(run_rows):
    """Aggregate run-level rows per ``(model, condition, task_id)``.

    Args:
        run_rows: Run-level row dicts with at least the keys ``model``,
            ``condition``, ``task_id``, ``coverage``, ``semantic_coverage``,
            ``eval2_ratio_float``, ``time_sec``, ``token_cost`` and
            ``first_improvement_iter``.

    Returns:
        One summary dict per group, sorted by group key, holding the run
        count plus mean / std / best statistics. Values of the wrong type
        (typically ``None``) are left out of the statistics but still count
        towards ``n_runs``.
    """
    buckets = defaultdict(list)
    for record in run_rows:
        buckets[(record["model"], record["condition"], record["task_id"])].append(record)

    def column(records, field, kinds=(int, float)):
        """Collect ``field`` from the records whose value has an accepted type."""
        return [item[field] for item in records if isinstance(item[field], kinds)]

    def best_of(values):
        """Largest value, or ``None`` when there is nothing to compare."""
        return max(values) if values else None

    summaries = []
    for group_key in sorted(buckets):
        model, condition, task_id = group_key
        records = buckets[group_key]

        coverage = column(records, "coverage")
        semantic = column(records, "semantic_coverage")
        eval2 = column(records, "eval2_ratio_float", float)
        durations = column(records, "time_sec")
        costs = column(records, "token_cost")
        first_gains = column(records, "first_improvement_iter", int)

        summaries.append({
            "model": model,
            "condition": condition,
            "task_id": task_id,
            "n_runs": len(records),
            "coverage_mean": safe_mean(coverage),
            "coverage_std": safe_std(coverage),
            "coverage_best": best_of(coverage),
            "semantic_coverage_mean": safe_mean(semantic),
            "semantic_coverage_std": safe_std(semantic),
            "semantic_coverage_best": best_of(semantic),
            "eval2_mean": safe_mean(eval2),
            "eval2_std": safe_std(eval2),
            "eval2_best": best_of(eval2),
            "time_mean_sec": safe_mean(durations),
            "time_std_sec": safe_std(durations),
            "token_cost_mean": safe_mean(costs),
            "token_cost_std": safe_std(costs),
            "first_improvement_iter_mean": safe_mean(first_gains),
        })

    return summaries
|
|
|
|
|
|
def build_paired_rows(run_rows):
    """Pair each baseline run with the CGA run of the same model/repeat/task.

    Args:
        run_rows: Run-level row dicts carrying ``model``, ``repeat``,
            ``task_id`` and ``condition``. When several rows share all four,
            the last one wins.

    Returns:
        One dict per ``(model, repeat, task_id)`` that has both a
        ``"baseline"`` and a ``"cga"`` row, sorted by that triple. Each delta
        is ``cga - baseline`` and is ``None`` unless both sides are numeric
        (the Eval2 delta requires both sides to be ``float``).
    """
    by_key = {
        (row["model"], row["repeat"], row["task_id"], row["condition"]): row
        for row in run_rows
    }
    triples = {(row["model"], row["repeat"], row["task_id"]) for row in run_rows}

    def difference(after, before, kinds):
        """``after - before`` when both have an accepted type, else ``None``."""
        if isinstance(before, kinds) and isinstance(after, kinds):
            return after - before
        return None

    pairs = []
    for model, repeat, task_id in sorted(triples):
        baseline = by_key.get((model, repeat, task_id, "baseline"))
        cga = by_key.get((model, repeat, task_id, "cga"))
        if not (baseline and cga):
            continue

        base_cov, cga_cov = baseline.get("coverage"), cga.get("coverage")
        base_sem, cga_sem = baseline.get("semantic_coverage"), cga.get("semantic_coverage")
        base_eval2, cga_eval2 = baseline.get("eval2_ratio_float"), cga.get("eval2_ratio_float")

        pairs.append({
            "model": model,
            "repeat": repeat,
            "task_id": task_id,
            "baseline_coverage": base_cov,
            "cga_coverage": cga_cov,
            "coverage_delta": difference(cga_cov, base_cov, (int, float)),
            "baseline_semantic_coverage": base_sem,
            "cga_semantic_coverage": cga_sem,
            "semantic_coverage_delta": difference(cga_sem, base_sem, (int, float)),
            "baseline_eval2": base_eval2,
            "cga_eval2": cga_eval2,
            "eval2_delta": difference(cga_eval2, base_eval2, float),
            "baseline_time_sec": baseline.get("time_sec"),
            "cga_time_sec": cga.get("time_sec"),
            "baseline_token_cost": baseline.get("token_cost"),
            "cga_token_cost": cga.get("token_cost"),
        })

    return pairs
|
|
|
|
|
|
def summarize_paired_rows(paired_rows):
    """Summarise paired deltas per ``(model, task_id)``.

    Args:
        paired_rows: Paired row dicts with ``model``, ``task_id``,
            ``coverage_delta`` and ``semantic_coverage_delta``.

    Returns:
        One dict per group, sorted by group key, with the bootstrap mean and
        interval of the coverage delta, its best and worst value, and the
        bootstrap mean and interval of the semantic-coverage delta.
        ``paired_n`` counts every pair in the group, including pairs whose
        deltas are not numeric.
    """
    by_task = defaultdict(list)
    for pair in paired_rows:
        by_task[(pair["model"], pair["task_id"])].append(pair)

    def numeric(pairs, field):
        """Values of ``field`` that are ints or floats."""
        return [item[field] for item in pairs if isinstance(item[field], (int, float))]

    summaries = []
    for group_key in sorted(by_task):
        model, task_id = group_key
        pairs = by_task[group_key]

        coverage_deltas = numeric(pairs, "coverage_delta")
        semantic_deltas = numeric(pairs, "semantic_coverage_delta")

        cov_mean, cov_low, cov_high = bootstrap_mean_ci(coverage_deltas)
        if semantic_deltas:
            sem_mean, sem_low, sem_high = bootstrap_mean_ci(semantic_deltas)
        else:
            sem_mean = sem_low = sem_high = None

        summaries.append({
            "model": model,
            "task_id": task_id,
            "paired_n": len(pairs),
            "coverage_delta_mean": cov_mean,
            "coverage_delta_ci_low": cov_low,
            "coverage_delta_ci_high": cov_high,
            "coverage_delta_best": max(coverage_deltas) if coverage_deltas else None,
            "coverage_delta_worst": min(coverage_deltas) if coverage_deltas else None,
            "semantic_coverage_delta_mean": sem_mean,
            "semantic_coverage_delta_ci_low": sem_low,
            "semantic_coverage_delta_ci_high": sem_high,
        })
    return summaries
|
|
|
|
|
|
def write_stats_summary(path: Path, run_rows, paired_rows, task_delta_rows):
    """Write the plain-text statistics report.

    The report lists row counts, the pooled coverage delta (bootstrap mean and
    interval plus a Wilcoxon signed-rank result), the same block for semantic
    coverage when any numeric semantic delta exists, and one line per entry of
    ``task_delta_rows``.

    Args:
        path: Destination text file; parent directories are created.
        run_rows: Run-level rows (only their count is reported).
        paired_rows: Paired rows providing ``coverage_delta`` and
            ``semantic_coverage_delta``.
        task_delta_rows: Per-task summaries providing ``model``, ``task_id``,
            ``coverage_delta_mean``, ``coverage_delta_ci_low``,
            ``coverage_delta_ci_high`` and ``paired_n``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    def numeric_column(field):
        """Numeric values of ``field`` across all paired rows."""
        return [row[field] for row in paired_rows if isinstance(row[field], (int, float))]

    def delta_section(title, deltas):
        """Lines describing one pooled delta, ending with a blank line."""
        test = wilcoxon_signed_rank(deltas)
        mean, ci_low, ci_high = bootstrap_mean_ci(deltas)
        return [
            title,
            f"Mean delta: {mean}",
            f"95% bootstrap CI: [{ci_low}, {ci_high}]",
            "Wilcoxon signed-rank: "
            f"n={test['n']}, p={test['p_value']:.6f}, method={test['method']}",
            "",
        ]

    report = [
        "CorrectBench Paper Experiment Summary",
        "=" * 50,
        f"Total run-level rows: {len(run_rows)}",
        f"Total paired rows: {len(paired_rows)}",
        "",
    ]

    report.extend(delta_section("[Overall Coverage Delta]", numeric_column("coverage_delta")))

    semantic_deltas = numeric_column("semantic_coverage_delta")
    if semantic_deltas:
        report.extend(delta_section("[Overall Semantic Coverage Delta]", semantic_deltas))
    else:
        report.extend([
            "[Overall Semantic Coverage Delta]",
            "No valid paired semantic coverage data available",
            "",
        ])

    report.append("[Per-Task Paired Coverage Delta]")
    report.extend(
        f"{row['model']} | {row['task_id']}: "
        f"mean={row['coverage_delta_mean']} "
        f"CI=[{row['coverage_delta_ci_low']}, {row['coverage_delta_ci_high']}] "
        f"n={row['paired_n']}"
        for row in task_delta_rows
    )

    path.write_text("\n".join(report), encoding="utf-8")
|
|
|
|
|
|
def write_case_studies(path: Path, run_rows):
    """Write a Markdown file pointing at the best CGA run of each positive case.

    For every task in ``POSITIVE_CASE_TASKS`` that has at least one ``cga``
    row with numeric coverage, the row with the highest coverage is listed
    (the first one on ties). The ids in ``NEGATIVE_CASE_TASKS`` are appended
    as a plain list.

    Args:
        path: Destination Markdown file; parent directories are created.
        run_rows: Run-level rows with ``condition``, ``task_id``, ``coverage``,
            ``run_dir``, ``eval2_ratio`` and ``first_improvement_iter``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    by_task = defaultdict(list)
    for row in run_rows:
        is_candidate = (
            row["condition"] == "cga"
            and row["task_id"] in POSITIVE_CASE_TASKS
            and isinstance(row["coverage"], (int, float))
        )
        if is_candidate:
            by_task[row["task_id"]].append(row)

    lines = ["# Case Study Pointers", ""]
    for task_id in POSITIVE_CASE_TASKS:
        candidates = by_task.get(task_id, [])
        if not candidates:
            continue
        best = max(candidates, key=lambda row: row["coverage"])
        prompt_root = Path(best["run_dir"]) / task_id / "CGA"
        lines.extend([
            f"## {task_id}",
            f"- Best run dir: {best['run_dir']}",
            f"- Final coverage: {best['coverage']}",
            f"- Eval2 ratio: {best['eval2_ratio']}",
            f"- First improvement iter: {best['first_improvement_iter']}",
            f"- Prompt root: {prompt_root}",
            "",
        ])

    lines.append("## Negative Cases")
    lines.extend(f"- {task_id}" for task_id in NEGATIVE_CASE_TASKS)

    path.write_text("\n".join(lines), encoding="utf-8")
|
|
|
|
|
|
def plot_task_level_comparison(output_dir: Path, task_summary_rows, plt):
    """Save a grouped bar chart of baseline vs CGA mean coverage per task.

    One pair of bars is drawn per ``cga`` summary row; the baseline bar comes
    from the baseline row with the same ``(model, task_id)`` and is 0.0 when
    no such row exists or its mean is missing.

    Args:
        output_dir: Directory receiving ``task_level_comparison.png``.
        task_summary_rows: Per-task summary dicts with ``condition``,
            ``model``, ``task_id`` and ``coverage_mean``.
        plt: ``matplotlib.pyplot`` or ``None``; nothing is done for ``None``
            or when there are no ``cga`` rows.
    """
    if plt is None:
        return

    baseline_by_key = {}
    cga_rows = []
    for row in task_summary_rows:
        if row["condition"] == "baseline":
            baseline_by_key[(row["model"], row["task_id"])] = row
        if row["condition"] == "cga":
            cga_rows.append(row)
    if not cga_rows:
        return

    labels = []
    baseline_values = []
    cga_values = []
    for row in cga_rows:
        counterpart = baseline_by_key.get((row["model"], row["task_id"]), {})
        labels.append(row["task_id"])
        # `or 0.0` maps a None mean to a zero-height bar.
        baseline_values.append(counterpart.get("coverage_mean", 0.0) or 0.0)
        cga_values.append(row.get("coverage_mean", 0.0) or 0.0)

    fig, ax = plt.subplots(figsize=(max(12, len(labels) * 0.35), 6))
    slots = list(range(len(labels)))
    ax.bar([slot - 0.2 for slot in slots], baseline_values, width=0.4, label="Baseline")
    ax.bar([slot + 0.2 for slot in slots], cga_values, width=0.4, label="CGA")
    ax.set_ylabel("Mean structural coverage")
    ax.set_title("Task-level Baseline vs CGA coverage")
    ax.set_xticks(slots)
    ax.set_xticklabels(labels, rotation=75, ha="right")
    ax.legend()
    fig.tight_layout()
    fig.savefig(output_dir / "task_level_comparison.png", dpi=200)
    plt.close(fig)
|
|
|
|
|
|
def plot_delta_distribution(output_dir: Path, task_delta_rows, plt):
    """Save a bar chart of the mean paired coverage delta per task.

    Bars are ordered from the largest mean delta to the smallest; rows whose
    mean is ``None`` are placed last and drawn with height 0.0.

    Args:
        output_dir: Directory receiving ``coverage_delta_distribution.png``.
        task_delta_rows: Per-task delta summaries with ``task_id`` and
            ``coverage_delta_mean``.
        plt: ``matplotlib.pyplot`` or ``None``; nothing is done for ``None``
            or for an empty ``task_delta_rows``.
    """
    if plt is None or not task_delta_rows:
        return

    def sort_key(row):
        mean = row["coverage_delta_mean"]
        # Test for None explicitly: a truthiness fallback (`mean or -999`)
        # treats a genuine 0.0 mean as missing and sinks it below every
        # negative delta.
        return -math.inf if mean is None else mean

    sorted_rows = sorted(task_delta_rows, key=sort_key, reverse=True)
    labels = [row["task_id"] for row in sorted_rows]
    means = [
        0.0 if row["coverage_delta_mean"] is None else row["coverage_delta_mean"]
        for row in sorted_rows
    ]

    fig, ax = plt.subplots(figsize=(max(12, len(labels) * 0.35), 6))
    ax.bar(range(len(labels)), means)
    ax.axhline(0.0, color="black", linewidth=1)
    ax.set_ylabel("Mean paired coverage delta")
    ax.set_title("Coverage delta distribution by task")
    ax.set_xticks(range(len(labels)))
    ax.set_xticklabels(labels, rotation=75, ha="right")
    fig.tight_layout()
    fig.savefig(output_dir / "coverage_delta_distribution.png", dpi=200)
    plt.close(fig)
|
|
|
|
|
|
def plot_coverage_vs_eval2(output_dir: Path, run_rows, plt):
    """Save a scatter plot of structural coverage against the Eval2 ratio.

    The ``baseline`` and ``cga`` conditions are drawn as separate series with
    different markers. Rows need a numeric ``coverage`` and a float
    ``eval2_ratio_float`` to be plotted. The figure is saved even when no
    series has data; the legend is only added when something was drawn.

    Args:
        output_dir: Directory receiving ``coverage_vs_eval2_scatter.png``.
        run_rows: Run-level rows with ``condition``, ``coverage`` and
            ``eval2_ratio_float``.
        plt: ``matplotlib.pyplot`` or ``None``; nothing is done for ``None``.
    """
    if plt is None:
        return

    fig, ax = plt.subplots(figsize=(8, 6))
    series_markers = {"baseline": "o", "cga": "^"}
    drew_any = False

    for condition, marker in series_markers.items():
        coverages = []
        ratios = []
        for row in run_rows:
            if row["condition"] != condition:
                continue
            if not isinstance(row["coverage"], (int, float)):
                continue
            if not isinstance(row["eval2_ratio_float"], float):
                continue
            coverages.append(row["coverage"])
            ratios.append(row["eval2_ratio_float"])
        if not coverages:
            continue
        drew_any = True
        ax.scatter(coverages, ratios, label=condition, marker=marker, alpha=0.7)

    ax.set_xlabel("Structural coverage")
    ax.set_ylabel("Eval2 ratio")
    ax.set_title("Coverage vs Eval2")
    if drew_any:
        ax.legend()
    fig.tight_layout()
    fig.savefig(output_dir / "coverage_vs_eval2_scatter.png", dpi=200)
    plt.close(fig)
|
|
|
|
|
|
def plot_case_studies(output_dir: Path, run_rows, plt):
    """Save a line plot of the coverage traces of the case-study tasks.

    For each task in ``POSITIVE_CASE_TASKS`` the ``cga`` row with the highest
    numeric coverage is chosen (the first one on ties), its task log is
    re-parsed, and the resulting trace is plotted. Tasks with an empty trace
    are skipped.

    Args:
        output_dir: Directory receiving ``case_study_iterations.png``.
        run_rows: Run-level rows with ``condition``, ``task_id``, ``coverage``
            and ``task_log``.
        plt: ``matplotlib.pyplot`` or ``None``; nothing is done for ``None``
            or when no case-study task has a usable row.
    """
    if plt is None:
        return

    best_rows = {}
    for task_id in POSITIVE_CASE_TASKS:
        best = max(
            (
                row for row in run_rows
                if row["condition"] == "cga"
                and row["task_id"] == task_id
                and isinstance(row["coverage"], (int, float))
            ),
            key=lambda row: row["coverage"],
            default=None,
        )
        if best is not None:
            best_rows[task_id] = best

    if not best_rows:
        return

    fig, ax = plt.subplots(figsize=(8, 5))
    for task_id, row in best_rows.items():
        trace, _ = parse_task_log(Path(row["task_log"]))
        if not trace:
            continue
        iterations = [step for step, _coverage in trace]
        coverages = [coverage for _step, coverage in trace]
        ax.plot(iterations, coverages, marker="o", label=task_id)

    ax.set_xlabel("CGA iteration")
    ax.set_ylabel("Structural coverage")
    ax.set_title("Case-study coverage traces")
    ax.legend()
    fig.tight_layout()
    fig.savefig(output_dir / "case_study_iterations.png", dpi=200)
    plt.close(fig)
|
|
|
|
|
|
def parse_args():
    """Parse the command line.

    Returns:
        An ``argparse.Namespace`` with ``manifest`` (required) and
        ``output_dir`` (empty string when not given).
    """
    cli = argparse.ArgumentParser(description="Analyze CorrectBench paper experiment manifests.")
    option_specs = (
        (
            "--manifest",
            {
                "required": True,
                "help": "Path to run_manifest.json generated by run_paper_experiments.py",
            },
        ),
        (
            "--output-dir",
            {
                "default": "",
                "help": "Optional analysis output directory. Defaults to the manifest directory.",
            },
        ),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
|
|
|
|
|
|
def main():
    """Run the whole analysis for the manifest named on the command line.

    Relative manifest and output paths are resolved against ``PROJECT_ROOT``.
    The output directory defaults to the manifest's directory. Four CSV
    tables, a text statistics summary, a Markdown case-study file and (when
    matplotlib can be loaded) four plots are written there.
    """
    args = parse_args()

    def anchored(candidate):
        """Leave absolute paths alone; resolve relative ones under PROJECT_ROOT."""
        if candidate.is_absolute():
            return candidate
        return (PROJECT_ROOT / candidate).resolve()

    manifest_path = anchored(Path(args.manifest))
    output_dir = anchored(Path(args.output_dir) if args.output_dir else manifest_path.parent)
    output_dir.mkdir(parents=True, exist_ok=True)

    manifest_rows = load_manifest(manifest_path)
    # The per-task traces returned alongside the rows are not needed here.
    run_rows, _ = collect_run_rows(manifest_rows)
    task_summary_rows = summarize_task_condition(run_rows)
    paired_rows = build_paired_rows(run_rows)
    task_delta_rows = summarize_paired_rows(paired_rows)

    run_level_columns = [
        "experiment_name",
        "model",
        "condition",
        "repeat",
        "run_dir",
        "task_id",
        "coverage",
        "semantic_coverage",
        "eval1_pass",
        "eval2_pass",
        "eval2_ratio",
        "eval2_ratio_float",
        "eval2_failed_mutants",
        "full_pass",
        "time_sec",
        "token_cost",
        "first_improvement_iter",
        "op_record",
        "task_log",
    ]
    task_summary_columns = [
        "model",
        "condition",
        "task_id",
        "n_runs",
        "coverage_mean",
        "coverage_std",
        "coverage_best",
        "semantic_coverage_mean",
        "semantic_coverage_std",
        "semantic_coverage_best",
        "eval2_mean",
        "eval2_std",
        "eval2_best",
        "time_mean_sec",
        "time_std_sec",
        "token_cost_mean",
        "token_cost_std",
        "first_improvement_iter_mean",
    ]
    paired_columns = [
        "model",
        "repeat",
        "task_id",
        "baseline_coverage",
        "cga_coverage",
        "coverage_delta",
        "baseline_semantic_coverage",
        "cga_semantic_coverage",
        "semantic_coverage_delta",
        "baseline_eval2",
        "cga_eval2",
        "eval2_delta",
        "baseline_time_sec",
        "cga_time_sec",
        "baseline_token_cost",
        "cga_token_cost",
    ]
    task_delta_columns = [
        "model",
        "task_id",
        "paired_n",
        "coverage_delta_mean",
        "coverage_delta_ci_low",
        "coverage_delta_ci_high",
        "coverage_delta_best",
        "coverage_delta_worst",
        "semantic_coverage_delta_mean",
        "semantic_coverage_delta_ci_low",
        "semantic_coverage_delta_ci_high",
    ]

    csv_outputs = [
        ("run_level.csv", run_rows, run_level_columns),
        ("task_summary.csv", task_summary_rows, task_summary_columns),
        ("paired_deltas.csv", paired_rows, paired_columns),
        ("task_delta_summary.csv", task_delta_rows, task_delta_columns),
    ]
    for filename, rows, columns in csv_outputs:
        write_csv(output_dir / filename, rows, columns)

    write_stats_summary(output_dir / "stats_summary.txt", run_rows, paired_rows, task_delta_rows)
    write_case_studies(output_dir / "case_studies.md", run_rows)

    # Each plot helper returns immediately when plt is None.
    plt = maybe_enable_matplotlib()
    plot_task_level_comparison(output_dir, task_summary_rows, plt)
    plot_delta_distribution(output_dir, task_delta_rows, plt)
    plot_coverage_vs_eval2(output_dir, run_rows, plt)
    plot_case_studies(output_dir, run_rows, plt)

    print(f"Analysis outputs written to {output_dir}")
|
|
|
|
|
|
# Run the analysis only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|