"""The Estimator itself — log-log linear regression per target.
Design notes
------------
For sparse direct factorisation on 3D SPD meshes, factor nnz
scales roughly as :math:`n^{4/3}` under nested dissection
ordering (George 1973) and solve cost scales with the same
exponent. Peak RSS tracks the factor size linearly. A log-log
linear model ``log(y) = α + β · log(n_dof)`` captures that power
law directly, with ``β`` falling out of the fit near the expected
theoretical exponent.
We use a **per-host, per-linear-solver** coefficient pair — every
(host_signature, linear_solver) combination gets its own ``(α,
β)`` if there are ≥ 2 training rows for it. Everything else
falls back to a shared prior fit across all training data.
The estimator ships with an empty training set; callers feed it
with :func:`femorph_solver.estimators.load_training_rows` (typically
from the TA-6 benchmark JSON they just generated on their box).
That's the simplest possible arrangement that gives useful
numbers without bundling a multi-MB trained model into the
package.
References
----------
- George, A. "Nested dissection of a regular finite element
mesh." SIAM J. Numer. Anal. 10 (1973), pp. 345-363. The
origin of the ``n^(4/3)`` factor-fill bound on structured 3D
grids.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
import numpy as np
from femorph_solver.estimators._host import HostSpec
from femorph_solver.estimators._loader import TrainingRow
from femorph_solver.estimators._problem import ProblemSpec
#: Minimum rows needed for a per-(host, backend) fit. Below this
#: we fall back to the shared prior across all training data.
_MIN_ROWS_PER_BUCKET = 2

#: Default prior when no training data exists at all — a
#: shape-of-universe estimate tuned from our own 2026-04-24
#: benchmark sweeps on an i9-14900KF. Tunes itself as soon as the
#: user provides any training data of their own. Format:
#: ``(log_alpha, beta)`` for ``log(y) = log_alpha + beta · log(n_dof)``
#: (natural log on both sides — predictions go through ``math.exp``).
_DEFAULT_PRIORS: dict[str, tuple[float, float]] = {
    # Wall-time: on a 14900KF MKL/pardiso SPD pipeline, 20 k DOFs →
    # ~0.3 s, 130 k DOFs → ~5 s → log₁₀(y/x^(4/3)) ≈ −6.7 → α =
    # ln(10^−6.7) ≈ −15.4 on the natural-log scale.
    "wall_s": (-15.4, 1.33),
    # Peak RSS: ~200 MB + ~50 MB/nnz linear. Using log-log
    # dominated by factor fill — similar slope.
    "peak_rss_mb": (-4.6, 1.00),
}

#: OOC multipliers from the TA-1 bench (``ooc-limits.rst`` table):
#: +3.5× wall, −22 % peak RSS. Applied post-prediction, after the
#: in-core fit (OOC rows are excluded from training).
_OOC_WALL_FACTOR = 3.5
_OOC_PEAK_FACTOR = 0.78
@dataclass
class Estimate:
    """Prediction return type."""

    wall_s_p50: float
    wall_s_p95: float
    peak_rss_mb_p50: float
    peak_rss_mb_p95: float
    #: How many training rows fed this bucket's coefficients.
    #: ``0`` means we fell through to the default prior and the
    #: confidence band is very wide.
    n_training_rows: int
    #: Which bucket was hit: ``"(host, backend)"`` on a per-host
    #: fit, ``"shared"`` on the cross-host fallback, ``"prior"`` on
    #: no training data at all.
    bucket: str

    def __str__(self) -> str:
        # Assemble the fields as one comma-joined list; keeps each
        # formatted segment on its own readable line.
        segments = (
            f"wall_s=p50 {self.wall_s_p50:.2f} / p95 {self.wall_s_p95:.2f}",
            f"peak_rss_mb=p50 {self.peak_rss_mb_p50:.0f} / p95 {self.peak_rss_mb_p95:.0f}",
            f"bucket={self.bucket!r}",
            f"n_training={self.n_training_rows}",
        )
        return f"Estimate({', '.join(segments)})"
class Estimator:
    """Fit per-(host, backend) log-log linear models from training rows.

    Usage::

        >>> from femorph_solver.estimators import (
        ...     Estimator, HostSpec, ProblemSpec, load_training_rows
        ... )
        >>> est = Estimator.fit(load_training_rows())
        >>> est.predict(ProblemSpec(n_dof=100_000, n_modes=10),
        ...             host=HostSpec.auto())
        Estimate(wall_s=p50 3.47 / p95 ..., peak_rss_mb=p50 ...,
                 bucket='Intel(R) ...|94101.0|Linux 6.12.66 (x86_64)|mkl_direct',
                 n_training=5)
    """

    def __init__(
        self,
        fits: dict[str, dict[str, tuple[float, float, int]]] | None = None,
    ) -> None:
        """``fits`` keyed by ``"target"`` (``wall_s`` / ``peak_rss_mb``)
        → ``"bucket"`` → ``(log_alpha, beta, n_rows)``. Empty by
        default; fit from training rows via :meth:`Estimator.fit`.
        """
        self.fits = fits or {"wall_s": {}, "peak_rss_mb": {}}

    @classmethod
    def fit(cls, rows: list[TrainingRow]) -> Estimator:
        """Fit log-log linear coefficients on every (host, backend)
        bucket with ≥ :data:`_MIN_ROWS_PER_BUCKET` usable rows.

        Every row additionally feeds a cross-host ``"shared"`` bucket
        that serves as the fallback fit for (host, backend)
        combinations with too little data of their own.

        Parameters
        ----------
        rows
            Training rows loaded via
            :func:`femorph_solver.estimators.load_training_rows`.
        """
        fits: dict[str, dict[str, tuple[float, float, int]]] = {
            "wall_s": {},
            "peak_rss_mb": {},
        }
        # Group rows by (host_signature, linear_solver) — the
        # combination that affects both prediction targets
        # uniformly. eigen_solver could be a third axis but
        # eigsh / lobpcg cost is already captured via the wall_s
        # delta at that linear backend, and splitting three ways
        # starves buckets at small training set sizes.
        #
        # Note "shared" can never collide with a real bucket key:
        # real keys always contain the "|" separator.
        buckets: dict[str, list[TrainingRow]] = {}
        for r in rows:
            key = f"{r.host_signature}|{r.linear_solver}"
            buckets.setdefault(key, []).append(r)
            buckets.setdefault("shared", []).append(r)
        for target in ("wall_s", "peak_rss_mb"):
            for bucket_key, bucket_rows in buckets.items():
                if len(bucket_rows) < _MIN_ROWS_PER_BUCKET:
                    continue
                # Skip OOC rows for in-core predictions; the OOC
                # multiplier handles them post-hoc.
                ic_rows = [r for r in bucket_rows if not r.ooc]
                if len(ic_rows) < _MIN_ROWS_PER_BUCKET:
                    continue
                xs = np.array([float(r.n_dof) for r in ic_rows])
                ys = np.array([getattr(r, target) for r in ic_rows])
                # Logs require strictly positive values on both axes.
                mask = (xs > 0) & (ys > 0)
                xs, ys = xs[mask], ys[mask]
                if len(xs) < _MIN_ROWS_PER_BUCKET:
                    continue
                # A slope is only identifiable with ≥ 2 distinct
                # problem sizes; a single repeated n_dof makes the
                # design matrix singular (polyfit RankWarning / nan
                # coefficients), so skip such buckets entirely.
                if np.unique(xs).size < 2:
                    continue
                # log-log fit: log(y) = α + β · log(n_dof)
                lx = np.log(xs)
                ly = np.log(ys)
                # polyfit returns coefficients highest-degree first.
                beta, alpha = np.polyfit(lx, ly, 1)
                fits[target][bucket_key] = (float(alpha), float(beta), len(xs))
        return cls(fits=fits)

    def _lookup(
        self,
        target: str,
        bucket_key: str,
    ) -> tuple[float, float, int, str]:
        """Resolve (α, β, n_rows, bucket_name) for a target + bucket.

        Resolution order: exact (host, backend) bucket → cross-host
        ``"shared"`` fit → hard-coded :data:`_DEFAULT_PRIORS`
        (``n_rows = 0``).
        """
        by_bucket = self.fits.get(target, {})
        if bucket_key in by_bucket:
            α, β, n = by_bucket[bucket_key]
            return α, β, n, bucket_key
        if "shared" in by_bucket:
            α, β, n = by_bucket["shared"]
            return α, β, n, "shared"
        α, β = _DEFAULT_PRIORS[target]
        return α, β, 0, "prior"

    def predict(
        self,
        problem: ProblemSpec,
        *,
        host: HostSpec | None = None,
    ) -> Estimate:
        """Predict ``(wall_s, peak_rss_mb)`` for a single solve.

        Parameters
        ----------
        problem
            :class:`ProblemSpec` describing the solve — ``n_dof`` is
            the dominant feature, ``linear_solver`` picks the
            bucket.
        host
            :class:`HostSpec` of the target machine.
            ``HostSpec.auto()`` by default.

        Returns
        -------
        Estimate
            ``p50`` / ``p95`` confidence bands on wall-time and peak
            RSS. The bands widen when the bucket had fewer training
            rows or when we fell back to the shared / prior fit.
        """
        host = host or HostSpec.auto()
        bucket_key = f"{host.signature()}|{problem.linear_solver}"
        α_w, β_w, n_w, bw = self._lookup("wall_s", bucket_key)
        α_r, β_r, n_r, br = self._lookup("peak_rss_mb", bucket_key)
        # Clamp so log() stays defined for degenerate n_dof ≤ 0.
        log_n = math.log(max(1, problem.n_dof))
        wall_p50 = math.exp(α_w + β_w * log_n)
        peak_p50 = math.exp(α_r + β_r * log_n)

        # Confidence widening:
        # - "prior" (no data at all): p95 = p50 × 2.
        # - "shared" (cross-host fit): × 1.3.
        # - per-bucket fit: × 1.5 at n_rows = 2, shrinking by 0.04
        #   per extra row, floored at × 1.1 (reached at n_rows ≥ 12).
        def _p95_mult(n_rows: int, bucket_name: str) -> float:
            if bucket_name == "prior":
                return 2.0
            if bucket_name == "shared":
                return 1.3
            return max(1.1, 1.5 - 0.04 * (n_rows - 2))

        wall_p95 = wall_p50 * _p95_mult(n_w, bw)
        peak_p95 = peak_p50 * _p95_mult(n_r, br)
        # Out-of-core solves: slower wall clock, lower peak RSS.
        if problem.ooc:
            wall_p50 *= _OOC_WALL_FACTOR
            wall_p95 *= _OOC_WALL_FACTOR
            peak_p50 *= _OOC_PEAK_FACTOR
            peak_p95 *= _OOC_PEAK_FACTOR
        n_training = n_w  # n for wall_s; same bucket → same count
        return Estimate(
            wall_s_p50=wall_p50,
            wall_s_p95=wall_p95,
            peak_rss_mb_p50=peak_p50,
            peak_rss_mb_p95=peak_p95,
            n_training_rows=n_training,
            bucket=bw,
        )