# Source code for femorph_solver.estimators._estimator

"""The Estimator itself — log-log linear regression per target.

Design notes
------------

For sparse direct factorisation on 3D SPD meshes, factor nnz
scales roughly as :math:`n^{4/3}` under nested dissection
ordering (George 1973) and solve cost scales with the same
exponent.  Peak RSS tracks the factor size linearly.  A log-log
linear model ``log(y) = α + β · log(n_dof)`` captures that power
law directly, with ``β`` falling out of the fit near the expected
theoretical exponent.

We use a **per-host, per-linear-solver** coefficient pair — every
(host_signature, linear_solver) combination gets its own ``(α,
β)`` if there are ≥ 2 training rows for it.  Everything else
falls back to a shared prior fit across all training data.

The estimator ships with an empty training set; callers feed it
with :func:`femorph_solver.estimators.load_training_rows` (typically
from the TA-6 benchmark JSON they just generated on their box).
That's the simplest possible arrangement that gives useful
numbers without bundling a multi-MB trained model into the
package.

References
----------
- George, A. "Nested dissection of a regular finite element
  mesh." SIAM J. Numer. Anal. 10 (1973), pp. 345-363.  The
  origin of the ``n^(4/3)`` factor-fill bound on structured 3D
  grids.
"""

from __future__ import annotations

import math
from dataclasses import dataclass

import numpy as np

from femorph_solver.estimators._host import HostSpec
from femorph_solver.estimators._loader import TrainingRow
from femorph_solver.estimators._problem import ProblemSpec

#: Minimum rows needed for a per-(host, backend) fit.  Below this
#: we fall back to the shared prior across all training data.
_MIN_ROWS_PER_BUCKET = 2

#: Default prior when no training data exists at all — a
#: shape-of-universe estimate tuned from our own 2026-04-24
#: benchmark sweeps on an i9-14900KF.  Tunes itself as soon as the
#: user provides any training data of their own.  Format:
#: ``(log_alpha, beta)`` for ``log(y) = log_alpha + beta · log(n_dof)``
#: — natural logs throughout, matching the ``math.exp`` / ``np.log``
#: calls in the estimator.
_DEFAULT_PRIORS: dict[str, tuple[float, float]] = {
    # Wall-time: on a 14900KF MKL/pardiso SPD pipeline, 20 k DOFs →
    # ~0.3 s, 130 k DOFs → ~5 s → log₁₀(y/x^(4/3)) ≈ −6.7 → α =
    # ln(10^−6.7) ≈ −15.4 on the natural-log scale.
    "wall_s": (-15.4, 1.33),
    # Peak RSS: ~200 MB + ~50 MB/nnz linear.  Using log-log
    # dominated by factor fill — similar slope.
    "peak_rss_mb": (-4.6, 1.00),
}

#: OOC multipliers from the TA-1 bench (``ooc-limits.rst`` table):
#: +3.5× wall, −22 % peak RSS.  Applied post-prediction.
_OOC_WALL_FACTOR = 3.5
_OOC_PEAK_FACTOR = 0.78


@dataclass
class Estimate:
    """Prediction return type."""

    wall_s_p50: float
    wall_s_p95: float
    peak_rss_mb_p50: float
    peak_rss_mb_p95: float
    #: How many training rows fed this bucket's coefficients.
    #: ``0`` means we fell through to the default prior and the
    #: confidence band is very wide.
    n_training_rows: int
    #: Which bucket was hit: ``"(host, backend)"`` on a per-host
    #: fit, ``"shared"`` on the cross-host fallback, ``"prior"`` on
    #: no training data at all.
    bucket: str

    def __str__(self) -> str:
        # Assemble the summary from named fragments so each band is
        # easy to eyeball; the rendered text matches the documented
        # one-line format exactly.
        wall = f"wall_s=p50 {self.wall_s_p50:.2f} / p95 {self.wall_s_p95:.2f}"
        peak = (
            f"peak_rss_mb=p50 {self.peak_rss_mb_p50:.0f}"
            f" / p95 {self.peak_rss_mb_p95:.0f}"
        )
        tail = f"bucket={self.bucket!r}, n_training={self.n_training_rows}"
        return f"Estimate({wall}, {peak}, {tail})"
class Estimator:
    """Fit per-(host, backend) log-log linear models from training rows.

    Usage::

        >>> from femorph_solver.estimators import (
        ...     Estimator, HostSpec, ProblemSpec, load_training_rows
        ... )
        >>> est = Estimator.fit(load_training_rows())
        >>> est.predict(ProblemSpec(n_dof=100_000, n_modes=10),
        ...             host=HostSpec.auto())
        Estimate(wall_s=p50 3.47 / p95 ..., peak_rss_mb=p50 ...,
        bucket='Intel(R) ...|94101.0|Linux 6.12.66 (x86_64)|mkl_direct',
        n_training=5)
    """

    def __init__(
        self,
        fits: dict[str, dict[str, tuple[float, float, int]]] | None = None,
    ) -> None:
        """``fits`` keyed by ``"target"`` (``wall_s`` / ``peak_rss_mb``)
        → ``"bucket"`` → ``(log_alpha, beta, n_rows)``.  Empty by
        default; fit from training rows via :meth:`Estimator.fit`.
        """
        # NOTE(review): ``fits or {...}`` also replaces an explicitly
        # passed *empty* dict with the two-target skeleton.  Harmless
        # here because _lookup .get()s each target with a {} default.
        self.fits = fits or {"wall_s": {}, "peak_rss_mb": {}}

    @classmethod
    def fit(cls, rows: list[TrainingRow]) -> Estimator:
        """Fit log-log linear coefficients on every (host, backend)
        bucket with ≥ :data:`_MIN_ROWS_PER_BUCKET` rows.  Unbucketed
        training data feeds a ``"shared"`` bucket.

        Parameters
        ----------
        rows
            Training rows loaded via
            :func:`femorph_solver.estimators.load_training_rows`.
        """
        fits: dict[str, dict[str, tuple[float, float, int]]] = {
            "wall_s": {},
            "peak_rss_mb": {},
        }
        # Group rows by (host_signature, linear_solver) — the
        # combination that affects both prediction targets
        # uniformly.  eigen_solver could be a third axis but
        # eigsh / lobpcg cost is already captured via the wall_s
        # delta at that linear backend, and splitting three ways
        # starves buckets at small training set sizes.
        #
        # Every row also lands in "shared", so the fallback bucket
        # always sees the full training set.
        buckets: dict[str, list[TrainingRow]] = {}
        for r in rows:
            key = f"{r.host_signature}|{r.linear_solver}"
            buckets.setdefault(key, []).append(r)
            buckets.setdefault("shared", []).append(r)
        for target in ("wall_s", "peak_rss_mb"):
            for bucket_key, bucket_rows in buckets.items():
                if len(bucket_rows) < _MIN_ROWS_PER_BUCKET:
                    continue
                # Extract (n_dof, y) pairs.  Skip OOC rows for
                # in-core predictions; the OOC multiplier handles
                # them post-hoc.
                ic_rows = [r for r in bucket_rows if not r.ooc]
                if len(ic_rows) < _MIN_ROWS_PER_BUCKET:
                    continue
                xs = np.array([float(r.n_dof) for r in ic_rows])
                ys = np.array([getattr(r, target) for r in ic_rows])
                # Drop non-positive values — log() below would blow
                # up on them (e.g. a zeroed/missing measurement).
                mask = (xs > 0) & (ys > 0)
                xs, ys = xs[mask], ys[mask]
                if len(xs) < _MIN_ROWS_PER_BUCKET:
                    continue
                # log-log fit: log(y) = α + β · log(n_dof).
                # np.polyfit returns highest degree first, hence
                # (beta, alpha) in that order.
                lx = np.log(xs)
                ly = np.log(ys)
                beta, alpha = np.polyfit(lx, ly, 1)
                fits[target][bucket_key] = (float(alpha), float(beta), len(xs))
        return cls(fits=fits)

    def _lookup(
        self,
        target: str,
        bucket_key: str,
    ) -> tuple[float, float, int, str]:
        """Resolve (α, β, n_rows, bucket_name) for a target + bucket.

        Falls through per-bucket → ``"shared"`` → built-in prior;
        the prior reports ``n_rows == 0``.
        """
        by_bucket = self.fits.get(target, {})
        if bucket_key in by_bucket:
            α, β, n = by_bucket[bucket_key]
            return α, β, n, bucket_key
        if "shared" in by_bucket:
            α, β, n = by_bucket["shared"]
            return α, β, n, "shared"
        α, β = _DEFAULT_PRIORS[target]
        return α, β, 0, "prior"

    def predict(
        self,
        problem: ProblemSpec,
        *,
        host: HostSpec | None = None,
    ) -> Estimate:
        """Predict ``(wall_s, peak_rss_mb)`` for a single solve.

        Parameters
        ----------
        problem
            :class:`ProblemSpec` describing the solve — ``n_dof`` is
            the dominant feature, ``linear_solver`` picks the
            bucket.
        host
            :class:`HostSpec` of the target machine.
            ``HostSpec.auto()`` by default.

        Returns
        -------
        Estimate
            ``p50`` / ``p95`` confidence bands on wall-time and
            peak RSS.  The bands widen when the bucket had fewer
            training rows or when we fell back to the shared /
            prior fit.
        """
        host = host or HostSpec.auto()
        bucket_key = f"{host.signature()}|{problem.linear_solver}"
        α_w, β_w, n_w, bw = self._lookup("wall_s", bucket_key)
        α_r, β_r, n_r, br = self._lookup("peak_rss_mb", bucket_key)
        # Clamp to ≥ 1 DOF so log() is always defined.
        log_n = math.log(max(1, problem.n_dof))
        wall_p50 = math.exp(α_w + β_w * log_n)
        peak_p50 = math.exp(α_r + β_r * log_n)

        # Confidence widening:
        # - "prior" (no data): p95 = p50 × 2.
        # - "shared" (cross-host fit): p95 = p50 × 1.3.
        # - per-bucket fit: shrinks linearly with n_rows, floored
        #   at ×1.1 (floor reached at n_rows ≥ 12).
        def _p95_mult(n_rows: int, bucket_name: str) -> float:
            if bucket_name == "prior":
                return 2.0
            if bucket_name == "shared":
                return 1.3
            # Per-bucket: 1.5 at n_rows = 2, down 0.04 per extra
            # row until the 1.1 floor.
            return max(1.1, 1.5 - 0.04 * (n_rows - 2))

        wall_p95 = wall_p50 * _p95_mult(n_w, bw)
        peak_p95 = peak_p50 * _p95_mult(n_r, br)
        if problem.ooc:
            wall_p50 *= _OOC_WALL_FACTOR
            wall_p95 *= _OOC_WALL_FACTOR
            peak_p50 *= _OOC_PEAK_FACTOR
            peak_p95 *= _OOC_PEAK_FACTOR
        # Report the wall_s row count.  NOTE(review): the RSS fit for
        # the same bucket can in principle keep a different count if
        # its positivity mask dropped different rows in fit().
        n_training = n_w
        return Estimate(
            wall_s_p50=wall_p50,
            wall_s_p95=wall_p95,
            peak_rss_mb_p50=peak_p50,
            peak_rss_mb_p95=peak_p95,
            n_training_rows=n_training,
            bucket=bw,
        )