Files
CGA-bench/experiments/analyze_paper_experiments.py
2026-05-22 10:02:42 +08:00

749 lines
26 KiB
Python

"""
Aggregate paper-style experiment runs into CSV summaries, statistics, and plots.
"""
from __future__ import annotations
import argparse
import csv
import json
import math
import os
import random
import re
import statistics
import sys
from collections import defaultdict
from pathlib import Path
# Repository root: the parent of the directory that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Put the repository root on sys.path before importing from the `experiments`
# package below (presumably so the script also works when run directly).
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from experiments.paper_tasks import NEGATIVE_CASE_TASKS, POSITIVE_CASE_TASKS
# Default analysis output location under the repository root.
DEFAULT_OUTPUT_ROOT = PROJECT_ROOT / "analysis" / "paper_runs"
def maybe_enable_matplotlib():
    """Return ``matplotlib.pyplot`` configured for headless use, or ``None``.

    ``MPLCONFIGDIR`` is given a default value if it is not already set, and
    the directory it names is created.  The ``Agg`` backend is then selected
    before ``pyplot`` is imported.  Any exception raised while importing or
    configuring matplotlib results in ``None``.
    """
    config_dir = os.environ.setdefault("MPLCONFIGDIR", "/tmp/mplconfig-correctbench")
    Path(config_dir).mkdir(parents=True, exist_ok=True)
    try:
        import matplotlib

        matplotlib.use("Agg")
        from matplotlib import pyplot
    except Exception:
        return None
    return pyplot
def parse_eval2_ratio(value):
    """Convert an ``"N/D"`` string into the float ``N / D``.

    Returns ``None`` when *value* is not a string, contains no ``/``, has a
    part that is not an integer literal, or has a zero denominator.  Only the
    first ``/`` separates numerator from denominator.
    """
    if not isinstance(value, str) or "/" not in value:
        return None
    numerator_text, _, denominator_text = value.partition("/")
    try:
        numerator = int(numerator_text)
        denominator = int(denominator_text)
    except ValueError:
        return None
    if denominator == 0:
        return None
    return numerator / denominator
def safe_mean(values):
    """Return the arithmetic mean of *values*, or ``None`` when it is empty."""
    if not values:
        return None
    return statistics.mean(values)
def safe_std(values):
    """Return the sample standard deviation of *values*.

    With fewer than two values the deviation is undefined, so ``0.0`` is
    returned instead.
    """
    if len(values) < 2:
        return 0.0
    return statistics.stdev(values)
def _parse_quality_report(quality_report_path: Path) -> float | None:
    """Extract the semantic coverage figure from a quality evaluation report.

    The report is searched for the first ``Semantic Coverage: <number>%``
    entry.

    Args:
        quality_report_path: Path to ``quality_evaluation_report.txt``.

    Returns:
        The number preceding ``%`` as a float, or ``None`` when the file is
        missing or unreadable, has no such entry, or the captured text is not
        a valid number.
    """
    try:
        content = quality_report_path.read_text(encoding="utf-8", errors="ignore")
    except OSError:
        # Missing file, directory, permission problem: treat as "no report".
        return None
    match = re.search(r"Semantic Coverage:\s*([\d.]+)%", content)
    if match is None:
        return None
    try:
        return float(match.group(1))
    except ValueError:
        # The character class also admits strings such as "1.2.3".
        return None
def percentile(sorted_values, q):
    """Return the linearly interpolated quantile of an already sorted sequence.

    *q* is a fraction of the index range: the result sits at position
    ``q * (len - 1)`` and is interpolated between the two neighbouring
    elements when that position is not an integer.  An empty input yields
    ``None``; a single element is returned as-is.
    """
    if not sorted_values:
        return None
    last_index = len(sorted_values) - 1
    if last_index == 0:
        return sorted_values[0]
    position = q * last_index
    below = int(math.floor(position))
    above = int(math.ceil(position))
    if below == above:
        return sorted_values[below]
    fraction = position - below
    return sorted_values[below] * (1.0 - fraction) + sorted_values[above] * fraction
def bootstrap_mean_ci(values, samples=2000, confidence=0.95, seed=0):
    """Estimate a percentile-bootstrap confidence interval for the mean.

    *values* is resampled with replacement *samples* times using a private
    ``random.Random(seed)`` generator, so results are reproducible for a
    given seed.  Returns ``(mean, ci_low, ci_high)``, or three ``None``
    values when *values* is empty.
    """
    if not values:
        return (None, None, None)
    rng = random.Random(seed)
    size = len(values)
    resampled_means = []
    for _ in range(samples):
        draw = [values[rng.randrange(size)] for _ in range(size)]
        resampled_means.append(statistics.mean(draw))
    resampled_means.sort()
    tail = (1.0 - confidence) / 2.0
    point_estimate = statistics.mean(values)
    lower_bound = percentile(resampled_means, tail)
    upper_bound = percentile(resampled_means, 1.0 - tail)
    return (point_estimate, lower_bound, upper_bound)
def rankdata_average(abs_values):
    """Assign 1-based ranks to *abs_values*, averaging ranks across ties.

    The returned list is aligned with the input: element ``i`` holds the rank
    of ``abs_values[i]``.  Equal values all receive the mean of the ranks
    they jointly occupy.
    """
    total = len(abs_values)
    order = sorted(range(total), key=abs_values.__getitem__)
    ranks = [0.0] * total
    start = 0
    while start < total:
        tied_value = abs_values[order[start]]
        stop = start + 1
        while stop < total and abs_values[order[stop]] == tied_value:
            stop += 1
        # Positions start..stop-1 hold ranks start+1..stop; this is their mean.
        shared_rank = (start + 1 + stop) / 2.0
        for original_index in order[start:stop]:
            ranks[original_index] = shared_rank
        start = stop
    return ranks
def norm_sf(z):
    """Return the standard normal survival function ``P(Z > z)``.

    Computed from the complementary error function as
    ``0.5 * erfc(z / sqrt(2))``.
    """
    scaled = z / math.sqrt(2.0)
    return 0.5 * math.erfc(scaled)
def wilcoxon_signed_rank(differences):
    """Run a two-sided Wilcoxon signed-rank test on paired differences.

    Differences whose magnitude is at most ``1e-12`` are discarded.  The
    remaining magnitudes are ranked with ``rankdata_average`` (ties share
    their mean rank).

    * With no remaining differences the result is marked ``"degenerate"``
      and the p-value is ``1.0``.
    * With at most 25 remaining differences the p-value is ``"exact"``: the
      full sign-assignment distribution of the rank sum is enumerated.
    * Otherwise a ``"normal_approx"`` is used, with a continuity correction
      and a tie-corrected variance.

    Args:
        differences: Iterable of numeric paired differences.

    Returns:
        Dict with keys ``n`` (number of non-zero differences), ``w_plus``,
        ``w_minus`` (rank sums of positive / negative differences),
        ``p_value`` and ``method``.
    """
    nonzero = [diff for diff in differences if abs(diff) > 1e-12]
    if not nonzero:
        return {
            "n": 0,
            "w_plus": 0.0,
            "w_minus": 0.0,
            "p_value": 1.0,
            "method": "degenerate",
        }

    ranks = rankdata_average([abs(diff) for diff in nonzero])
    w_plus = sum(rank for diff, rank in zip(nonzero, ranks) if diff > 0)
    w_minus = sum(rank for diff, rank in zip(nonzero, ranks) if diff < 0)
    n = len(nonzero)

    if n <= 25:
        # Average ranks are multiples of 0.5; doubling makes them integers so
        # the rank-sum distribution can be tabulated with integer keys.
        scaled_ranks = [int(round(rank * 2)) for rank in ranks]
        counts = {0: 1}
        for rank in scaled_ranks:
            # Each difference is either negative (adds nothing to W+) or
            # positive (adds its rank), with equal probability under H0.
            next_counts = defaultdict(int)
            for subtotal, cnt in counts.items():
                next_counts[subtotal] += cnt
                next_counts[subtotal + rank] += cnt
            counts = next_counts
        observed_low = int(round(min(w_plus, w_minus) * 2))
        total_assignments = 2 ** n
        tail = sum(cnt for subtotal, cnt in counts.items() if subtotal <= observed_low)
        p_value = min(1.0, 2.0 * tail / total_assignments)
        method = "exact"
    else:
        mean_w = n * (n + 1) / 4.0
        # Tie correction: each group of t tied magnitudes lowers the variance
        # by (t^3 - t) / 48.  Tied values share one average rank, so group by it.
        tie_sizes = defaultdict(int)
        for rank in ranks:
            tie_sizes[rank] += 1
        tie_term = sum(size ** 3 - size for size in tie_sizes.values()) / 48.0
        var_w = n * (n + 1) * (2 * n + 1) / 24.0 - tie_term
        # Continuity correction moves the statistic towards the mean but must
        # never cross it; without the clamp W+ == mean would yield p < 1.
        z = max(0.0, abs(w_plus - mean_w) - 0.5) / math.sqrt(var_w)
        p_value = min(1.0, 2.0 * norm_sf(z))
        method = "normal_approx"

    return {
        "n": n,
        "w_plus": w_plus,
        "w_minus": w_minus,
        "p_value": p_value,
        "method": method,
    }
def parse_task_log(task_log_path: Path):
    """Reconstruct the coverage trace recorded in a task log.

    The log is scanned line by line for four kinds of entries: the baseline
    coverage, an iteration header, a coverage improvement and the final
    coverage.  Each line is classified as at most one kind, tested in that
    order.

    Returns:
        ``(trace, first_improvement_iter)``.  ``trace`` is a list of
        ``(iteration, coverage)`` tuples: the baseline is stored at iteration
        0, improvements at the most recent iteration header (or at the
        current trace length when no header has been seen), and the final
        coverage is appended only when the trace is non-empty and its last
        coverage differs.  ``first_improvement_iter`` is the iteration in
        effect at the first improvement line, or ``None``.  A missing log
        file yields ``([], None)``.
    """
    trace = []
    first_improvement_iter = None
    if not task_log_path.exists():
        return trace, first_improvement_iter

    baseline_re = re.compile(r"Baseline Coverage: ([0-9.]+)%")
    iteration_re = re.compile(r"--- CGA Iter (\d+) /")
    improved_re = re.compile(r"Coverage Improved! \+([0-9.]+)% \(([0-9.]+)% -> ([0-9.]+)%\)")
    finished_re = re.compile(r"CGA Finished\. Final Coverage: ([0-9.]+)%")

    current_iter = None

    def trace_position():
        # Fall back to the trace length when no iteration header was seen yet.
        return len(trace) if current_iter is None else current_iter

    with task_log_path.open("r", encoding="utf-8", errors="ignore") as handle:
        for line in handle:
            found = baseline_re.search(line)
            if found:
                trace.append((0, float(found.group(1))))
                continue

            found = iteration_re.search(line)
            if found:
                current_iter = int(found.group(1))
                continue

            found = improved_re.search(line)
            if found:
                if first_improvement_iter is None:
                    first_improvement_iter = current_iter
                trace.append((trace_position(), float(found.group(3))))
                continue

            found = finished_re.search(line)
            if found and trace:
                final_coverage = float(found.group(1))
                if trace[-1][1] != final_coverage:
                    trace.append((trace_position(), final_coverage))
    return trace, first_improvement_iter
def load_manifest(path: Path):
    """Read the UTF-8 JSON document at *path* and return the decoded value."""
    manifest_text = path.read_text(encoding="utf-8")
    return json.loads(manifest_text)
def collect_run_rows(manifest_rows):
    """Flatten successful manifest runs into one row per (run, task).

    A manifest entry is used only when its ``returncode`` is 0, it names a
    ``run_dir``, and that directory contains ``Chatbench_RunInfo.json``.
    Each task entry of that JSON file becomes one row combining run metadata,
    the task's recorded metrics, the coverage trace parsed from
    ``<run_dir>/<task_id>/task_log.log`` and, for the ``baseline`` and
    ``cga`` conditions, the semantic coverage parsed from the corresponding
    ``quality_evaluation_report.txt``.

    Task entries without a usable string ``task_id`` are skipped because
    their task directory cannot be located.

    Args:
        manifest_rows: Iterable of run metadata dicts.  Used entries must
            provide ``experiment_name``, ``model``, ``condition`` and
            ``repeat``.

    Returns:
        ``(run_rows, per_task_trace)`` where ``per_task_trace`` maps
        ``(model, condition, repeat, task_id)`` to the parsed coverage trace.
    """
    # Sub-directory of the task directory that holds the quality report.
    quality_subdirs = {"baseline": "CGA_baseline", "cga": "CGA"}

    run_rows = []
    per_task_trace = {}
    for run_meta in manifest_rows:
        if run_meta.get("returncode", 1) != 0:
            continue
        run_dir_text = run_meta.get("run_dir")
        if not run_dir_text:
            # Path("") is the current directory, which always exists; a run
            # without a recorded directory must not be looked up there.
            continue
        run_dir = Path(run_dir_text)
        run_info_path = run_dir / "Chatbench_RunInfo.json"
        if not run_info_path.is_file():
            continue
        with run_info_path.open("r", encoding="utf-8") as f:
            data = json.load(f)

        condition = run_meta.get("condition", "")
        quality_subdir = quality_subdirs.get(condition)

        for task_info in data:
            task_id = task_info.get("task_id")
            if not isinstance(task_id, str) or not task_id:
                continue
            task_dir = run_dir / task_id
            task_log_path = task_dir / "task_log.log"
            coverage_trace, first_improvement_iter = parse_task_log(task_log_path)
            trace_key = (run_meta["model"], run_meta["condition"], run_meta["repeat"], task_id)
            per_task_trace[trace_key] = coverage_trace

            semantic_coverage = None
            if quality_subdir is not None:
                semantic_coverage = _parse_quality_report(
                    task_dir / quality_subdir / "quality_evaluation_report.txt"
                )

            eval2_ratio = task_info.get("Eval2_ratio")
            run_rows.append({
                "experiment_name": run_meta["experiment_name"],
                "model": run_meta["model"],
                "condition": condition,
                "repeat": run_meta["repeat"],
                "run_dir": str(run_dir),
                "task_id": task_id,
                "coverage": task_info.get("coverage"),
                "semantic_coverage": semantic_coverage,
                "eval1_pass": task_info.get("Eval1_pass"),
                "eval2_pass": task_info.get("Eval2_pass"),
                "eval2_ratio": eval2_ratio,
                "eval2_ratio_float": parse_eval2_ratio(eval2_ratio),
                # `or []` tolerates an explicit null; str() tolerates non-string items.
                "eval2_failed_mutants": ",".join(
                    map(str, task_info.get("Eval2_failed_mutant_idxes") or [])
                ),
                "full_pass": task_info.get("full_pass"),
                "time_sec": task_info.get("time"),
                "token_cost": task_info.get("token_cost"),
                "first_improvement_iter": first_improvement_iter,
                "op_record": ",".join(map(str, task_info.get("op_record") or [])),
                "task_log": str(task_log_path),
            })
    return run_rows, per_task_trace
def write_csv(path: Path, rows: list[dict], fieldnames: list[str]):
    """Write *rows* to *path* as UTF-8 CSV restricted to *fieldnames*.

    Parent directories are created as needed.  Columns appear in the order
    given by *fieldnames*; keys missing from a row are written as empty
    cells and keys not listed are dropped.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(
            {column: record.get(column, "") for column in fieldnames}
            for record in rows
        )
def summarize_task_condition(run_rows):
    """Aggregate run-level rows per ``(model, condition, task_id)``.

    For every group the mean, sample standard deviation and (where reported)
    best value are computed for coverage, semantic coverage, Eval2 ratio,
    time and token cost, plus the mean first-improvement iteration.  Values
    of an unexpected type are left out of the respective statistic.  Groups
    are emitted in sorted key order.
    """
    buckets = defaultdict(list)
    for record in run_rows:
        buckets[(record["model"], record["condition"], record["task_id"])].append(record)

    def column(records, field, kinds=(int, float)):
        # Keep only values of the accepted type(s) for this metric.
        return [entry[field] for entry in records if isinstance(entry[field], kinds)]

    def best(values):
        return max(values) if values else None

    summary_rows = []
    for (model, condition, task_id), records in sorted(buckets.items()):
        coverage = column(records, "coverage")
        semantic = column(records, "semantic_coverage")
        eval2 = column(records, "eval2_ratio_float", float)
        elapsed = column(records, "time_sec")
        cost = column(records, "token_cost")
        first_improvements = column(records, "first_improvement_iter", int)
        summary_rows.append({
            "model": model,
            "condition": condition,
            "task_id": task_id,
            "n_runs": len(records),
            "coverage_mean": safe_mean(coverage),
            "coverage_std": safe_std(coverage),
            "coverage_best": best(coverage),
            "semantic_coverage_mean": safe_mean(semantic),
            "semantic_coverage_std": safe_std(semantic),
            "semantic_coverage_best": best(semantic),
            "eval2_mean": safe_mean(eval2),
            "eval2_std": safe_std(eval2),
            "eval2_best": best(eval2),
            "time_mean_sec": safe_mean(elapsed),
            "time_std_sec": safe_std(elapsed),
            "token_cost_mean": safe_mean(cost),
            "token_cost_std": safe_std(cost),
            "first_improvement_iter_mean": safe_mean(first_improvements),
        })
    return summary_rows
def build_paired_rows(run_rows):
    """Pair each baseline run with the CGA run of the same model/repeat/task.

    Only ``(model, repeat, task_id)`` combinations that have both a
    ``baseline`` and a ``cga`` row produce output.  When several rows share
    a key the last one wins.  Deltas are ``cga - baseline`` and are ``None``
    unless both operands have the expected numeric type (int or float for
    coverage figures, float only for the Eval2 ratio).
    """
    by_key = {
        (record["model"], record["repeat"], record["task_id"], record["condition"]): record
        for record in run_rows
    }
    triplets = {(record["model"], record["repeat"], record["task_id"]) for record in run_rows}

    def difference(after, before, kinds):
        if isinstance(before, kinds) and isinstance(after, kinds):
            return after - before
        return None

    paired_rows = []
    for model, repeat, task_id in sorted(triplets):
        baseline = by_key.get((model, repeat, task_id, "baseline"))
        cga = by_key.get((model, repeat, task_id, "cga"))
        if not (baseline and cga):
            continue

        base_cov, cga_cov = baseline.get("coverage"), cga.get("coverage")
        base_sem, cga_sem = baseline.get("semantic_coverage"), cga.get("semantic_coverage")
        base_eval2, cga_eval2 = baseline.get("eval2_ratio_float"), cga.get("eval2_ratio_float")

        paired_rows.append({
            "model": model,
            "repeat": repeat,
            "task_id": task_id,
            "baseline_coverage": base_cov,
            "cga_coverage": cga_cov,
            "coverage_delta": difference(cga_cov, base_cov, (int, float)),
            "baseline_semantic_coverage": base_sem,
            "cga_semantic_coverage": cga_sem,
            "semantic_coverage_delta": difference(cga_sem, base_sem, (int, float)),
            "baseline_eval2": base_eval2,
            "cga_eval2": cga_eval2,
            "eval2_delta": difference(cga_eval2, base_eval2, float),
            "baseline_time_sec": baseline.get("time_sec"),
            "cga_time_sec": cga.get("time_sec"),
            "baseline_token_cost": baseline.get("token_cost"),
            "cga_token_cost": cga.get("token_cost"),
        })
    return paired_rows
def summarize_paired_rows(paired_rows):
    """Summarise paired deltas per ``(model, task_id)``.

    Each group reports the number of pairs, the mean coverage delta with its
    bootstrap confidence bounds, the best and worst coverage delta, and the
    mean semantic-coverage delta with its bootstrap bounds.  Non-numeric
    deltas are ignored; statistics without any data are ``None``.  Groups
    are emitted in sorted key order.
    """
    by_task = defaultdict(list)
    for pair in paired_rows:
        by_task[(pair["model"], pair["task_id"])].append(pair)

    def numeric(pairs, field):
        return [entry[field] for entry in pairs if isinstance(entry[field], (int, float))]

    summary_rows = []
    for (model, task_id), pairs in sorted(by_task.items()):
        coverage_deltas = numeric(pairs, "coverage_delta")
        semantic_deltas = numeric(pairs, "semantic_coverage_delta")

        cov_mean, cov_low, cov_high = bootstrap_mean_ci(coverage_deltas)
        if semantic_deltas:
            sem_mean, sem_low, sem_high = bootstrap_mean_ci(semantic_deltas)
        else:
            sem_mean = sem_low = sem_high = None

        summary_rows.append({
            "model": model,
            "task_id": task_id,
            "paired_n": len(pairs),
            "coverage_delta_mean": cov_mean,
            "coverage_delta_ci_low": cov_low,
            "coverage_delta_ci_high": cov_high,
            "coverage_delta_best": max(coverage_deltas) if coverage_deltas else None,
            "coverage_delta_worst": min(coverage_deltas) if coverage_deltas else None,
            "semantic_coverage_delta_mean": sem_mean,
            "semantic_coverage_delta_ci_low": sem_low,
            "semantic_coverage_delta_ci_high": sem_high,
        })
    return summary_rows
def write_stats_summary(path: Path, run_rows, paired_rows, task_delta_rows):
    """Write the plain-text statistics report to *path*.

    The report lists the row counts, the overall structural and semantic
    coverage deltas (mean, bootstrap confidence interval and Wilcoxon
    signed-rank result), and one line per entry of *task_delta_rows*.  The
    semantic section states that no data is available when no paired row has
    a numeric semantic delta.  Parent directories are created as needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    def numeric_deltas(field):
        return [row[field] for row in paired_rows if isinstance(row[field], (int, float))]

    def delta_section(values):
        # Mean / CI / Wilcoxon lines followed by a blank separator line.
        test = wilcoxon_signed_rank(values)
        mean, ci_low, ci_high = bootstrap_mean_ci(values)
        return [
            f"Mean delta: {mean}",
            f"95% bootstrap CI: [{ci_low}, {ci_high}]",
            (
                "Wilcoxon signed-rank: "
                f"n={test['n']}, p={test['p_value']:.6f}, method={test['method']}"
            ),
            "",
        ]

    lines = [
        "CorrectBench Paper Experiment Summary",
        "=" * 50,
        f"Total run-level rows: {len(run_rows)}",
        f"Total paired rows: {len(paired_rows)}",
        "",
        "[Overall Coverage Delta]",
    ]
    lines.extend(delta_section(numeric_deltas("coverage_delta")))

    lines.append("[Overall Semantic Coverage Delta]")
    semantic_deltas = numeric_deltas("semantic_coverage_delta")
    if semantic_deltas:
        lines.extend(delta_section(semantic_deltas))
    else:
        lines.extend(["No valid paired semantic coverage data available", ""])

    lines.append("[Per-Task Paired Coverage Delta]")
    lines.extend(
        f"{row['model']} | {row['task_id']}: "
        f"mean={row['coverage_delta_mean']} "
        f"CI=[{row['coverage_delta_ci_low']}, {row['coverage_delta_ci_high']}] "
        f"n={row['paired_n']}"
        for row in task_delta_rows
    )
    path.write_text("\n".join(lines), encoding="utf-8")
def write_case_studies(path: Path, run_rows):
    """Write a Markdown list of case-study pointers to *path*.

    For every task in ``POSITIVE_CASE_TASKS`` that has at least one ``cga``
    row with numeric coverage, the highest-coverage row is reported together
    with its run directory, Eval2 ratio, first-improvement iteration and the
    task's ``CGA`` directory.  The tasks in ``NEGATIVE_CASE_TASKS`` are then
    listed by name.  Parent directories are created as needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    lines = ["# Case Study Pointers", ""]

    for task_id in POSITIVE_CASE_TASKS:
        candidates = [
            row for row in run_rows
            if row["condition"] == "cga"
            and row["task_id"] == task_id
            and isinstance(row["coverage"], (int, float))
        ]
        if not candidates:
            continue
        best = max(candidates, key=lambda row: row["coverage"])
        prompt_root = Path(best["run_dir"]) / task_id / "CGA"
        lines.extend([
            f"## {task_id}",
            f"- Best run dir: {best['run_dir']}",
            f"- Final coverage: {best['coverage']}",
            f"- Eval2 ratio: {best['eval2_ratio']}",
            f"- First improvement iter: {best['first_improvement_iter']}",
            f"- Prompt root: {prompt_root}",
            "",
        ])

    lines.append("## Negative Cases")
    lines.extend(f"- {task_id}" for task_id in NEGATIVE_CASE_TASKS)
    path.write_text("\n".join(lines), encoding="utf-8")
def plot_task_level_comparison(output_dir: Path, task_summary_rows, plt):
    """Save a grouped bar chart of mean coverage, baseline versus CGA.

    One pair of bars is drawn per ``cga`` summary row; the baseline bar uses
    the ``baseline`` row with the same model and task, or 0.0 when there is
    none or its mean is missing.  Nothing is drawn when *plt* is ``None`` or
    there are no ``cga`` rows.  The figure is written to
    ``task_level_comparison.png`` in *output_dir*.
    """
    if plt is None:
        return

    cga_rows = []
    baseline_by_key = {}
    for row in task_summary_rows:
        if row["condition"] == "cga":
            cga_rows.append(row)
        elif row["condition"] == "baseline":
            baseline_by_key[(row["model"], row["task_id"])] = row
    if not cga_rows:
        return

    labels = []
    baseline_values = []
    cga_values = []
    for row in cga_rows:
        labels.append(row["task_id"])
        baseline_row = baseline_by_key.get((row["model"], row["task_id"]), {})
        baseline_values.append(baseline_row.get("coverage_mean", 0.0) or 0.0)
        cga_values.append(row.get("coverage_mean", 0.0) or 0.0)

    fig, ax = plt.subplots(figsize=(max(12, len(labels) * 0.35), 6))
    positions = list(range(len(labels)))
    left_positions = [pos - 0.2 for pos in positions]
    right_positions = [pos + 0.2 for pos in positions]
    ax.bar(left_positions, baseline_values, width=0.4, label="Baseline")
    ax.bar(right_positions, cga_values, width=0.4, label="CGA")
    ax.set_ylabel("Mean structural coverage")
    ax.set_title("Task-level Baseline vs CGA coverage")
    ax.set_xticks(positions)
    ax.set_xticklabels(labels, rotation=75, ha="right")
    ax.legend()
    fig.tight_layout()
    fig.savefig(output_dir / "task_level_comparison.png", dpi=200)
    plt.close(fig)
def plot_delta_distribution(output_dir: Path, task_delta_rows, plt):
    """Save a bar chart of the mean paired coverage delta per task.

    Bars are ordered from the largest to the smallest mean delta.  Rows whose
    mean is ``None`` are placed last and drawn with height 0.0.  Nothing is
    drawn when *plt* is ``None`` or *task_delta_rows* is empty.  The figure
    is written to ``coverage_delta_distribution.png`` in *output_dir*.
    """
    if plt is None or not task_delta_rows:
        return

    def sort_key(row):
        # Test for None explicitly: a genuine 0.0 mean is falsy and must not
        # be treated as missing, or it would sort below negative deltas.
        mean = row["coverage_delta_mean"]
        return (mean is not None, 0.0 if mean is None else mean)

    sorted_rows = sorted(task_delta_rows, key=sort_key, reverse=True)
    labels = [row["task_id"] for row in sorted_rows]
    means = [
        0.0 if row["coverage_delta_mean"] is None else row["coverage_delta_mean"]
        for row in sorted_rows
    ]
    fig, ax = plt.subplots(figsize=(max(12, len(labels) * 0.35), 6))
    ax.bar(range(len(labels)), means)
    ax.axhline(0.0, color="black", linewidth=1)
    ax.set_ylabel("Mean paired coverage delta")
    ax.set_title("Coverage delta distribution by task")
    ax.set_xticks(range(len(labels)))
    ax.set_xticklabels(labels, rotation=75, ha="right")
    fig.tight_layout()
    fig.savefig(output_dir / "coverage_delta_distribution.png", dpi=200)
    plt.close(fig)
def plot_coverage_vs_eval2(output_dir: Path, run_rows, plt):
    """Save a scatter plot of structural coverage against the Eval2 ratio.

    The ``baseline`` and ``cga`` conditions are drawn as separate series with
    different markers.  Rows lacking a numeric coverage or a float Eval2
    ratio are skipped.  A legend is added only when at least one series was
    drawn.  Nothing happens when *plt* is ``None``.  The figure is written to
    ``coverage_vs_eval2_scatter.png`` in *output_dir*.
    """
    if plt is None:
        return

    def usable(row, condition):
        return (
            row["condition"] == condition
            and isinstance(row["coverage"], (int, float))
            and isinstance(row["eval2_ratio_float"], float)
        )

    fig, ax = plt.subplots(figsize=(8, 6))
    series_drawn = 0
    for condition, marker in (("baseline", "o"), ("cga", "^")):
        points = [row for row in run_rows if usable(row, condition)]
        if not points:
            continue
        series_drawn += 1
        x_values = [row["coverage"] for row in points]
        y_values = [row["eval2_ratio_float"] for row in points]
        ax.scatter(x_values, y_values, label=condition, marker=marker, alpha=0.7)

    ax.set_xlabel("Structural coverage")
    ax.set_ylabel("Eval2 ratio")
    ax.set_title("Coverage vs Eval2")
    if series_drawn:
        ax.legend()
    fig.tight_layout()
    fig.savefig(output_dir / "coverage_vs_eval2_scatter.png", dpi=200)
    plt.close(fig)
def plot_case_studies(output_dir: Path, run_rows, plt):
    """Save coverage-versus-iteration traces for the positive case studies.

    For each task in ``POSITIVE_CASE_TASKS`` the ``cga`` row with the highest
    numeric coverage is selected and the trace parsed from its task log is
    drawn as one line.  Tasks whose log yields no trace are skipped.  Nothing
    happens when *plt* is ``None`` or no task has a candidate row.  The
    figure is written to ``case_study_iterations.png`` in *output_dir*.
    """
    if plt is None:
        return
    best_rows = {}
    for task_id in POSITIVE_CASE_TASKS:
        candidates = [
            row for row in run_rows
            if row["condition"] == "cga"
            and row["task_id"] == task_id
            and isinstance(row["coverage"], (int, float))
        ]
        if candidates:
            best_rows[task_id] = max(candidates, key=lambda row: row["coverage"])
    if not best_rows:
        return

    fig, ax = plt.subplots(figsize=(8, 5))
    plotted = False
    for task_id, row in best_rows.items():
        trace, _ = parse_task_log(Path(row["task_log"]))
        if not trace:
            continue
        plotted = True
        x_vals = [point[0] for point in trace]
        y_vals = [point[1] for point in trace]
        ax.plot(x_vals, y_vals, marker="o", label=task_id)
    ax.set_xlabel("CGA iteration")
    ax.set_ylabel("Structural coverage")
    ax.set_title("Case-study coverage traces")
    # Only request a legend when a labelled line exists, as the scatter plot
    # in this file does; an empty legend has nothing to show.
    if plotted:
        ax.legend()
    fig.tight_layout()
    fig.savefig(output_dir / "case_study_iterations.png", dpi=200)
    plt.close(fig)
def parse_args():
    """Parse the command line and return the resulting namespace.

    ``--manifest`` is required.  ``--output-dir`` is optional and defaults to
    an empty string.
    """
    parser = argparse.ArgumentParser(
        description="Analyze CorrectBench paper experiment manifests."
    )
    manifest_help = "Path to run_manifest.json generated by run_paper_experiments.py"
    output_help = "Optional analysis output directory. Defaults to the manifest directory."
    parser.add_argument("--manifest", required=True, help=manifest_help)
    parser.add_argument("--output-dir", default="", help=output_help)
    return parser.parse_args()
def main():
    """Run the full analysis for one manifest.

    Relative manifest and output paths are resolved against
    ``PROJECT_ROOT``; the output directory defaults to the manifest's
    directory.  Four CSV files, the statistics summary, the case-study
    pointers and the plots are written, then the output location is printed.
    """
    args = parse_args()

    manifest_path = Path(args.manifest)
    if not manifest_path.is_absolute():
        manifest_path = (PROJECT_ROOT / manifest_path).resolve()

    if args.output_dir:
        output_dir = Path(args.output_dir)
    else:
        output_dir = manifest_path.parent
    if not output_dir.is_absolute():
        output_dir = (PROJECT_ROOT / output_dir).resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    manifest_rows = load_manifest(manifest_path)
    run_rows, _ = collect_run_rows(manifest_rows)
    task_summary_rows = summarize_task_condition(run_rows)
    paired_rows = build_paired_rows(run_rows)
    task_delta_rows = summarize_paired_rows(paired_rows)

    run_level_columns = [
        "experiment_name",
        "model",
        "condition",
        "repeat",
        "run_dir",
        "task_id",
        "coverage",
        "semantic_coverage",
        "eval1_pass",
        "eval2_pass",
        "eval2_ratio",
        "eval2_ratio_float",
        "eval2_failed_mutants",
        "full_pass",
        "time_sec",
        "token_cost",
        "first_improvement_iter",
        "op_record",
        "task_log",
    ]
    task_summary_columns = [
        "model",
        "condition",
        "task_id",
        "n_runs",
        "coverage_mean",
        "coverage_std",
        "coverage_best",
        "semantic_coverage_mean",
        "semantic_coverage_std",
        "semantic_coverage_best",
        "eval2_mean",
        "eval2_std",
        "eval2_best",
        "time_mean_sec",
        "time_std_sec",
        "token_cost_mean",
        "token_cost_std",
        "first_improvement_iter_mean",
    ]
    paired_columns = [
        "model",
        "repeat",
        "task_id",
        "baseline_coverage",
        "cga_coverage",
        "coverage_delta",
        "baseline_semantic_coverage",
        "cga_semantic_coverage",
        "semantic_coverage_delta",
        "baseline_eval2",
        "cga_eval2",
        "eval2_delta",
        "baseline_time_sec",
        "cga_time_sec",
        "baseline_token_cost",
        "cga_token_cost",
    ]
    task_delta_columns = [
        "model",
        "task_id",
        "paired_n",
        "coverage_delta_mean",
        "coverage_delta_ci_low",
        "coverage_delta_ci_high",
        "coverage_delta_best",
        "coverage_delta_worst",
        "semantic_coverage_delta_mean",
        "semantic_coverage_delta_ci_low",
        "semantic_coverage_delta_ci_high",
    ]

    # (file name, rows, column order) for every CSV artefact, in write order.
    csv_outputs = [
        ("run_level.csv", run_rows, run_level_columns),
        ("task_summary.csv", task_summary_rows, task_summary_columns),
        ("paired_deltas.csv", paired_rows, paired_columns),
        ("task_delta_summary.csv", task_delta_rows, task_delta_columns),
    ]
    for file_name, rows, columns in csv_outputs:
        write_csv(output_dir / file_name, rows, columns)

    write_stats_summary(output_dir / "stats_summary.txt", run_rows, paired_rows, task_delta_rows)
    write_case_studies(output_dir / "case_studies.md", run_rows)

    # `plt` is None when matplotlib cannot be set up; the plot helpers then return early.
    plt = maybe_enable_matplotlib()
    plot_task_level_comparison(output_dir, task_summary_rows, plt)
    plot_delta_distribution(output_dir, task_delta_rows, plt)
    plot_coverage_vs_eval2(output_dir, run_rows, plt)
    plot_case_studies(output_dir, run_rows, plt)

    print(f"Analysis outputs written to {output_dir}")
# Run the analysis only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()