"""Load TA-6 JSON benchmark output into estimator training rows.
Input contract: ``femorph-benchmark-*.json`` produced by
:mod:`femorph_solver.benchmark`. Schema is append-only and
documented in :file:`doc/source/user-guide/solving/benchmark.rst`;
this loader reads what it needs via ``.get()`` so new fields
don't break older historical files.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class TrainingRow:
    """One training sample for the estimator.

    Derived entirely from TA-6 JSON output; no live solver state
    required. ``host_signature`` groups rows that share a box so
    the estimator can fit per-host coefficients when enough data
    is available.
    """

    # "cpu|ram|os" triple scraped from the benchmark Report block;
    # rows with equal signatures are assumed to come from the same box.
    host_signature: str
    # Problem size: degrees of freedom of the solved system.
    n_dof: int
    # Number of requested eigenmodes (modal studies).
    n_modes: int
    # Linear solver name as recorded in the benchmark spec (e.g. "auto").
    linear_solver: str
    # Eigensolver name as recorded in the benchmark spec (e.g. "arpack").
    eigen_solver: str
    # Whether the run used out-of-core storage.
    ooc: bool
    # Total wall-clock time of the run, seconds.
    wall_s: float
    # Time spent in the eigensolve, seconds (may equal wall_s for
    # validation-derived rows — see load_validation_rows).
    eig_s: float
    # Peak resident set size observed during the run, megabytes.
    peak_rss_mb: float
    # The JSON file this row was loaded from (for traceability).
    source_path: Path
def _host_signature_from_report(report: str) -> str:
"""Extract a stable host signature from the Report string.
The Report block contains ``CPU model`` / ``RAM (MB)`` lines
we can scrape deterministically. Missing fields degrade to
``"unknown-host"`` so the loader never raises.
"""
cpu = "unknown"
ram = "unknown"
os_line = "unknown"
for line in (report or "").splitlines():
if "CPU model" in line and ":" in line:
cpu = line.split(":", 1)[1].strip()
elif line.strip().startswith("RAM (MB)") and ":" in line:
ram = line.split(":", 1)[1].strip()
elif line.strip().startswith("OS ") and ":" in line and os_line == "unknown":
os_line = line.split(":", 1)[1].strip()
return f"{cpu}|{ram}|{os_line}"
def load_training_rows(paths: list[Path | str] | None = None) -> list[TrainingRow]:
    """Scan a list of benchmark JSON files into training rows.

    Parameters
    ----------
    paths
        Explicit list of JSON file paths. When ``None``, globs
        the current working directory for ``femorph-benchmark-*.json``
        — useful when a user dumps their last few runs alongside the
        script they're running.

    Returns
    -------
    list[TrainingRow]
        Every successful benchmark row, one per entry. Failed rows
        (``ok != True``) are skipped so the estimator doesn't train
        on timeouts / crashes.
    """
    if paths is None:
        paths = sorted(Path.cwd().glob("femorph-benchmark-*.json"))
    else:
        paths = [Path(p) for p in paths]
    rows: list[TrainingRow] = []
    for p in paths:
        try:
            # Be explicit about UTF-8 so a historical file can't be
            # misread under an unusual locale encoding.
            payload = json.loads(p.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            # Unreadable / malformed files are skipped, not fatal — the
            # loader sweeps up whatever history happens to be around.
            continue
        if not isinstance(payload, dict):
            # Benchmark output is a top-level object; anything else is
            # some other JSON file the glob happened to catch.
            continue
        host_sig = _host_signature_from_report(payload.get("host_report", ""))
        for r in payload.get("rows", []):
            if not r.get("ok"):
                continue  # don't train on timeouts / crashes
            spec = r.get("spec", {}) or {}
            rows.append(
                TrainingRow(
                    host_signature=host_sig,
                    # Prefer the measured DOF count; fall back to the spec.
                    n_dof=int(r.get("n_dof", 0) or spec.get("n_dof", 0)),
                    n_modes=int(spec.get("n_modes", 10)),
                    linear_solver=str(spec.get("linear_solver", "auto")),
                    eigen_solver=str(spec.get("eigen_solver", "arpack")),
                    ooc=bool(spec.get("ooc", False)),
                    wall_s=float(r.get("wall_s", 0.0)),
                    eig_s=float(r.get("eig_s", 0.0)),
                    peak_rss_mb=float(r.get("peak_rss_mb", 0.0)),
                    source_path=p,
                )
            )
    return rows
def load_validation_rows(
    paths: list[Path | str] | None = None,
    *,
    host_signature: str = "unknown-host",
) -> list[TrainingRow]:
    """Scan validation JSON files into estimator training rows.

    Validation runs (via :mod:`femorph_solver.validation`) write
    their convergence records through
    :func:`femorph_solver.validation._report.render_json`. Each
    record carries per-refinement ``wall_s`` + ``peak_rss_mb``
    fields (added in TA-10h-8) — exactly what the estimator
    training fit consumes.

    Because the validation reports have one file per study and
    don't embed a host report, the caller passes an explicit
    ``host_signature`` — typically derived from a companion
    :class:`femorph_solver.Report` snapshot. If omitted the
    loader uses ``"unknown-host"``; the estimator's per-host
    bucket handling gracefully degrades in that case.

    Parameters
    ----------
    paths
        Explicit list of JSON file paths. When ``None``, globs
        the current working directory for
        ``femorph-validation-*.json`` and ``validation.json``.
    host_signature
        A stable signature for the host these runs came from.

    Returns
    -------
    list[TrainingRow]
        One row per refinement × quantity. When the same
        (problem, refinement) appears in multiple published
        quantities, the identical wall / peak row is emitted for
        each quantity — the estimator treats them as duplicate
        observations of the same solve cost, which is desired.
    """
    if paths is None:
        cwd = Path.cwd()
        paths = sorted(cwd.glob("femorph-validation-*.json")) + sorted(cwd.glob("validation.json"))
    else:
        paths = [Path(p) for p in paths]
    rows: list[TrainingRow] = []
    for p in paths:
        try:
            # Be explicit about UTF-8 so a historical file can't be
            # misread under an unusual locale encoding.
            payload = json.loads(p.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            continue
        if not isinstance(payload, list):
            # render_json emits a top-level list of study records;
            # anything else is some other JSON file.
            continue
        for rec in payload:
            if not isinstance(rec, dict):
                # Defensive: skip malformed entries instead of raising
                # on a hand-edited or truncated report.
                continue
            for r in rec.get("results", []):
                # Only rows with timing present are usable.
                wall = r.get("wall_s")
                peak = r.get("peak_rss_mb")
                if wall is None or peak is None:
                    continue
                mesh = r.get("mesh_params", {}) or {}
                n_modes = int(mesh.get("n_modes", 1))
                rows.append(
                    TrainingRow(
                        host_signature=host_signature,
                        n_dof=int(r.get("n_dof") or 0),
                        n_modes=n_modes,
                        # Validation records don't carry a solver spec;
                        # record the defaults the validation runner uses.
                        linear_solver="auto",
                        eigen_solver="arpack",
                        ooc=False,
                        wall_s=float(wall),
                        # Validation runs don't separately time the
                        # eigensolve, so eig_s is approximated as the
                        # full wall time for every row. NOTE(review):
                        # this over-counts for static studies — the
                        # record doesn't identify the study type, so we
                        # can't zero it here; confirm downstream impact.
                        eig_s=float(wall),
                        peak_rss_mb=float(peak),
                        source_path=p,
                    )
                )
    return rows