"""femorph-solver model — a thin wrapper around a ``pyvista.UnstructuredGrid``.
The grid is the canonical mesh representation:
* ``point_data["ansys_node_num"]`` — 1-based node numbers
* ``cell_data["ansys_elem_num"]`` — 1-based element numbers
* ``cell_data["ansys_elem_type_num"]`` — element-type id per element
* ``cell_data["ansys_material_type"]`` — material id per element
* ``cell_data["ansys_real_constant"]`` — real-constant id per element
The ``ansys_*`` array names are kept for back-compatibility with
``mapdl_archive`` and existing ``.pv`` files on disk; they are
storage-format keys, not API names — every public method on
:class:`Model` operates on neutral kernel names.
Global tables that don't belong per-cell live on the Model directly:
* :attr:`etypes` — ET id → element class name (e.g. ``"TRUSS2"``)
* :attr:`materials` — MAT id → ``{"EX": ..., "PRXY": ..., "DENS": ...}``
* :attr:`real_constants` — REAL id → 1-D ``numpy.ndarray``
References
----------
* Global-assembly orchestration (loop over element groups, gather
per-element ``Kₑ`` / ``Mₑ`` into CSR via a pre-computed sparsity
pattern): Saad, Y., *Iterative Methods for Sparse Linear
Systems*, 2nd ed., SIAM, 2003, §3.3 (symbolic + numeric
assembly); Zienkiewicz, O.C. and Taylor, R.L., *The Finite
Element Method: Its Basis and Fundamentals*, 7th ed.,
Butterworth-Heinemann, 2013, §2.8.
* Boundary-condition elimination (partition into free / fixed
DOFs and solve the reduced system; ``_bc_reduce_and_release``):
Cook, Malkus, Plesha, Witt, *Concepts and Applications of
Finite Element Analysis*, 4th ed., Wiley, 2002, §2.10
(partitioning of ``K`` + ``u`` by prescribed / free DOFs);
Bathe, K.J., *Finite Element Procedures*, 2nd ed., Prentice
Hall, 2014, §3.4.
* Master–slave / cyclic constraint reduction (``P^H K P``):
Craveur, J.-C. and Jetteur, P., *Modélisation par éléments
finis*, Dunod, 2008, §17 (substructuring + cyclic); Petrolo,
M. and Zappino, E., cyclic symmetry reduction for rotor
dynamics, various references; see also
:mod:`femorph_solver.solvers.cyclic` for the detailed block.
"""
from __future__ import annotations
import contextlib
from typing import TYPE_CHECKING
import numpy as np
import pyvista as pv
import scipy.sparse as _sp
from vtkmodules.util.numpy_support import numpy_to_vtk
from vtkmodules.vtkCommonCore import VTK_UNSIGNED_CHAR, vtkTypeInt32Array
from vtkmodules.vtkCommonDataModel import vtkCellArray
from femorph_solver import _core
from femorph_solver._labels import (
DOFLabel,
ElementType,
ForceLabel,
MaterialProperty,
UnitSystem,
_token,
)
from femorph_solver.elements import get as _get_element
from femorph_solver.elements._base import CANONICAL_DOFS as _CANONICAL_DOFS
from femorph_solver.elements._base import ElementBase
if TYPE_CHECKING: # pragma: no cover
from collections.abc import Callable, Sequence
from pathlib import Path
from femorph_solver._state import BoundaryConditions, Loads
from femorph_solver.solvers.cyclic import CyclicModalResult
from femorph_solver.solvers.harmonic import HarmonicResult
from femorph_solver.solvers.modal import ModalResult
from femorph_solver.solvers.static import StaticResult
from femorph_solver.solvers.transient import TransientResult
def _canonicalize_grid(grid: pv.UnstructuredGrid) -> pv.UnstructuredGrid:
    """Return a copy of ``grid`` with points sorted by ``ansys_node_num``
    and cells sorted by ``ansys_elem_num``.

    Every :class:`Model` hot path assumes the grid is in
    node-number-sorted order — the DOF layout walks points in the
    order they appear and matrix rows inherit that order, which
    means the same mesh loaded via two different paths must iterate
    points identically or two freshly-assembled Ks would differ by
    a permutation. Centralising the canonicalisation here (rather
    than re-sorting inside every consumer) keeps the invariant
    local and lets the hot path take a zero-copy view of
    ``point_data`` / ``cell_data``.

    When the grid already happens to be sorted the function is still a
    no-op on content — it shortcuts if both orderings are already
    monotonic — so for meshes produced by ``_rebuild_grid`` (which
    always emits canonically-sorted output) or by foreign-deck
    readers (which typically do as well) there's no copy.
    """
    node_nums = np.asarray(grid.point_data.get("ansys_node_num"))
    elem_nums = np.asarray(grid.cell_data.get("ansys_elem_num"))
    pt_order = np.argsort(node_nums, kind="stable")
    cell_order = np.argsort(elem_nums, kind="stable")
    pts_sorted = np.array_equal(pt_order, np.arange(node_nums.size))
    cells_sorted = np.array_equal(cell_order, np.arange(elem_nums.size))
    if pts_sorted and cells_sorted:
        return grid
    # Rebuild with reordered points + cells. VTK has no in-place
    # ``ReorderPoints`` that also rewires connectivity, so the cleanest
    # path is to extract -> reorder -> reassemble.
    new_points = np.asarray(grid.points, dtype=np.float64)[pt_order]
    # Inverse map: old VTK point index -> new VTK point index.
    inverse_pt = np.empty(node_nums.size, dtype=np.int32)
    inverse_pt[pt_order] = np.arange(node_nums.size, dtype=np.int32)
    # Per-cell connectivity in the original VTK index space, reindexed
    # to the new point order.
    old_conn = np.asarray(grid.cell_connectivity, dtype=np.int64)
    old_offset = np.asarray(grid.offset, dtype=np.int64)
    new_conn_full = inverse_pt[old_conn].astype(np.int32, copy=False)
    # Reorder cells without a per-cell Python loop. The i-th cell of the
    # new ordering occupies ``old_offset[cell_order[i]] : ... + size`` in
    # ``new_conn_full``; build one flat gather-index array with np.repeat
    # so the whole reorder is a single fancy-index. (The previous
    # list-of-chunks loop was O(n_cells) interpreter-level work and
    # dominated canonicalisation on large meshes.)
    cell_sizes = np.diff(old_offset)
    sizes_new = cell_sizes[cell_order]
    new_offset_arr = np.concatenate(([0], np.cumsum(sizes_new))).astype(np.int32)
    total = int(new_offset_arr[-1])
    # gather[k] = start of k's cell in the old layout + k's offset within
    # the cell; both terms expanded per connectivity entry via np.repeat.
    gather = (
        np.repeat(old_offset[cell_order], sizes_new)
        + np.arange(total, dtype=np.int64)
        - np.repeat(new_offset_arr[:-1].astype(np.int64), sizes_new)
    )
    new_conn = (
        new_conn_full[gather].astype(np.int32, copy=False)
        if total
        else np.empty(0, dtype=np.int32)
    )
    new_celltypes = np.asarray(grid.celltypes, dtype=np.uint8)[cell_order]
    # Assemble a fresh grid with int32 cell structure up-front.
    vtk_dtype = vtkTypeInt32Array().GetDataType()
    offsets_vtk = numpy_to_vtk(new_offset_arr, deep=True, array_type=vtk_dtype)
    conn_vtk = numpy_to_vtk(new_conn, deep=True, array_type=vtk_dtype)
    cell_array = vtkCellArray()
    cell_array.SetData(offsets_vtk, conn_vtk)
    celltypes_vtk = numpy_to_vtk(new_celltypes, deep=True, array_type=VTK_UNSIGNED_CHAR)
    new_grid = pv.UnstructuredGrid()
    new_grid.SetCells(celltypes_vtk, cell_array)
    new_grid.points = new_points
    # Carry over point/cell data in the new ordering.
    for k, v in grid.point_data.items():
        new_grid.point_data[k] = np.asarray(v)[pt_order]
    for k, v in grid.cell_data.items():
        new_grid.cell_data[k] = np.asarray(v)[cell_order]
    return new_grid
def _force_int32_cells(grid: pv.UnstructuredGrid) -> None:
    """In-place: coerce ``grid.cell_connectivity`` + ``grid.offset`` to int32.

    VTK builds these arrays as int64 by default on 64-bit systems even
    though every real FEM mesh fits in a 32-bit index (2 G cells / points
    would be ~64 GB of doubles alone). The int64 layout doubles the
    memory footprint of cell structure and — more importantly — forces
    a copy-cast every time our downstream code asks for the int32 view
    it actually consumes (scipy CSR, the C++ scatter kernels, etc.).
    Rewriting the ``vtkCellArray`` with int32 backing storage is
    zero-copy after one pass — matches :func:`femorph.utilities.force_int32_itype`.
    """
    # Nothing to do for an empty grid, or when VTK already stores int32.
    if grid.n_cells == 0 or grid.offset.dtype == np.int32:
        return
    offsets32 = np.ascontiguousarray(grid.offset, dtype=np.int32)
    connectivity32 = np.ascontiguousarray(grid.cell_connectivity, dtype=np.int32)
    int32_vtk_type = vtkTypeInt32Array().GetDataType()
    rebuilt = vtkCellArray()
    rebuilt.SetData(
        numpy_to_vtk(offsets32, deep=True, array_type=int32_vtk_type),
        numpy_to_vtk(connectivity32, deep=True, array_type=int32_vtk_type),
    )
    types_vtk = numpy_to_vtk(grid.celltypes, deep=True, array_type=VTK_UNSIGNED_CHAR)
    grid.SetCells(types_vtk, rebuilt)
[docs]
class Model:
def __init__(
    self,
    grid: pv.UnstructuredGrid | None = None,
    *,
    unit_system: UnitSystem | str = UnitSystem.UNSPECIFIED,
) -> None:
    """Create a Model wrapping ``grid`` (a fresh empty grid when ``None``).

    Parameters
    ----------
    grid : pyvista.UnstructuredGrid, optional
        Canonical mesh to wrap. ``None`` starts an empty Model that can
        be populated incrementally via the staged ``N`` / ``E`` verbs.
    unit_system : UnitSystem or str, keyword-only
        Bookkeeping stamp recorded on the Model; accepts either a
        :class:`UnitSystem` member or its string value.
    """
    self._grid: pv.UnstructuredGrid = pv.UnstructuredGrid() if grid is None else grid
    # Global tables that do not belong per-cell (see module docstring):
    # ET id -> element class name, MAT id -> property mapping,
    # REAL id -> real-constant vector.
    self._etypes: dict[int, str] = {}
    self._materials: dict[int, dict[str, float]] = {}
    self._real_constants: dict[int, np.ndarray] = {}
    # Bookkeeping-only stamp. The solver is dimensionless — ``E = 2.1e11``
    # is whatever the caller intends — but every realistic user works in
    # one consistent system. Stamping the Model lets downstream readers
    # (result files, stress recovery, material libraries) interpret the
    # numbers without guessing. See :class:`femorph_solver.UnitSystem`.
    self._unit_system: UnitSystem = (
        unit_system if isinstance(unit_system, UnitSystem) else UnitSystem(unit_system)
    )
    # "Current" ids stamped onto elements staged by the APDL-style
    # TYPE / MAT / REAL verbs.
    self._current_type: int = 1
    self._current_mat: int = 1
    self._current_real: int = 1
    # Incremental-build staging area for ``N`` / ``E``. After any
    # grid rebuild these are drained back into the grid and cleared;
    # the grid is the permanent source of truth for geometry +
    # connectivity. Previously the dicts were the primary storage
    # (the grid was rebuilt from them on demand) — that cost ~34 MB
    # at 200×200×2 once the mesh matured because every node (80 B)
    # and element (~250 B) was duplicated between dict and grid.
    self._staged_nodes: dict[int, tuple[float, float, float]] = {}
    # elem_num -> (node_nums, etype_id, mat_id, real_id)
    self._staged_elements: dict[int, tuple[list[int], int, int, int]] = {}
    # True while staged nodes/elements exist that the grid does not yet
    # reflect; the ``grid`` property drains them via ``_rebuild_grid``.
    self._dirty: bool = False
    # Cached _dof_layout() result — reused across K/M assembly once the grid
    # is clean. Invalidated by any structural mutation (n, e, et).
    self._layout_cache: (
        tuple[np.ndarray, dict[int, tuple[int, tuple[str, ...]]], int] | None
    ) = None
    self._node_dof_labels_cache: dict[int, tuple[str, ...]] | None = None
    # Cached (N, row_ptr, col_idx) CSR pattern — reused across K and M
    # since connectivity is geometry-only. Invalidated alongside the
    # layout cache on any structural mutation (n / e / et).
    self._csr_pattern_cache: tuple[int, np.ndarray, np.ndarray] | None = None
    # Cached assembled K and M. Invalidated on any structural or
    # material mutation — see ``_invalidate_matrix_cache``. Callers
    # that run modal sweeps or static solves back-to-back on the same
    # Model see the zero-overhead hit on the second call.
    self._K_cache: _sp.csr_array | None = None
    self._M_cache: _sp.csr_array | None = None
    self._M_lumped_cache: _sp.csr_array | None = None
    # Boundary conditions / loads staged via D / F. Keyed by (node, dof_idx).
    self._disp_bc: dict[tuple[int, int], float] = {}
    self._force_bc: dict[tuple[int, int], float] = {}
@property
def grid(self) -> pv.UnstructuredGrid:
    """Canonical mesh; staged N/E edits are drained into it first."""
    if not self._dirty:
        return self._grid
    self._rebuild_grid()
    return self._grid
@property
def etypes(self) -> dict[int, str]:
    """ET id → element-class-name table (the live dict, not a copy)."""
    return self._etypes
@property
def materials(self) -> dict[int, dict[str, float]]:
    """MAT id → property-mapping table (the live dict, not a copy)."""
    return self._materials
@property
def real_constants(self) -> dict[int, np.ndarray]:
    """REAL id → real-constant vector table (the live dict, not a copy)."""
    return self._real_constants
@property
def bcs(self) -> BoundaryConditions:
    """Typed view of the Model's Dirichlet boundary-condition state.

    Zero-copy view over the internal ``(node, dof) → value``
    storage. Provides clean inspection (``bcs.fixed``,
    ``bcs.n_pinned``, ``len(bcs)``, ``bcs.is_fixed(node, dof)``),
    mutation helpers (``bcs.pin(...)``, ``bcs.clear()``), and a
    ``bcs.to_table()`` for REPL printing.

    Every :meth:`fix` / :meth:`d` call on the Model shows up here
    immediately; the view doesn't duplicate state.
    """
    # Deferred import — presumably avoids a circular import between
    # Model and femorph_solver._state; TODO confirm.
    from femorph_solver._state import BoundaryConditions  # noqa: PLC0415
    return BoundaryConditions(self)
@property
def loads(self) -> Loads:
    """Typed view of the Model's applied nodal forces / moments.

    Mirror of :attr:`bcs` for :meth:`apply_force` / :meth:`f` state.
    Supports ``loads.forces``, ``loads.n_applied``,
    ``loads.apply(node, fz=...)``, ``loads.clear()``, and
    ``loads.to_table()``.
    """
    # Deferred import — presumably avoids a circular import between
    # Model and femorph_solver._state; TODO confirm.
    from femorph_solver._state import Loads  # noqa: PLC0415
    return Loads(self)
@property
def unit_system(self) -> UnitSystem:
    """Current :class:`UnitSystem` stamp (read-only view of internal state).

    Defaults to :attr:`UnitSystem.UNSPECIFIED` for legacy Models that
    pre-date this attribute. Set via :meth:`set_unit_system`, pass
    via the constructor, or let :meth:`Model.from_grid` /
    :func:`femorph_solver.mapdl_api.from_cdb` detect it from the input.
    """
    return self._unit_system
[docs]
def set_unit_system(self, unit_system: UnitSystem | str) -> None:
    """Declare the :class:`UnitSystem` this Model's numeric inputs follow.

    Bookkeeping only — the solver's math is dimensionless. The stamp
    travels with the Model through assembly, solve, and the result
    serialiser so downstream readers can interpret the numbers.
    """
    if isinstance(unit_system, UnitSystem):
        self._unit_system = unit_system
    else:
        # Accept the string value form; raises ValueError for unknown names.
        self._unit_system = UnitSystem(unit_system)
[docs]
def assert_unit_system(self, expected: UnitSystem | str) -> None:
    """Raise :class:`ValueError` if ``unit_system`` differs from ``expected``.

    Use this in scripts that require a specific unit convention — a
    mismatch between the caller's assumption and the Model's stamp
    usually means numbers will be silently wrong by a factor of
    :math:`10^n`. :attr:`UnitSystem.UNSPECIFIED` always passes (no
    assertion possible on an unstamped Model).
    """
    want = expected if isinstance(expected, UnitSystem) else UnitSystem(expected)
    current = self._unit_system
    # Unstamped Models cannot be asserted against; matching stamps pass.
    if current in (UnitSystem.UNSPECIFIED, want):
        return
    raise ValueError(
        f"Model.unit_system is {current.value}, expected {want.value}"
    )
@property
def n_nodes(self) -> int:
    """Point count of the (rebuilt-if-dirty) grid."""
    return int(self.grid.n_points)
@property
def n_elements(self) -> int:
    """Cell count of the (rebuilt-if-dirty) grid."""
    return int(self.grid.n_cells)
@property
def node_numbers(self) -> np.ndarray:
    """1-based node numbers as int32 (empty array when the grid has none)."""
    stored = self.grid.point_data.get("ansys_node_num")
    if stored is None:
        return np.empty(0, dtype=np.int32)
    return np.asarray(stored, dtype=np.int32)
@property
def element_numbers(self) -> np.ndarray:
    """1-based element numbers as int32 (empty array when the grid has none)."""
    stored = self.grid.cell_data.get("ansys_elem_num")
    if stored is None:
        return np.empty(0, dtype=np.int32)
    return np.asarray(stored, dtype=np.int32)
def __repr__(self) -> str:
    """Compact one-line summary of the mesh and table sizes."""
    fields = (
        f"nodes={self.n_nodes}",
        f"elements={self.n_elements}",
        f"etypes={len(self._etypes)}",
        f"materials={len(self._materials)}",
        f"reals={len(self._real_constants)}",
    )
    return f"<femorph_solver.Model {' '.join(fields)}>"
[docs]
def save(self, path: str | Path) -> Path:
    """Serialise this Model to a single ``.pv`` file (TA-15 / #265).

    The mesh, cell- / point-data, materials, real constants,
    boundary conditions, force loads, and unit-system stamp all
    round-trip through :meth:`Model.load`. Solver-internal state
    is stamped into ``field_data`` under keys prefixed with
    ``__solver_key_``; a vanilla pyvista reader can recognise
    those as opaque and skip them.

    Parameters
    ----------
    path : str or pathlib.Path
        Destination file (``.pv`` extension conventional).

    Returns
    -------
    pathlib.Path
        Canonicalised absolute path of the written file.

    Examples
    --------
    Build, save, restore, verify the round-trip is exact::

        >>> model = fs.Model.from_grid(grid)
        >>> model.assign(ELEMENTS.HEX8, {"EX": 2.0e11, "PRXY": 0.30})
        >>> model.save("/tmp/cantilever.pv")
        >>> restored = fs.Model.load("/tmp/cantilever.pv")
        >>> (model.stiffness_matrix() != restored.stiffness_matrix()).nnz == 0
        True
    """
    # Deferred import — persistence pulls in I/O machinery not needed
    # on the assembly hot path; presumably also avoids an import cycle.
    from femorph_solver._persistence import save_model  # noqa: PLC0415
    return save_model(self, path)
[docs]
@classmethod
def load(cls, path: str | Path) -> Model:
    """Inverse of :meth:`save` — read a ``.pv`` back into a Model.

    Parameters
    ----------
    path : str or pathlib.Path
        Path to a file previously written by :meth:`save`.

    Returns
    -------
    Model
        Fresh Model with the saved geometry, etypes, materials,
        real constants, BCs, loads, and unit system applied.

    Raises
    ------
    FileNotFoundError
        If the file does not exist.
    ValueError
        If the file is missing the solver-internal state blob
        (i.e. it was not written by :meth:`save`).
    """
    # Deferred import — mirrors :meth:`save`; keeps persistence
    # machinery off the assembly hot path.
    from femorph_solver._persistence import load_model  # noqa: PLC0415
    return load_model(path)
[docs]
@classmethod
def from_pv(cls, path: str | Path) -> Model:
    """Load a single-file ``.pv`` Model — alias of :meth:`load`.

    Exists so the ``from_<format>`` family (:meth:`from_grid`,
    :meth:`from_lines`, :meth:`from_chain`, :meth:`from_points`)
    has a consistent spelling for the native single-file format
    written by :meth:`save`.
    """
    return cls.load(path)
[docs]
@classmethod
def from_grid(cls, grid: pv.UnstructuredGrid) -> Model:
    """Wrap an existing ``pyvista.UnstructuredGrid`` as a Model.

    The grid is expected to carry the structured cell/point data arrays
    (``ansys_node_num``, ``ansys_elem_num``, ``ansys_elem_type_num``,
    ``ansys_material_type``, ``ansys_real_constant``). Missing arrays are
    auto-filled with sequential 1-based ids so meshes built directly in
    pyvista (e.g. a ``StructuredGrid`` cast to unstructured) are usable
    without the caller stamping every field themselves.

    The grid becomes the Model's sole source of truth for geometry
    and connectivity — the staging dicts stay empty for the lifetime
    of the Model unless the caller later mutates via :meth:`n` /
    :meth:`e`. Two incidental passes happen here:

    * ``cell_connectivity`` / ``offset`` are coerced to int32 if they
      came in as int64 (VTK's default on 64-bit builds) — halves the
      cell-structure footprint and removes a copy-cast the hot path
      would otherwise do on every assembly call.
    * Points and cells are reordered so ``ansys_node_num`` and
      ``ansys_elem_num`` are strictly increasing. Every downstream
      hot path assumes node-number-sorted order to produce a deterministic
      DOF layout; canonicalising here keeps the invariant local to
      Model construction rather than leaking into every consumer.
    """
    m = cls()
    n_points = int(grid.n_points)
    n_cells = int(grid.n_cells)
    node_nums_arr = grid.point_data.get("ansys_node_num")
    if node_nums_arr is None:
        grid.point_data["ansys_node_num"] = np.arange(1, n_points + 1, dtype=np.int32)
    else:
        node_nums = np.asarray(node_nums_arr)
        # Write back only when the *stored* array is not already int32.
        # (The previous check compared the dtype of the converted copy —
        # ``np.asarray(..., dtype=np.int32)`` — which is always int32, so
        # a non-int32 stored array was never actually replaced.)
        if node_nums.dtype != np.int32:
            grid.point_data["ansys_node_num"] = node_nums.astype(np.int32)
    if grid.cell_data.get("ansys_elem_num") is None:
        grid.cell_data["ansys_elem_num"] = np.arange(1, n_cells + 1, dtype=np.int32)
    if grid.cell_data.get("ansys_elem_type_num") is None:
        grid.cell_data["ansys_elem_type_num"] = np.ones(n_cells, dtype=np.int32)
    if grid.cell_data.get("ansys_material_type") is None:
        grid.cell_data["ansys_material_type"] = np.ones(n_cells, dtype=np.int32)
    if grid.cell_data.get("ansys_real_constant") is None:
        grid.cell_data["ansys_real_constant"] = np.ones(n_cells, dtype=np.int32)
    # Canonicalise + int32: ensures every hot path reads the grid
    # in node-number-sorted order with zero-copy numpy views.
    grid = _canonicalize_grid(grid)
    _force_int32_cells(grid)
    m._grid = grid
    m._dirty = False
    return m
[docs]
@classmethod
def from_lines(
    cls,
    nodes: np.ndarray,
    elements: np.ndarray,
) -> Model:
    """Build a Model from explicit nodes + 2-node line connectivity.

    Produces an :class:`pyvista.UnstructuredGrid` with VTK_LINE cells
    — the canonical cell type for TRUSS2 / BEAM2 / SPRING. After
    construction, attach an element kernel via ``model.assign(...)``.

    Parameters
    ----------
    nodes : numpy.ndarray
        ``(n_points, 3)`` float64 nodal coordinates.
    elements : numpy.ndarray
        ``(n_elements, 2)`` int connectivity — each row is
        ``(node_i, node_j)`` indexing into ``nodes`` (0-based).

    Returns
    -------
    Model
        Native Model with ``ansys_node_num`` and ``ansys_elem_num``
        stamped 1-based, ready for :meth:`assign` / :meth:`fix` /
        :meth:`apply_force`.

    Examples
    --------
    Two-link axial truss::

        >>> model = Model.from_lines(
        ...     nodes=np.array([[0, 0, 0], [1, 0, 0], [2, 0, 0]], dtype=float),
        ...     elements=np.array([[0, 1], [1, 2]], dtype=int),
        ... )
        >>> model.assign(ELEMENTS.TRUSS2, {"EX": 2.0e11}, real=(1e-4,))
    """
    pts = np.ascontiguousarray(np.asarray(nodes, dtype=np.float64))
    conn = np.ascontiguousarray(np.asarray(elements, dtype=np.int64))
    if pts.ndim != 2 or pts.shape[1] != 3:
        raise ValueError(f"nodes must have shape (n_points, 3), got {pts.shape}")
    if conn.ndim != 2 or conn.shape[1] != 2:
        raise ValueError(f"elements must have shape (n_elements, 2), got {conn.shape}")
    n_pts = pts.shape[0]
    if conn.size and (conn.min() < 0 or conn.max() >= n_pts):
        raise ValueError(
            f"element connectivity references node index out of range [0, {n_pts})"
        )
    n_cells = conn.shape[0]
    # VTK legacy cell-array layout: per cell ``[n_nodes, node0, node1]``.
    vtk_cells = np.hstack(
        [np.full((n_cells, 1), 2, dtype=np.int64), conn]
    ).ravel()
    vtk_types = np.full(n_cells, 3, dtype=np.uint8)  # VTK_LINE
    return cls.from_grid(pv.UnstructuredGrid(vtk_cells, vtk_types, pts))
[docs]
@classmethod
def from_chain(cls, points: np.ndarray) -> Model:
    """Build a 1-D chain Model — sequential 2-node line cells.

    Convenience wrapper around :meth:`from_lines` for the common
    case of N points connected sequentially: ``(0,1), (1,2), …,
    (N-2, N-1)``.

    Parameters
    ----------
    points : numpy.ndarray
        ``(n_points, 3)`` nodal coordinates along the chain.

    Returns
    -------
    Model
        Model with ``n_points - 1`` line cells.
    """
    pts = np.ascontiguousarray(np.asarray(points, dtype=np.float64))
    if pts.ndim != 2 or pts.shape[1] != 3:
        raise ValueError(f"points must have shape (n_points, 3), got {pts.shape}")
    n = pts.shape[0]
    if n < 2:
        raise ValueError(f"chain needs at least 2 points, got {n}")
    # Consecutive pairs: row i is (i, i + 1).
    idx = np.arange(n, dtype=np.int64)
    return cls.from_lines(pts, np.column_stack([idx[:-1], idx[1:]]))
[docs]
@classmethod
def from_points(cls, nodes: np.ndarray) -> Model:
    """Build a Model from explicit nodes as 0-D VTK_VERTEX cells.

    Each row of ``nodes`` becomes one single-node cell — the
    canonical layout for POINT_MASS. Node ids and element ids are
    stamped 1-based by :meth:`from_grid`.

    Parameters
    ----------
    nodes : numpy.ndarray
        ``(n_points, 3)`` nodal coordinates.

    Returns
    -------
    Model
        Model with ``n_points`` vertex cells.
    """
    pts = np.ascontiguousarray(np.asarray(nodes, dtype=np.float64))
    if pts.ndim != 2 or pts.shape[1] != 3:
        raise ValueError(f"nodes must have shape (n_points, 3), got {pts.shape}")
    n = pts.shape[0]
    if n == 0:
        raise ValueError("from_points needs at least 1 node")
    # VTK legacy layout: each vertex cell is ``[1, point_index]``.
    vtk_cells = np.column_stack(
        [np.ones(n, dtype=np.int64), np.arange(n, dtype=np.int64)]
    ).ravel()
    vtk_types = np.full(n, 1, dtype=np.uint8)  # VTK_VERTEX
    return cls.from_grid(pv.UnstructuredGrid(vtk_cells, vtk_types, pts))
# ---------------------------------------------------------------------
# Preprocessor verbs — APDL-dialect commands; names mirror the legacy APDL set for users porting a deck.
# ---------------------------------------------------------------------
def _invalidate_topology_caches(self) -> None:
    """Invalidate every derived state that depends on mesh connectivity."""
    self._layout_cache = None
    self._node_dof_labels_cache = None
    self._csr_pattern_cache = None
    # The assembled-matrix caches are a superset concern — delegate so
    # the list of matrix caches is maintained in exactly one place.
    self._invalidate_matrix_cache()
def _invalidate_matrix_cache(self) -> None:
    """Invalidate only the assembled K / M caches — materials/real changed
    but connectivity is unchanged so the DOF layout / CSR pattern stay valid.
    """
    self._K_cache = self._M_cache = self._M_lumped_cache = None
[docs]
def release_matrix_cache(self) -> None:
    """Drop the cached assembled K / M matrices.

    Each cached matrix is roughly ``nnz × 12 bytes`` — on a
    1.2 M-DOF plate with ~100 M nnz that's ~1.2 GB each, so
    releasing both saves ~2.4 GB. Call after a ``modal_solve`` /
    ``static_solve`` finishes and the caller is done with further
    analyses on this Model. Subsequent ``stiffness_matrix()`` or
    ``mass_matrix()`` calls will re-assemble.
    """
    # Same cache set a material mutation invalidates — delegate so the
    # list of matrix caches is maintained in exactly one place instead
    # of being duplicated across three methods.
    self._invalidate_matrix_cache()
# ---------------------------------------------------------------------
# Private APDL-verb implementations.
# ---------------------------------------------------------------------
# The APDL-dialect preprocessor verbs (``N`` / ``E`` / ``ET`` / ``TYPE`` /
# ``MAT`` / ``REAL`` / ``MP`` / ``R`` / ``D`` / ``F``) used to live on the
# ``Model`` public surface. As of TA-10c-3 they exist only as private
# ``_<verb>_impl`` helpers; :class:`femorph_solver.interop.mapdl.APDL`
# delegates to them. User-facing code on :class:`Model` uses
# :meth:`assign`, :meth:`fix`, and :meth:`apply_force` directly.
def _n_impl(self, num: int, x: float = 0.0, y: float = 0.0, z: float = 0.0) -> None:
    """APDL ``N``: stage (or overwrite) node ``num`` at ``(x, y, z)``."""
    coords = (float(x), float(y), float(z))
    self._staged_nodes[int(num)] = coords
    self._dirty = True
    self._invalidate_topology_caches()
def _e_impl(self, *node_nums: int, num: int | None = None) -> int:
    """APDL ``E``: stage an element over ``node_nums``; return its number.

    When ``num`` is omitted, the next free number is one past the max of
    the grid's existing element numbers and the staged element numbers.
    """
    if num is None:
        grid_max = (
            int(self._grid.cell_data["ansys_elem_num"].max())
            if self._grid.n_cells > 0
            else 0
        )
        elem_num = max(grid_max, max(self._staged_elements, default=0)) + 1
    else:
        elem_num = int(num)
    self._staged_elements[elem_num] = (
        [int(n) for n in node_nums],
        self._current_type,
        self._current_mat,
        self._current_real,
    )
    self._dirty = True
    self._invalidate_topology_caches()
    return elem_num
def _et_impl(self, et_id: int, name: str | ElementType) -> None:
    """APDL ``ET``: register element-type id ``et_id`` as kernel ``name``."""
    kernel = _token(name).upper()
    # Accept ElementType member *names* as well as kernel values; an
    # unknown name falls through unchanged and is validated below.
    try:
        kernel = ElementType[kernel].value
    except KeyError:
        pass
    _get_element(kernel)  # registry lookup — rejects unknown kernels
    self._etypes[int(et_id)] = kernel
    self._invalidate_topology_caches()
def _type_impl(self, et_id: int) -> None:
    """APDL ``TYPE``: element-type id stamped on subsequently staged elements."""
    self._current_type = int(et_id)
def _mat_impl(self, mat_id: int) -> None:
    """APDL ``MAT``: material id stamped on subsequently staged elements."""
    self._current_mat = int(mat_id)
def _real_impl(self, real_id: int) -> None:
    """APDL ``REAL``: real-constant id stamped on subsequently staged elements."""
    self._current_real = int(real_id)
def _mp_impl(self, prop: str | MaterialProperty, mat_id: int, value: float) -> None:
    """APDL ``MP``: set material property ``prop`` for material ``mat_id``."""
    key = _token(prop).upper()
    # Leading-underscore keys are internal kernel flags and bypass the
    # MaterialProperty validation below.
    if not key.startswith("_"):
        try:
            MaterialProperty(key)
        except ValueError as exc:
            allowed = ", ".join(m.value for m in MaterialProperty)
            raise ValueError(
                f"unknown material property {prop!r}; expected one of: {allowed}"
            ) from exc
    self._materials.setdefault(int(mat_id), {})[key] = float(value)
    self._invalidate_matrix_cache()
def _r_impl(self, real_id: int, *values: float) -> None:
    """APDL ``R``: store the real-constant vector for id ``real_id``."""
    self._real_constants[int(real_id)] = np.asarray(values, dtype=np.float64)
    self._invalidate_matrix_cache()
# ---------------------------------------------------------------------
# Loads / boundary conditions
# ---------------------------------------------------------------------
# Canonical component orderings: index i is the i-th slot in a node's
# six-DOF block (three translations then three rotations / moments).
_DOF_LABELS: dict[str, int] = {
    lbl: i for i, lbl in enumerate(("UX", "UY", "UZ", "ROTX", "ROTY", "ROTZ"))
}
_FORCE_LABELS: dict[str, int] = {
    lbl: i for i, lbl in enumerate(("FX", "FY", "FZ", "MX", "MY", "MZ"))
}
def _d_impl(self, node: int, label: str | DOFLabel, value: float = 0.0) -> None:
    """APDL ``D``: prescribe displacement ``value`` at ``node`` / ``label``.

    ``label == "ALL"`` expands to every DOF active at the node, falling
    back to the three translations when the node's DOF set is unknown.
    """
    lbl = _token(label).upper()
    node_id = int(node)
    val = float(value)
    if lbl == "ALL":
        active = self._node_dof_labels().get(node_id, _CANONICAL_DOFS[:3])
        for dof_name in active:
            self._disp_bc[(node_id, self._DOF_LABELS[dof_name])] = val
        return
    if lbl not in self._DOF_LABELS:
        raise ValueError(f"unknown displacement DOF label {label!r}")
    self._disp_bc[(node_id, self._DOF_LABELS[lbl])] = val
def _f_impl(
    self,
    node: int,
    label: str | ForceLabel,
    value: float,
    *,
    accumulate: bool = False,
) -> None:
    """APDL ``F``: apply nodal force ``value`` at ``node`` / ``label``.

    ``accumulate=True`` adds to any existing value at the same
    ``(node, dof)`` slot; otherwise the new value overwrites it.
    """
    lbl = _token(label).upper()
    if lbl not in self._FORCE_LABELS:
        raise ValueError(f"unknown force DOF label {label!r}")
    slot = (int(node), self._FORCE_LABELS[lbl])
    if accumulate:
        self._force_bc[slot] = self._force_bc.get(slot, 0.0) + float(value)
    else:
        self._force_bc[slot] = float(value)
# ---------------------------------------------------------------------
# Native Python-first API
# ---------------------------------------------------------------------
# :meth:`assign`, :meth:`fix`, and :meth:`apply_force` are the
# primary Model user surface — keyword-driven, predicate-friendly,
# vectorised. Callers porting an APDL deck line-by-line reach for
# :class:`femorph_solver.interop.mapdl.APDL` instead; it wraps the
# private ``_<verb>_impl`` helpers above, and its state writes land
# on the same stores (``_etypes`` / ``_materials`` / ``_disp_bc`` /
# ``_force_bc``) as the native methods, so the two styles mix freely.
[docs]
def assign(
    self,
    element,
    material: dict[str, float] | None = None,
    *,
    et_id: int = 1,
    mat_id: int = 1,
    real_id: int | None = None,
    real: np.ndarray | tuple[float, ...] | None = None,
) -> None:
    """Declare an element kernel and a material in one call.

    Parameters
    ----------
    element :
        An :class:`~femorph_solver.ELEMENTS` spec **class**
        (default-configured shorthand, e.g. ``ELEMENTS.HEX8``)
        or **instance** with named, typed formulation knobs
        (``ELEMENTS.HEX8(integration="enhanced_strain")``).
        Strings and :class:`~femorph_solver.ElementType` members
        are not accepted; reach for
        :class:`~femorph_solver.interop.mapdl.APDL` if you need
        the string-form preprocessor verbs at the APDL-deck
        interop boundary.
    material :
        Mapping of material properties (``{"EX": ..., "PRXY": ...,
        "DENS": ...}``). Values are stamped under ``mat_id``
        (default ``1``). Accepts the output of
        :func:`femorph_solver.materials.fetch` directly.
    et_id, mat_id, real_id :
        Numeric ids under which to register the kernel / material /
        real constants; the defaults suit single-physics models.
    real :
        Real-constant values (array-like of floats) to stamp under
        ``real_id``. ``None`` skips the real-constant declaration.
    """
    from femorph_solver.elements._specs import is_element_spec  # noqa: PLC0415

    if not is_element_spec(element):
        raise TypeError(
            f"Model.assign: `element` must be an ELEMENTS spec class or "
            f"instance (e.g. `ELEMENTS.HEX8` or "
            f'`ELEMENTS.HEX8(integration="enhanced_strain")`), got '
            f"{element!r} of type {type(element).__name__}. Use "
            f"`APDL(model).et(...)` from `femorph_solver.interop.mapdl` "
            f"if you need string-form preprocessor verbs for APDL-deck porting."
        )
    spec = element() if isinstance(element, type) else element
    self._et_impl(et_id, spec._kernel_name)
    if material is not None:
        for prop, value in material.items():
            if isinstance(value, (int, float)):
                self._mp_impl(prop, mat_id, float(value))
            else:
                # String-valued kernel flags (``_HEX8_INTEGRATION`` etc.)
                # are already special-cased by consumers of `materials`,
                # so they bypass ``_mp_impl`` and land in the table as-is.
                self.materials.setdefault(mat_id, {})[prop] = value
    # Spec-derived formulation flags are recorded regardless of whether an
    # explicit material mapping was supplied.
    flags = spec._material_flags()
    if flags:
        self.materials.setdefault(mat_id, {}).update(flags)
    if real is not None:
        self._r_impl(1 if real_id is None else real_id, *real)
[docs]
def fix(
    self,
    nodes: np.ndarray | list[int] | tuple[int, ...] | int | None = None,
    *,
    where: np.ndarray | None = None,
    dof: str | DOFLabel | list[str | DOFLabel] | tuple[str | DOFLabel, ...] = "ALL",
    value: float = 0.0,
) -> None:
    """Pin nodal DOFs to a prescribed value (Dirichlet boundary condition).

    Parameters
    ----------
    nodes :
        Single node number, or an array / iterable of node numbers.
        Mutually exclusive with ``where``.
    where :
        Boolean mask of length ``Model.n_nodes`` selecting grid points
        to pin, indexed in VTK point order — the same order used by
        ``grid.points``. Mutually exclusive with ``nodes``.
    dof : str, :class:`~femorph_solver.DOFLabel`, or sequence thereof,
        default ``"ALL"``
        DOF (or DOFs) to pin. ``"ALL"`` clamps every DOF active at the
        selected node(s); a sequence clamps each listed DOF in turn.
    value : float, default ``0.0``
        Value to prescribe (the right-hand side in ``K u = F``). Zero
        is the lossless case for every analysis type; non-zero values
        only round-trip cleanly through :func:`solve_static`.

    Examples
    --------
    Clamp everything at the ``x = 0`` face of a bar::

        model.fix(where=grid.points[:, 0] < 1e-9)

    Pin a single node's UZ::

        model.fix(nodes=42, dof="UZ")

    Pin multiple DOFs at once::

        model.fix(where=base_mask, dof=["UX", "UY", "UZ"])
    """
    if (nodes is None) == (where is None):
        raise ValueError("fix: pass exactly one of `nodes=` or `where=`.")
    if where is not None:
        mask = np.asarray(where, dtype=bool)
        n_points = self.grid.n_points
        if mask.shape != (n_points,):
            raise ValueError(
                f"fix: `where` must have shape ({n_points},), got {mask.shape}"
            )
        all_nums = np.asarray(self.grid.point_data["ansys_node_num"], dtype=np.int64)
        selected = all_nums[mask]
    elif np.ndim(nodes) == 0:
        # Scalar node number → one-element array.
        selected = np.asarray([int(nodes)], dtype=np.int64)
    else:
        selected = np.asarray(nodes, dtype=np.int64)
    dof_list = list(dof) if isinstance(dof, (list, tuple)) else [dof]
    for node_num in selected:
        for one_dof in dof_list:
            self._d_impl(int(node_num), one_dof, value)
[docs]
def apply_force(
    self,
    node: int,
    *,
    fx: float = 0.0,
    fy: float = 0.0,
    fz: float = 0.0,
    mx: float = 0.0,
    my: float = 0.0,
    mz: float = 0.0,
    accumulate: bool = False,
) -> None:
    """Apply a keyword-driven nodal force / moment vector.

    Sparse by construction — zero-valued components are skipped, so
    only the labels actually supplied end up registered on the Model.
    Any of the six :class:`~femorph_solver.ForceLabel` entries can be
    given as keyword arguments.

    Parameters
    ----------
    node :
        Node number the load is applied at.
    fx, fy, fz :
        Translational force components (units inherited from the
        Model's :attr:`unit_system`).
    mx, my, mz :
        Moment components. Relevant for elements with rotational DOFs
        (``BEAM2`` / ``QUAD4_SHELL`` / etc.).
    accumulate :
        ``False`` (default): the supplied components **overwrite** any
        previously-applied values at this ``(node, label)`` pair
        (mirrors the APDL ``F`` semantics + the typical
        "build-and-replace" workflow).
        ``True``: the components **add** to the existing values — the
        natural semantics for distributing a traction or pressure
        across multiple element segments that share nodes (multiple
        NASTRAN ``FORCE`` cards on the same GRID, multiple Abaqus
        ``*CLOAD`` rows, segmented integration of a traction-edge
        integral).

    Examples
    --------
    >>> model.apply_force(tip_node, fz=-100.0)  # 100 N down
    >>> model.apply_force(pin_node, fx=200.0, mz=5.0)

    Lump per-segment contributions on shared nodes:

    >>> for seg in segments:
    ...     for n, contribution in seg.lumped_force(node_traction).items():
    ...         model.apply_force(n, fx=contribution, accumulate=True)
    """
    pairs = (
        ("FX", fx),
        ("FY", fy),
        ("FZ", fz),
        ("MX", mx),
        ("MY", my),
        ("MZ", mz),
    )
    target = int(node)
    for label, component in pairs:
        if component == 0.0:
            # Skip untouched components entirely.
            continue
        self._f_impl(target, label, float(component), accumulate=accumulate)
# ---------------------------------------------------------------------
# Grid materialisation
# ---------------------------------------------------------------------
def _rebuild_grid(self) -> None:
    """Merge the existing grid with any staged ``N`` / ``E`` additions
    and emit a fresh node-number-sorted grid with int32 cell structure.

    The grid is the Model's sole permanent storage for geometry and
    connectivity; the ``_staged_*`` dicts only hold the deltas since
    the last rebuild and are cleared here. Merging via dicts (keyed
    on node number) honours overwrite semantics — a later ``n()``
    with an existing number replaces the grid's coordinate.

    Raises
    ------
    KeyError
        If an element references an undefined node, or uses a TYPE id
        with no matching ET declaration.
    """
    # Pull existing grid rows back into dicts, then overlay staged
    # additions (overwrite wins). Cheap because the grid is already
    # numpy; zip into a {int: tuple} dict for the bulk merge path.
    nodes: dict[int, tuple[float, float, float]] = {}
    if self._grid.n_points > 0:
        pts = np.asarray(self._grid.points, dtype=np.float64)
        nums = np.asarray(self._grid.point_data["ansys_node_num"], dtype=np.int32)
        for nn, pt in zip(nums, pts):
            nodes[int(nn)] = (float(pt[0]), float(pt[1]), float(pt[2]))
    nodes.update(self._staged_nodes)
    # Element table: {elem_num: (node_nums, et_id, mat_id, real_id)}.
    elements: dict[int, tuple[list[int], int, int, int]] = {}
    if self._grid.n_cells > 0:
        elem_nums_existing = np.asarray(self._grid.cell_data["ansys_elem_num"], dtype=np.int32)
        et_existing = np.asarray(self._grid.cell_data["ansys_elem_type_num"], dtype=np.int32)
        mat_existing = np.asarray(self._grid.cell_data["ansys_material_type"], dtype=np.int32)
        real_existing = np.asarray(self._grid.cell_data["ansys_real_constant"], dtype=np.int32)
        pt_nums_existing = np.asarray(self._grid.point_data["ansys_node_num"], dtype=np.int32)
        conn_flat = np.asarray(self._grid.cell_connectivity, dtype=np.int32)
        offsets = np.asarray(self._grid.offset, dtype=np.int32)
        for i, en in enumerate(elem_nums_existing):
            # Slice this cell's VTK point ids and translate them back
            # to node numbers for the dict-keyed merge.
            vtk_ids = conn_flat[offsets[i] : offsets[i + 1]]
            elements[int(en)] = (
                [int(pt_nums_existing[j]) for j in vtk_ids],
                int(et_existing[i]),
                int(mat_existing[i]),
                int(real_existing[i]),
            )
    elements.update(self._staged_elements)
    if not nodes:
        # Empty model: reset to a pristine grid and drop the deltas.
        # NOTE(review): any staged elements are silently discarded on
        # this path (no nodes to attach them to) — confirm intended.
        self._grid = pv.UnstructuredGrid()
        self._staged_nodes.clear()
        self._staged_elements.clear()
        self._dirty = False
        return
    # Node numbers are 1-based and may be sparse; map to contiguous
    # VTK point indices by sorted node number.
    node_nums = np.fromiter(sorted(nodes), dtype=np.int32)
    points = np.array([nodes[n] for n in node_nums], dtype=np.float64)
    num_to_idx = {n: i for i, n in enumerate(node_nums)}
    if elements:
        elem_nums = np.fromiter(sorted(elements), dtype=np.int32)
        cells_flat: list[int] = []
        cell_types: list[int] = []
        etype_nums = np.empty(len(elem_nums), dtype=np.int32)
        mat_ids = np.empty(len(elem_nums), dtype=np.int32)
        real_ids = np.empty(len(elem_nums), dtype=np.int32)
        for i, en in enumerate(elem_nums):
            nns, et_id, mat_id, real_id = elements[int(en)]
            if et_id not in self._etypes:
                raise KeyError(f"element {en} uses TYPE={et_id} but no ET is defined for it")
            cls = self._kernel_for(et_id, len(nns))
            cell_types.append(cls.vtk_cell_type)
            # Legacy VTK cell encoding: node count, then the point ids.
            cells_flat.append(len(nns))
            for nn in nns:
                if nn not in num_to_idx:
                    raise KeyError(f"element {en} references undefined node {nn}")
                cells_flat.append(num_to_idx[nn])
            etype_nums[i] = et_id
            mat_ids[i] = mat_id
            real_ids[i] = real_id
        grid = pv.UnstructuredGrid(
            np.array(cells_flat, dtype=np.int64),
            np.array(cell_types, dtype=np.uint8),
            points,
        )
        grid.cell_data["ansys_elem_num"] = elem_nums
        grid.cell_data["ansys_elem_type_num"] = etype_nums
        grid.cell_data["ansys_material_type"] = mat_ids
        grid.cell_data["ansys_real_constant"] = real_ids
    else:
        # Point cloud only — nodes without any cells.
        grid = pv.UnstructuredGrid()
        grid.points = points
    grid.point_data["ansys_node_num"] = node_nums
    _force_int32_cells(grid)
    self._grid = grid
    self._staged_nodes.clear()
    self._staged_elements.clear()
    self._dirty = False
# ---------------------------------------------------------------------
# Global DOF layout + sparse K/M assembly
# ---------------------------------------------------------------------
# Degenerate-topology aliases for kernels whose underlying foreign-deck ET id
# spans multiple shapes — e.g. ``HEX20`` covers HEX20 + WEDGE15 +
# PYR13. mapdl-archive (and VTK) splits such elements by cell type /
# node count; this table picks the right neutral kernel without forcing
# callers to spell the degenerate name.
# ET class name → {actual node count → neutral kernel name}; consulted by
# ``_kernel_for`` to resolve collapsed (degenerate) topologies.
_DEGENERATE_ALIASES: dict[str, dict[int, str]] = {
    "HEX20": {20: "HEX20", 15: "WEDGE15", 13: "PYR13"},
}
def _kernel_for(self, et_id: int, n_nodes: int) -> type[ElementBase]:
    """Resolve the element class for an ET id plus actual node count.

    One foreign-deck ET id (e.g. MAPDL 186 → HEX20) may cover several
    shapes once corner nodes are collapsed; the connectivity length
    recorded in the grid disambiguates them via
    :attr:`_DEGENERATE_ALIASES`.

    Raises
    ------
    ValueError
        If the node count matches neither the named kernel nor any
        known degenerate alias.
    """
    name = self._etypes[int(et_id)]
    degenerate = self._DEGENERATE_ALIASES.get(name)
    if degenerate is not None:
        resolved = degenerate.get(int(n_nodes))
        if resolved is not None:
            return _get_element(resolved)
    cls = _get_element(name)
    if n_nodes != cls.n_nodes:
        raise ValueError(
            f"{name} expects {cls.n_nodes} nodes per element but an element "
            f"with {n_nodes} nodes was recorded; no degenerate kernel known"
        )
    return cls
def _sorted_node_nums(self) -> np.ndarray:
# After ``from_grid`` / ``_rebuild_grid`` the grid is guaranteed
# node-number-sorted, so the ``point_data`` view is the answer verbatim
# (zero copy). Fall back through ``.grid`` if the model is dirty.
g = self.grid
if g.n_points == 0:
return np.empty(0, dtype=np.int32)
return np.asarray(g.point_data["ansys_node_num"], dtype=np.int32)
[docs]
def node_coord(self, node_num: int) -> tuple[float, float, float]:
    """Return ``(x, y, z)`` for the given node number.

    The grid is the only permanent storage, so this is an O(log n)
    binary search (the point order is node-number-sorted after
    canonicalisation) rather than the old O(1) ``_nodes[nn]`` dict
    hit. Callers doing this in a hot loop should lift the whole
    ``grid.points`` array via :meth:`Model.grid` instead.

    Raises
    ------
    KeyError
        If ``node_num`` is not present in the model.
    """
    grid = self.grid
    numbers = np.asarray(grid.point_data["ansys_node_num"])
    pos = int(np.searchsorted(numbers, node_num))
    if pos >= numbers.size or int(numbers[pos]) != int(node_num):
        raise KeyError(f"node {node_num} not in model")
    x, y, z = grid.points[pos]
    return (float(x), float(y), float(z))
[docs]
def element_info(self, elem_num: int) -> tuple[list[int], int, int, int]:
    """Return ``(node_nums, et_id, mat_id, real_id)`` for an element.

    Convenience accessor for the stress-recovery / strain-export paths
    that previously reached into ``model._elements[en]``. O(log n)
    per call — lift ``grid.cell_data`` / ``grid.cell_connectivity``
    yourself for bulk iteration.

    Raises
    ------
    KeyError
        If ``elem_num`` is not present in the model.
    """
    grid = self.grid
    elem_nums = np.asarray(grid.cell_data["ansys_elem_num"])
    pos = int(np.searchsorted(elem_nums, elem_num))
    if pos >= elem_nums.size or int(elem_nums[pos]) != int(elem_num):
        raise KeyError(f"element {elem_num} not in model")
    offsets = np.asarray(grid.offset, dtype=np.int32)
    connectivity = np.asarray(grid.cell_connectivity, dtype=np.int32)
    point_nums = np.asarray(grid.point_data["ansys_node_num"], dtype=np.int32)
    # Slice this cell's VTK point ids, then translate to node numbers.
    local_ids = connectivity[offsets[pos] : offsets[pos + 1]]
    node_nums = [int(point_nums[j]) for j in local_ids]
    cell_data = grid.cell_data
    return (
        node_nums,
        int(cell_data["ansys_elem_type_num"][pos]),
        int(cell_data["ansys_material_type"][pos]),
        int(cell_data["ansys_real_constant"][pos]),
    )
def _grid_element_arrays(
self,
) -> tuple[
np.ndarray, # elem_nums (n_elem,) int32
np.ndarray, # n_nodes per elem (n_elem,) int32
np.ndarray, # offsets into cell_connectivity (n_elem + 1,) int32
np.ndarray, # cell_connectivity flat (total_nns,) int32 — VTK point indices
np.ndarray, # et_ids (n_elem,) int32
np.ndarray, # mat_ids (n_elem,) int32
np.ndarray, # real_ids (n_elem,) int32
np.ndarray, # point_nums (n_points,) int32 node number per VTK point
np.ndarray, # points (n_points, 3) float64
]:
"""Bulk numpy view of the grid for the assembly / strain hot paths.
Every array is a zero-copy view onto the grid's internal buffers
(assuming the grid has been canonicalised + int32-coerced) —
no dict iteration, no per-element Python lookups. Centralised
so the hot paths don't each reach into the pyvista API.
"""
g = self.grid
offsets = np.asarray(g.offset, dtype=np.int32)
conn = np.asarray(g.cell_connectivity, dtype=np.int32)
n_nodes_per_elem = np.diff(offsets).astype(np.int32, copy=False)
return (
np.asarray(g.cell_data["ansys_elem_num"], dtype=np.int32),
n_nodes_per_elem,
offsets,
conn,
np.asarray(g.cell_data["ansys_elem_type_num"], dtype=np.int32),
np.asarray(g.cell_data["ansys_material_type"], dtype=np.int32),
np.asarray(g.cell_data["ansys_real_constant"], dtype=np.int32),
np.asarray(g.point_data["ansys_node_num"], dtype=np.int32),
np.asarray(g.points, dtype=np.float64),
)
def _node_dof_labels(self) -> dict[int, tuple[str, ...]]:
    """Return ``{node: (dof_label, ...)}`` in canonical DOF order.

    The DOF set for a node is the union of ``dof_labels`` across every
    element that touches it. Nodes not referenced by any element default
    to the three translational DOFs so unconnected nodes still fit the
    global layout.

    Tolerant of partial state: if a caller stages an element
    referencing an ET that hasn't been declared yet (via
    ``APDL(model).et(...)``), the element is simply skipped here —
    ``_rebuild_grid`` is the method that enforces the ET invariant
    at solve time. Walks the grid + the staged buffers directly
    without forcing a rebuild, so pre-rebuild introspection doesn't
    crash on intentionally-partial models.
    """
    # Memoised; the cache is presumably dropped on structural
    # mutation elsewhere — TODO confirm against the mutators.
    if self._node_dof_labels_cache is not None:
        return self._node_dof_labels_cache
    default = _CANONICAL_DOFS[:3]
    per_node: dict[int, set[str]] = {}
    # Walk the grid (authoritative committed state).
    if self._grid.n_points > 0:
        pt_nums = np.asarray(self._grid.point_data["ansys_node_num"], dtype=np.int32)
        for n in pt_nums:
            per_node[int(n)] = set()
    if self._grid.n_cells > 0:
        offsets = np.asarray(self._grid.offset, dtype=np.int32)
        conn = np.asarray(self._grid.cell_connectivity, dtype=np.int32)
        et_ids = np.asarray(self._grid.cell_data["ansys_elem_type_num"], dtype=np.int32)
        pt_nums = np.asarray(self._grid.point_data["ansys_node_num"], dtype=np.int32)
        for i in range(self._grid.n_cells):
            et_id = int(et_ids[i])
            if et_id not in self._etypes:
                # Undeclared ET: skip; _rebuild_grid raises at solve time.
                continue
            vtk_ids = conn[offsets[i] : offsets[i + 1]]
            cls = self._kernel_for(et_id, vtk_ids.size)
            for j in vtk_ids:
                per_node[int(pt_nums[j])].update(cls.dof_labels)
    # Overlay staged incremental additions (not yet merged into the grid).
    for nn in self._staged_nodes:
        per_node.setdefault(int(nn), set())
    for nns, et_id, _, _ in self._staged_elements.values():
        if et_id not in self._etypes:
            continue
        cls = self._kernel_for(et_id, len(nns))
        for nn in nns:
            per_node.setdefault(int(nn), set()).update(cls.dof_labels)
    out: dict[int, tuple[str, ...]] = {}
    for nn, s in per_node.items():
        # Canonical ordering; an empty union falls back to the
        # translational default.
        out[nn] = tuple(lbl for lbl in _CANONICAL_DOFS if lbl in s) or default
    self._node_dof_labels_cache = out
    return out
def _dof_layout(
self,
) -> tuple[np.ndarray, dict[int, tuple[int, tuple[str, ...]]], int]:
"""Return ``(sorted_node_nums, {node: (offset, labels)}, N)``.
``offset`` is the first global DOF index for a node; ``labels`` is its
DOF-label tuple in canonical order. Cached across calls; invalidated
by any structural mutation (n, e, et).
"""
if self._layout_cache is not None:
return self._layout_cache
node_nums = self._sorted_node_nums()
node_dofs = self._node_dof_labels()
layout: dict[int, tuple[int, tuple[str, ...]]] = {}
offset = 0
for nn in node_nums:
labels = node_dofs[int(nn)]
layout[int(nn)] = (offset, labels)
offset += len(labels)
self._layout_cache = (node_nums, layout, offset)
return self._layout_cache
[docs]
def dof_map(self) -> np.ndarray:
    """Return an ``(N, 2)`` array of ``(mapdl_node_num, local_dof_idx)``.

    Row ``i`` of the assembled K/M corresponds to ``dof_map()[i]``.
    ``local_dof_idx`` is the 0-based position in the canonical DOF
    order ``(UX, UY, UZ, ROTX, ROTY, ROTZ)``. A node only contributes
    rows for DOFs its elements actually activate, so a truss model
    stays 3-DOF per node while a beam model gets 6.
    """
    node_nums, layout, total = self._dof_layout()
    rows = np.empty((total, 2), dtype=np.int32)
    for nn in node_nums:
        base, labels = layout[int(nn)]
        for pos, label in enumerate(labels):
            rows[base + pos, 0] = int(nn)
            rows[base + pos, 1] = self._DOF_LABELS[label]
    return rows
def _assemble(
    self,
    which: str,
    lumped: bool = False,
    *,
    full_to_free: np.ndarray | None = None,
    triu_only: bool = False,
    pattern: tuple[np.ndarray, np.ndarray] | None = None,
) -> _sp.csr_array:
    """Assemble the global K or M matrix.

    ``which`` — ``"K"`` selects stiffness (``ke`` / ``ke_batch``
    kernels); any other value selects mass (``me`` / ``me_batch``,
    with ``lumped`` forwarded).

    ``full_to_free`` (optional): length-N ``int32`` array mapping
    each full-system DOF index to its reduced-system index
    (-1 for fixed DOFs that should be dropped entirely). When
    supplied, returned matrix has shape ``(n_free, n_free)`` and
    is built directly in the reduced numbering — the full-system
    K/M are never materialised, which roughly halves peak
    assembly RSS on BC-constrained problems.

    ``triu_only`` drops entries below the diagonal during the scatter
    so the result is upper-triangular (diagonal + strictly-upper).
    Lets callers feed MKL Pardiso's SPD Cholesky factor directly
    without a separate ``scipy.sparse.triu`` extraction step —
    halves the CSR's nnz footprint and skips one ``~n_free × 12 B``
    transient copy. The returned matrix is tagged
    ``.femorph_triu = True`` so downstream solvers can detect the
    storage convention.

    ``pattern=(row_ptr, col_idx)`` skips the symbolic pattern build
    and scatters into the supplied arrays directly. Lets the caller
    share indptr + indices between K and M (they're both keyed on
    the same element connectivity + reduced-DOF mask, so the
    sparsity structures are identical when both are assembled with
    the same ``triu_only`` / ``full_to_free`` flags). The returned
    matrix aliases the supplied arrays — modifying them after the
    fact mutates the returned CSR.
    """
    _ = self.grid  # triggers _rebuild_grid if dirty (also validates nodes/types)
    _, layout, N = self._dof_layout()
    if full_to_free is None:
        N_out = N
    else:
        if full_to_free.shape[0] != N:
            raise ValueError(f"full_to_free length {full_to_free.shape[0]} != N={N}")
        # Maximum free index + 1 is the reduced dimension.
        N_out = int(full_to_free.max()) + 1 if N > 0 else 0
    if N == 0 or self._grid.n_cells == 0:
        # Nothing to assemble — empty (but correctly-shaped) matrix.
        return _sp.csr_array((N_out, N_out), dtype=np.float64)
    # Grid-backed numpy views of the element table. ``conn`` holds
    # VTK point indices (0-based, contiguous after canonicalisation);
    # ``pt_nums`` maps them back to node numbers when we need
    # them for DOF-layout lookup. ``points`` is indexed by VTK
    # point index directly, so per-group coord gather is a single
    # fancy-index (no ``searchsorted``).
    (
        _elem_nums,
        n_nodes_per_elem,
        offsets,
        conn,
        et_ids,
        mat_ids,
        real_ids,
        pt_nums,
        points,
    ) = self._grid_element_arrays()
    # Group by (et_id, n_nodes, material-VALUE, real-VALUE) — NOT
    # raw ``mat_id`` / ``real_id``. Decks like ``ptr.cdb`` assign a
    # unique ``mat_id`` per cell even when every entry holds
    # identical material properties; keying on the ID value fans out
    # 5000+ singleton groups and defeats ``ke_batch`` / ``me_batch``.
    # Keying on the hashed property tuple collapses it to the real
    # element-type groups (4 on PTR: 3069 hex + 497 pyramid +
    # 19 wedge + 1581 tet) and restores batching.
    # ``n_nodes`` joins the key so degenerate topologies under a
    # shared foreign-deck ET id (e.g. MAPDL 186 → HEX20 / WEDGE15 / PYR13) land
    # in distinct groups.
    mat_cache: dict[int, tuple] = {}
    real_cache: dict[int, tuple] = {}

    def _mat_key(mid: int) -> tuple:
        # Hashable snapshot of the material property dict, memoised per id.
        key = mat_cache.get(mid)
        if key is None:
            props = self._materials.get(mid, {})
            # Sort items so dict-ordering doesn't split identical
            # materials across groups.
            key = tuple(sorted(props.items()))
            mat_cache[mid] = key
        return key

    def _real_key(rid: int) -> tuple:
        # Hashable snapshot of the real-constant array, memoised per id.
        key = real_cache.get(rid)
        if key is None:
            vals = self._real_constants.get(rid, np.empty(0, dtype=np.float64))
            # ``tobytes`` is O(n_vals) but n_vals is tiny (≤ 10 for
            # any real-constant set).
            key = (vals.shape, np.ascontiguousarray(vals).tobytes())
            real_cache[rid] = key
        return key

    # Key -> (et_id, n_nodes, representative_mat_id,
    # representative_real_id, cell_idx_list). Keep a
    # representative ``mat_id`` / ``real_id`` for each group so we
    # can look up the material dict / real-constant array below.
    groups: dict[tuple, tuple[int, int, int, int, list[int]]] = {}
    for i in range(et_ids.size):
        mid = int(mat_ids[i])
        rid = int(real_ids[i])
        key = (int(et_ids[i]), int(n_nodes_per_elem[i]), _mat_key(mid), _real_key(rid))
        entry = groups.get(key)
        if entry is None:
            groups[key] = (int(et_ids[i]), int(n_nodes_per_elem[i]), mid, rid, [i])
        else:
            entry[4].append(i)
    # Per-group element matrices and DOF maps. ``dofs_per_elem`` is
    # int32 — what the C++ scatter and scipy CSR both want.
    group_dofs: list[np.ndarray] = []
    group_mats: list[np.ndarray] = []
    for et_id, nn_per_elem, mat_id, real_id, cell_idx_list in groups.values():
        cls = self._kernel_for(et_id, nn_per_elem)
        material = self._materials.get(mat_id, {})
        real = self._real_constants.get(real_id, np.empty(0, dtype=np.float64))
        cell_idx = np.asarray(cell_idx_list, dtype=np.int32)
        # (n_grp, nn_per_elem) VTK point indices for each element in
        # the group. ``offsets[cell_idx][:, None] + arange(nn)`` is a
        # vectorised gather over the flat connectivity.
        base = offsets[cell_idx][:, None]
        per_elem_cols = np.arange(nn_per_elem, dtype=np.int32)[None, :]
        elem_vtk_ids = conn[base + per_elem_cols]  # (n_grp, nn_per_elem)
        coords_arr = points[elem_vtk_ids]  # (n_grp, nn_per_elem, 3) — zero dict hits
        if which == "K":
            # Prefer the batched kernel; fall back to per-element ``ke``.
            batch_fn = getattr(cls, "ke_batch", None)
            mats = batch_fn(coords_arr, material, real) if batch_fn else None
            if mats is None:
                mats = np.stack(
                    [cls.ke(coords_arr[i], material, real) for i in range(cell_idx.size)]
                )
        else:
            # Mass path — same batch-then-fallback structure.
            batch_fn = getattr(cls, "me_batch", None)
            mats = batch_fn(coords_arr, material, real, lumped=lumped) if batch_fn else None
            if mats is None:
                mats = np.stack(
                    [
                        cls.me(coords_arr[i], material, real, lumped=lumped)
                        for i in range(cell_idx.size)
                    ]
                )
        # Per-element DOF indices. Layout is keyed by node
        # number, so translate through ``pt_nums`` once per unique
        # node touched by this group — cheaper than doing it per
        # (elem, node) pair.
        unique_vtk, inverse = np.unique(elem_vtk_ids.ravel(), return_inverse=True)
        unique_mapdl = pt_nums[unique_vtk]
        # Fast path: homogeneous DOF labels across every node in the
        # group. Holds whenever a node is touched only by elements
        # sharing the same ``dof_labels`` — in particular, the common
        # case where the whole model is one element kind (HEX8,
        # BEAM2, etc.). At 300×300×2 HEX8 (271k nodes × 3
        # labels per K/M assembly) the vectorised path replaces
        # ~800k Python iterations with two numpy ops, roughly
        # 200 ms → 5 ms per assembly call. When labels vary per
        # node (mixed-element models where a node sits between a
        # HEX8 and a BEAM2 group) we fall back to the
        # per-node lookup below.
        first_labels = layout[int(unique_mapdl[0])][1]
        homogeneous = True
        for nn in unique_mapdl[1:]:
            if layout[int(nn)][1] != first_labels:
                homogeneous = False
                break
        if homogeneous:
            bases = np.fromiter(
                (layout[int(nn)][0] for nn in unique_mapdl),
                dtype=np.int32,
                count=unique_mapdl.size,
            )
            label_offsets = np.array(
                [first_labels.index(lbl) for lbl in cls.dof_labels],
                dtype=np.int32,
            )
            node_dofs = bases[:, None] + label_offsets[None, :]
        else:
            node_dofs = np.empty((unique_vtk.size, cls.n_dof_per_node), dtype=np.int32)
            for u_idx, nn in enumerate(unique_mapdl):
                layout_base, labels = layout[int(nn)]
                for k, lbl in enumerate(cls.dof_labels):
                    node_dofs[u_idx, k] = layout_base + labels.index(lbl)
        dofs_per_elem = node_dofs[inverse].reshape(
            cell_idx.size, nn_per_elem * cls.n_dof_per_node
        )
        if full_to_free is not None:
            # Remap to reduced numbering; fixed DOFs become -1 and the
            # C++ scatter drops their rows + cols entirely.
            dofs_per_elem = full_to_free[dofs_per_elem]
        group_dofs.append(np.ascontiguousarray(dofs_per_elem, dtype=np.int32))
        group_mats.append(np.ascontiguousarray(mats, dtype=np.float64))
    # Build (or reuse) the symbolic CSR pattern from every group's
    # connectivity. When the pattern is for the full system it's
    # cached for reuse across K and M (geometry-only). For the
    # reduced path we rebuild per call — the mask is a property of
    # the BC set, not the mesh, and the cost is dominated by the
    # factor anyway. ``triu_only`` always bypasses the cache — the
    # triu pattern is K-specific.
    if pattern is not None:
        row_ptr, col_idx = pattern
    elif full_to_free is None and not triu_only:
        row_ptr, col_idx = self._get_or_build_csr_pattern(group_dofs, N)
    else:
        row_ptr, col_idx = self._build_csr_pattern(group_dofs, N_out, triu_only=triu_only)
    data = np.zeros(col_idx.shape[0], dtype=np.float64)
    for dofs_pe, mats in zip(group_dofs, group_mats):
        _core.scatter_into_csr(
            dofs_pe, mats, row_ptr, col_idx, data, N_out, triu_only=triu_only
        )
    # The standard ``_sp.csr_array((data, col_idx, row_ptr), ...)``
    # constructor runs ``check_format`` which copies ``data`` and
    # ``col_idx`` into fresh arrays — a transient 2× spike that
    # shows up as ~90 MB of avoidable peak RSS on a 200×200×2
    # triu HEX8 plate. We've just populated the arrays with
    # the C++ scatter so we know they're valid (sorted column
    # indices per row from ``build_csr_pattern``, matching dtype,
    # contiguous) — construct the csr_array by assigning to the
    # private attributes directly so scipy skips the copies.
    out = _sp.csr_array.__new__(_sp.csr_array)
    out._shape = (N_out, N_out)
    out.indptr = row_ptr
    out.indices = col_idx
    out.data = data
    # Required by scipy's internal caches / maxprint:
    out._maxprint = 50
    if triu_only:
        # Tag so Pardiso / ARPACK can skip their own triu extraction and
        # avoid calling K @ x (which would only compute the triu-product).
        out.femorph_triu = True
    return out
def _build_csr_pattern(
self, group_dofs: list[np.ndarray], N: int, *, triu_only: bool = False
) -> tuple[np.ndarray, np.ndarray]:
"""Symbolic CSR pattern from element-DOF groups, no caching.
Supports ``-1`` sentinels in ``group_dofs`` (the reduced-assembly
path uses them for filtered-out fixed DOFs); the C++ builder and
the scipy fallback both ignore them.
``triu_only=True`` drops entries below the diagonal so the result
is upper-triangular — see :meth:`_assemble` for why that's useful.
"""
if len(group_dofs) == 1:
return _core.build_csr_pattern(group_dofs[0], N, triu_only=triu_only)
# Mixed ``ndof_per_elem`` across groups: pad each group's per-row
# DOF array up to the max with ``-1`` sentinels, stack, then hand
# the merged block to the single-group C++ builder. The kernel
# already treats ``-1`` entries as fixed-and-dropped (matches
# the reduced-assembly contract), so padded slots are ignored
# with zero extra bookkeeping.
#
# The cost of the extra "wasted" cells is small: on ``ptr.cdb``
# padding 1581 TET10 rows (10 nodes × 3 DOF = 30) up to 60
# plus 497 PYR13 rows (13 × 3 = 39) adds ~50 k cells to a
# 5166-row block. In exchange we skip the scipy ``coo → csr``
# fallback (which materialised ~1.47 M COO entries, sorted them
# and deduplicated with a Python-level unique) AND pick up the
# OpenMP row-parallel speedup in the C++ builder. Net on PTR:
# ~500 ms → ~120 ms pattern build at 4 threads.
max_ndof = max(int(g.shape[1]) for g in group_dofs)
n_elem_total = sum(int(g.shape[0]) for g in group_dofs)
padded = np.full((n_elem_total, max_ndof), -1, dtype=np.int32)
off = 0
for g in group_dofs:
n, nd = g.shape
padded[off : off + n, :nd] = g
off += n
return _core.build_csr_pattern(padded, N, triu_only=triu_only)
def _get_or_build_csr_pattern(
self, group_dofs: list[np.ndarray], N: int
) -> tuple[np.ndarray, np.ndarray]:
cached = self._csr_pattern_cache
if cached is not None and cached[0] == N:
return cached[1], cached[2]
row_ptr, col_idx = self._build_csr_pattern(group_dofs, N)
self._csr_pattern_cache = (N, row_ptr, col_idx)
return row_ptr, col_idx
[docs]
def stiffness_matrix(self) -> _sp.csr_array:
    """Assemble the global stiffness matrix as a ``scipy.sparse.csr_array``.

    Row/column ``i`` corresponds to :meth:`dof_map` row ``i``. The
    result is cached on the Model, so repeat calls on an unchanged
    Model skip the assembly phase entirely; any structural
    (``n`` / ``e`` / ``et``) or material (``mp`` / ``r``) mutation
    invalidates the cache.
    """
    cached = self._K_cache
    if cached is not None and not self._dirty:
        return cached
    self._K_cache = self._assemble("K")
    return self._K_cache
[docs]
def mass_matrix(self, lumped: bool = False) -> _sp.csr_array:
    """Assemble the global mass matrix as a ``scipy.sparse.csr_array``.

    Same caching semantics as :meth:`stiffness_matrix`; the consistent
    (default) and lumped variants are cached independently.
    """
    if not self._dirty:
        hit = self._M_lumped_cache if lumped else self._M_cache
        if hit is not None:
            return hit
    M = self._assemble("M", lumped=lumped)
    if lumped:
        self._M_lumped_cache = M
    else:
        self._M_cache = M
    return M
# ---------------------------------------------------------------------
# Analysis drivers
# ---------------------------------------------------------------------
def _global_index(
    self,
    node: int,
    dof_canonical: int,
    layout: dict[int, tuple[int, tuple[str, ...]]],
) -> int:
    """Translate ``(node, canonical DOF index)`` into a global row index.

    Raises
    ------
    KeyError
        If the node is absent from ``layout``, or the node does not
        carry the requested DOF.
    """
    if node not in layout:
        raise KeyError(f"DOF referenced undefined node {node}")
    base, labels = layout[node]
    label = _CANONICAL_DOFS[dof_canonical]
    if label not in labels:
        raise KeyError(f"node {node} has no {label} DOF (active labels: {labels})")
    return base + labels.index(label)
def _load_vector(self) -> np.ndarray:
_, layout, N = self._dof_layout()
F = np.zeros(N, dtype=np.float64)
for (node, dof), value in self._force_bc.items():
F[self._global_index(int(node), dof, layout)] = value
return F
def _prescribed_dofs(self) -> dict[int, float]:
_, layout, _ = self._dof_layout()
out: dict[int, float] = {}
for (node, dof), value in self._disp_bc.items():
out[self._global_index(int(node), dof, layout)] = value
return out
[docs]
def strain(self, displacement: np.ndarray) -> dict[int, np.ndarray]:
    """Compute per-element elastic strain from a global displacement vector.

    Returns a dict mapping element number → ``(n_nodes, 6)``
    engineering-shear Voigt strain sampled at each element node.
    Shares the kernel path with :func:`write_static_result` /
    :func:`write_modal_result` but stays in memory — nothing hits
    disk. Use this to recover strain from any displacement field
    (static result, one modal shape, a morphed guess) without
    round-tripping through ``.pv``.

    ``displacement`` must be indexed by :meth:`dof_map` (the same
    layout the solvers produce). Elements whose kernel defines no
    ``eel_batch``, or whose ``eel_batch`` returns ``None``, are
    skipped.
    """
    _ = self.grid  # force a rebuild if dirty before taking array views
    _, layout, _N = self._dof_layout()
    displacement = np.ascontiguousarray(displacement, dtype=np.float64)
    out: dict[int, np.ndarray] = {}
    if self._grid.n_cells == 0:
        return out
    (
        elem_nums_arr,
        n_nodes_per_elem,
        offsets,
        conn,
        et_ids,
        mat_ids,
        real_ids,
        pt_nums,
        points,
    ) = self._grid_element_arrays()
    # Group by (et_id, n_nodes, mat_id, real_id) so each batch calls
    # a single ``eel_batch`` kernel on a contiguous coord buffer.
    groups: dict[tuple[int, int, int, int], list[int]] = {}
    for i in range(et_ids.size):
        key = (int(et_ids[i]), int(n_nodes_per_elem[i]), int(mat_ids[i]), int(real_ids[i]))
        groups.setdefault(key, []).append(i)
    for (et_id, nn_per_elem, _mat, _real), cell_idx_list in groups.items():
        cls = self._kernel_for(et_id, nn_per_elem)
        batch_fn = getattr(cls, "eel_batch", None)
        if batch_fn is None:
            # Kernel can't recover strain — skip the whole group.
            continue
        cell_idx = np.asarray(cell_idx_list, dtype=np.int32)
        # Vectorised gather of each element's VTK point ids + coords.
        base = offsets[cell_idx][:, None]
        per_elem_cols = np.arange(nn_per_elem, dtype=np.int32)[None, :]
        elem_vtk_ids = conn[base + per_elem_cols]  # (n_grp, nn_per_elem)
        coords = points[elem_vtk_ids]
        # Gather each element's global DOFs from the layout, same as
        # the assembly path. Translate VTK → element-number once per unique
        # node before the per-unique Python dict hit.
        unique_vtk, inverse = np.unique(elem_vtk_ids.ravel(), return_inverse=True)
        unique_mapdl = pt_nums[unique_vtk]
        node_dofs = np.empty((unique_vtk.size, cls.n_dof_per_node), dtype=np.int64)
        for u_idx, nn in enumerate(unique_mapdl):
            layout_base, labels = layout[int(nn)]
            for k, lbl in enumerate(cls.dof_labels):
                node_dofs[u_idx, k] = layout_base + labels.index(lbl)
        dofs_per_elem = node_dofs[inverse].reshape(
            cell_idx.size, nn_per_elem * cls.n_dof_per_node
        )
        u_e = np.ascontiguousarray(displacement[dofs_per_elem])
        # Plane kernels need ν (and the plane-stress vs plane-strain
        # selector) to fill εzz; 3D solids accept the kwarg and
        # ignore it.
        material = self._materials.get(int(_mat))
        eps = batch_fn(np.ascontiguousarray(coords), u_e, material=material)
        if eps is None:
            continue
        group_elem_nums = elem_nums_arr[cell_idx]
        for i, en in enumerate(group_elem_nums):
            out[int(en)] = np.asarray(eps[i])
    return out
[docs]
def solve(
    self,
    *,
    linear_solver: str = "auto",
    progress: bool = False,
) -> StaticResult:
    """Run a linear static analysis.

    Uses the K assembled from current elements, the D-staged Dirichlet
    BCs, and the F-staged nodal forces. Returns a ``StaticResult``
    whose ``displacement`` / ``reaction`` arrays are indexed by
    :meth:`dof_map`.

    Parameters
    ----------
    linear_solver : str, default ``"auto"``
        Sparse linear backend name (see
        :func:`femorph_solver.solvers.linear.list_linear_solvers`).
    progress : bool, default ``False``
        Show a per-stage progress display (``rich`` → ``tqdm`` →
        logger fallback). Off by default for scripted use; the CLI
        front-end flips it on.
    """
    from femorph_solver._progress import ProgressReporter  # noqa: PLC0415
    from femorph_solver.solvers.static import solve_static as _solve_static  # noqa: PLC0415

    with ProgressReporter(enabled=progress) as report:
        with report.stage("assemble K"):
            K = self.stiffness_matrix()
        with report.stage("build load vector"):
            F = self._load_vector()
            prescribed = self._prescribed_dofs()
        with report.stage("factor + solve"):
            return _solve_static(K, F, prescribed, linear_solver=linear_solver)
[docs]
def estimate_solve(
    self,
    n_modes: int = 10,
    *,
    linear_solver: str = "auto",
    eigen_solver: str = "arpack",
    ooc: bool = False,
    host_spec=None,
):
    """Predict ``(wall_s, peak_rss_mb)`` for a modal solve — without running it.

    Thin front door to :class:`~femorph_solver.estimators.Estimator`:
    training rows are read from any ``femorph-benchmark-*.json`` files
    in the current working directory and per-(host, backend) log-log
    power laws are fitted on the fly. With no training data, a
    shape-of-universe prior (tuned from the repo's own 14900KF
    benchmarks) takes over, and the ``p95`` band is pinned at
    2× ``p50`` so callers can recognise a prior rather than a
    calibrated prediction.

    Parameters
    ----------
    n_modes : int
        Modes requested (mirrors :meth:`modal_solve`).
    linear_solver, eigen_solver : str
        Backend names — select which per-bucket fit applies.
    ooc : bool, default False
        Apply the measured out-of-core multipliers (``+3.5×`` wall,
        ``−22 %`` peak) on top of the in-core prediction; relevant
        when ``modal_solve`` would otherwise exceed RAM.
    host_spec : HostSpec or None
        Override host auto-detection — handy for "what would this
        take on a different box?" queries. ``None`` → auto-detect.

    Returns
    -------
    :class:`femorph_solver.estimators.Estimate`
        ``p50`` / ``p95`` bands plus the training-bucket diagnostic
        chosen by the :class:`Estimator`.
    """
    from femorph_solver.estimators import (  # noqa: PLC0415
        Estimator,
        HostSpec,
        ProblemSpec,
        load_training_rows,
    )

    spec = ProblemSpec.from_model(
        self,
        n_modes=n_modes,
        linear_solver=linear_solver,
        eigen_solver=eigen_solver,
        ooc=ooc,
    )
    machine = host_spec or HostSpec.auto()
    fitted = Estimator.fit(load_training_rows())
    return fitted.predict(spec, host=machine)
[docs]
def modal_solve(
    self,
    n_modes: int = 10,
    lumped: bool = False,
    *,
    eigen_solver: str = "arpack",
    linear_solver: str = "auto",
    tol: float = 1e-12,
    release_inputs: bool = True,
    mixed_precision: bool | None = False,
    progress: bool = False,
) -> ModalResult:
    """Run a linear modal analysis: lowest ``n_modes`` of ``K φ = ω² M φ``.

    D-staged Dirichlet BCs are eliminated before the eigensolve; the
    returned modes are ordered by frequency.

    Parameters
    ----------
    n_modes : int, default 10
        Number of modes extracted.
    lumped : bool, default False
        Use a row-summed lumped mass instead of consistent mass.
    eigen_solver : str, default ``"arpack"``
        Sparse eigen backend (``"arpack"``, ``"lobpcg"``,
        ``"primme"``, ``"dense"``) — see
        :func:`femorph_solver.solvers.eigen.list_eigen_solvers`.
    linear_solver : str, default ``"auto"``
        Inner shift-invert factorizer. ``"auto"`` resolves to the
        fastest SPD direct solver available (pardiso → cholmod →
        umfpack → superlu); pass an explicit name (``"superlu"``,
        ``"cholmod"``, …) to override. See
        :func:`femorph_solver.solvers.linear.list_linear_solvers`.
    tol : float, default 1e-12
        Eigensolver convergence tolerance.
    release_inputs : bool, default True
        When ``True`` (the default) the Model's cached full K / M are
        dropped the moment the BC-reduced ``K_ff`` / ``M_ff`` exist —
        before the Pardiso factor allocates. On a 1.2 M-DOF HEX20
        problem that saves ~2 GB of peak RSS (full K and M run ~1 GB
        each, scaling with density). The first call pays nothing:
        CPython's refcounting frees the full matrices as soon as the
        BC-reduce helper returns (no cycles, so no ``gc.collect``
        needed). Repeat calls on the same Model, however, must
        re-assemble K / M from elements (~1-3 s at large sizes)
        because the cache is gone — parametric sweeps should pass
        ``release_inputs=False`` to keep the warm-assembly cache.
    mixed_precision : bool or None, default False
        Inner Pardiso factor precision (non-Pardiso backends ignore
        it):

        * ``False`` — factor and solve in double precision;
          frequencies match established commercial solvers at
          machine epsilon.
        * ``True`` — single-precision factor refined with
          double-precision residuals. Halves the factor's permanent
          footprint when the linked MKL honours the request; on
          well-conditioned SPD stiffness matrices the refinement
          typically recovers ≥13-digit eigenvalue accuracy.
        * ``None`` — auto: opt in for Pardiso SPD factorisation
          above ~50 k free DOFs where the memory saving matters
          most. Not recommended for known ill-conditioned models
          (highly anisotropic composites, near-zero-Jacobian cells).
    progress : bool, default False
        Per-stage progress display.

    .. note::
        Mixed-precision Pardiso depends on MKL exposing
        ``iparm(28)`` through pypardiso; some MKL builds silently
        no-op the request. Check ``lu._solver.get_iparm(28)`` after
        a factorize if you need to confirm the request was honoured.
    """
    from femorph_solver._progress import ProgressReporter  # noqa: PLC0415

    with ProgressReporter(enabled=progress) as reporter:
        if release_inputs:
            # Low-peak-RSS path: element-level assembly straight into
            # the BC-reduced matrices; full K / M never materialised.
            return self._modal_solve_release(
                n_modes=n_modes,
                lumped=lumped,
                eigen_solver=eigen_solver,
                linear_solver=linear_solver,
                tol=tol,
                mixed_precision=mixed_precision,
                report=reporter,
            )

        from femorph_solver.solvers.modal import solve_modal as _solve_modal  # noqa: PLC0415

        with reporter.stage("assemble K"):
            stiffness = self.stiffness_matrix()
        with reporter.stage("assemble M"):
            mass = self.mass_matrix(lumped=lumped)
        fixed = self._prescribed_dofs()
        with reporter.stage("reduce BCs + eigensolve"):
            return _solve_modal(
                stiffness,
                mass,
                fixed,
                n_modes=n_modes,
                eigen_solver=eigen_solver,
                linear_solver=linear_solver,
                tol=tol,
                mixed_precision=mixed_precision,
            )
def _modal_solve_release(
    self,
    *,
    n_modes: int,
    lumped: bool,
    eigen_solver: str,
    linear_solver: str,
    tol: float,
    mixed_precision: bool | None = False,
    report: object | None = None,
) -> ModalResult:
    """Peak-RSS-minimising modal solve — see ``modal_solve(release_inputs=True)``.

    Assembles the BC-reduced ``K_ff`` / ``M_ff`` straight from element
    contributions (one scatter pass per matrix via
    :meth:`_bc_reduce_and_release`), so the full ``K`` / ``M`` are never
    materialised and peak assembly RSS is bounded by ``K_ff + M_ff``
    instead of ``K + M + K_ff + M_ff``.

    When the resolved linear backend is Pardiso, ``K_ff`` is built
    upper-triangular only — half the CSR storage, and Pardiso skips
    its own duplicate ``sp.triu`` extraction. Shift-invert ARPACK at
    σ=0 never evaluates ``K @ x`` (verified against scipy's internal
    path), so the triu storage cannot break the eigensolve.
    """
    from femorph_solver._progress import ProgressReporter  # noqa: PLC0415
    from femorph_solver.solvers.linear import select_default_linear_solver  # noqa: PLC0415
    from femorph_solver.solvers.modal import solve_modal_reduced  # noqa: PLC0415

    if report is None:
        report = ProgressReporter(enabled=False)
    prescribed = self._prescribed_dofs()

    # triu-only K needs BOTH:
    #  - a linear solver that accepts upper-triangular SPD input
    #    (currently Pardiso alone — CHOLMOD's wrapper does its own
    #    sp.tril);
    #  - an ARPACK-flavour shift-invert eigensolve at σ=0, the only
    #    mode that never calls ``K @ x``.
    _, _, total_dofs = self._dof_layout()
    # Upper bound on free DOFs; the rigid-body pass may shrink it.
    free_upper_bound = total_dofs - len(prescribed)
    if linear_solver == "auto":
        resolved = select_default_linear_solver(spd=True, n_dof=free_upper_bound)
    else:
        resolved = linear_solver
    arpack_like = eigen_solver in ("auto", "arpack")
    triu_k = resolved == "pardiso" and arpack_like
    # triu-only M is independent of K's choice: it only requires the
    # eigen backend to route M @ x through ``_maybe_parallel_matvec``
    # (which spots the ``femorph_triu`` tag and applies the symmetric
    # wrapper). The scipy dense path in ``solve_modal_reduced`` also
    # honours the tag via ``la.eigh(..., lower=False)`` — safe for
    # every shipped backend.
    triu_m = arpack_like

    with report.stage("assemble K_ff + M_ff"):
        K_ff, M_ff, free = self._bc_reduce_and_release(
            prescribed, lumped=lumped, triu_k=triu_k, triu_m=triu_m
        )
    with report.stage("factor + eigensolve"):
        return solve_modal_reduced(
            K_ff,
            M_ff,
            free,
            n_modes=n_modes,
            eigen_solver=eigen_solver,
            linear_solver=linear_solver,
            tol=tol,
            mixed_precision=mixed_precision,
        )
def _bc_reduce_and_release(
    self,
    prescribed: dict[int, float],
    *,
    lumped: bool,
    triu_k: bool = False,
    triu_m: bool = False,
) -> tuple[_sp.csr_array, _sp.csr_array, np.ndarray]:
    """Assemble BC-reduced K_ff / M_ff directly from element contributions.

    Builds the reduced matrices in one element-level scatter pass each —
    the full N×N K / M are never materialised, so peak RSS during
    assembly is roughly ``K_ff + M_ff`` instead of
    ``K + M + K_ff + M_ff``. On a 500 k-DOF plate that saves ~700 MB
    of peak.

    ``triu_k=True`` assembles K_ff as upper-triangular only (diagonal +
    strictly upper). Halves K's CSR footprint and lets the Pardiso
    SPD path skip its own ``sp.triu`` extraction.

    ``triu_m=True`` assembles M_ff as upper-triangular only too.
    Halves M's CSR footprint. Callers that need ``M @ x`` (ARPACK
    generalized shift-invert) must route through
    ``_maybe_parallel_matvec`` which detects the tag and applies
    ``y = M @ x + M.T @ x - diag(M) * x`` — the symmetric product,
    no lower-triangular mirror allocation. Only safe when M is
    truly symmetric (all consistent / lumped element-mass matrices
    on this codebase are).

    Rigid-body DOFs (zero diagonal stiffness, e.g. TRUSS2 transverse)
    are caught post-assembly via ``K_ff.diagonal()`` and dropped with a
    second ``csr_submatrix_free`` pass — the common case (HEX20 /
    HEX8 etc.) has none so the extra pass is skipped entirely.

    Returns ``(K_ff, M_ff, free_mask)`` where ``free_mask`` is a
    length-N boolean over full DOFs reflecting both prescribed and
    rigid-body removals.
    """
    _, _layout, N = self._dof_layout()
    # Drop any stale cached K / M so the element-level build below
    # doesn't double-allocate alongside them.
    self.release_matrix_cache()
    mask_fixed = np.zeros(N, dtype=bool)
    for idx in prescribed:
        mask_fixed[int(idx)] = True
    free_pre = ~mask_fixed
    n_free_pre = int(free_pre.sum())
    # Degenerate models (everything prescribed, no DOFs, or no cells)
    # short-circuit with empty 0×0 matrices.
    if n_free_pre == 0 or N == 0 or self._grid.n_cells == 0:
        empty = _sp.csr_array((0, 0), dtype=np.float64)
        return empty, empty, free_pre
    # Map full DOF index → reduced (free) index; -1 marks fixed DOFs.
    full_to_free = np.full(N, -1, dtype=np.int32)
    full_to_free[free_pre] = np.arange(n_free_pre, dtype=np.int32)
    K_ff = self._assemble("K", full_to_free=full_to_free, triu_only=triu_k)
    # K and M share element connectivity + the reduced-DOF mask, so
    # their sparsity patterns (row_ptr + col_idx) are identical when
    # both are built with the same ``triu_only`` flag. Pass K's
    # pattern arrays into M's build so M aliases them instead of
    # rebuilding its own copies — saves ~(n_free + nnz) × 4 B
    # (≈30 MB on a 200×200×2 triu HEX8 plate).
    shared_pattern = (K_ff.indptr, K_ff.indices) if triu_k == triu_m else None
    M_ff = self._assemble(
        "M",
        lumped=lumped,
        full_to_free=full_to_free,
        triu_only=triu_m,
        pattern=shared_pattern,
    )
    # Rigid-body pass — drop rows whose K diagonal is below a relative
    # pivot tolerance. Matches the tolerance from the old post-hoc
    # ``_bc_reduce`` path so parity with existing tests holds. The
    # diagonal is present in both full and triu CSR so this check works
    # regardless of ``triu_k``.
    k_diag = K_ff.diagonal()
    scale = float(np.max(np.abs(k_diag))) if k_diag.size else 1.0
    tol_pivot = 1e-12 * scale if scale > 0 else 1e-30
    rigid = np.abs(k_diag) <= tol_pivot
    if rigid.any():
        n_free = int(n_free_pre - rigid.sum())
        reduce_map = np.full(n_free_pre, -1, dtype=np.int32)
        reduce_map[~rigid] = np.arange(n_free, dtype=np.int32)
        K_ff = self._csr_submatrix(K_ff, reduce_map, n_free)
        M_ff = self._csr_submatrix(M_ff, reduce_map, n_free)
        if triu_k:
            # csr_submatrix_free preserves the triu-ness of the input
            # (it only picks rows/cols, no transpose), but the tagging
            # doesn't survive the construction of the new csr_array —
            # re-apply it here.
            K_ff.femorph_triu = True
        if triu_m:
            M_ff.femorph_triu = True
        # Propagate the rigid drop back into the full-DOF mask so the
        # free-mask returned to ``solve_modal_reduced`` reflects both
        # prescribed + rigid-body removals.
        pre_free_indices = np.where(free_pre)[0]
        mask_fixed[pre_free_indices[rigid]] = True
    # Drop the per-node DOF layout caches before the caller
    # allocates the Pardiso factor. These are pure memoization
    # (``{node: (offset, labels)}`` dicts, ~O(n_nodes) with dict
    # overhead) reconstructable from ``self._grid`` /
    # ``self._etypes`` in a few ms. At 100×100×2 HEX8 they
    # hold ~18 MB; at 300×300×2, ~160 MB. Holding them through
    # the 1-3 GB Pardiso factor + eigsh shift-invert is dead
    # weight — the assembly that needed them is finished, and
    # neither Pardiso nor eigsh touches them. A subsequent
    # ``modal_solve`` on the same Model rebuilds the caches on
    # demand; the only cost is re-walking the grid once per
    # solve, which is a tiny fraction of a second.
    self._layout_cache = None
    self._node_dof_labels_cache = None
    # glibc arena hygiene before the caller (typically
    # ``solve_modal_reduced``) allocates the 1-3 GB Pardiso
    # Cholesky factor on top. The large transient scratches in
    # assembly (``ke_batch`` / ``me_batch`` at hundreds of MB,
    # ``_build_csr_pattern`` scratch at tens of MB) go through
    # ``mmap`` directly — those are already returned to the OS
    # on ``free()`` and ``malloc_trim`` can't touch them. What
    # IS sometimes reclaimable: smaller allocations (coords_arr
    # copies, dof-map scratch, scipy CSR temporaries) that land
    # in glibc's arena and sit there as cached chunks. On a
    # 200×200×2 HEX8 plate this typically reclaims 5-15 MB
    # post-assembly — a small win but no-cost on platforms
    # where ``malloc_trim`` exists (Linux/glibc) and a no-op
    # everywhere else. Kept here as cheap hygiene, not a
    # headline optimisation: the real peak-RSS driver is the
    # Pardiso factor itself, which is attacked separately.
    from femorph_solver._malloc import trim_allocator  # noqa: PLC0415
    trim_allocator()
    return K_ff, M_ff, ~mask_fixed
@staticmethod
def _csr_submatrix(A: _sp.csr_array, full_to_free: np.ndarray, n_free: int) -> _sp.csr_array:
    """Return ``A[keep][:, keep]`` in CSR form via the parallel C++ kernel.

    ``full_to_free`` maps each row/column of ``A`` to its index in the
    reduced matrix, with ``-1`` marking dropped entries.
    """
    # The C++ kernel expects int32 indexing and float64 data in
    # contiguous memory; normalise up front.
    indptr = np.ascontiguousarray(A.indptr, dtype=np.int32)
    indices = np.ascontiguousarray(A.indices, dtype=np.int32)
    values = np.ascontiguousarray(A.data, dtype=np.float64)
    sub_indptr, sub_indices, sub_data = _core.csr_submatrix_free(
        indptr, indices, values, full_to_free, n_free
    )
    return _sp.csr_array((sub_data, sub_indices, sub_indptr), shape=(n_free, n_free))
[docs]
def transient_solve(
    self,
    *,
    dt: float,
    n_steps: int,
    load: Callable[[float], np.ndarray] | np.ndarray | None = None,
    lumped: bool = False,
    damping: _sp.csr_array | None = None,
    u0: np.ndarray | None = None,
    v0: np.ndarray | None = None,
    beta: float = 0.25,
    gamma: float = 0.5,
) -> TransientResult:
    """Run a linear transient analysis with Newmark-β integration.

    Integrates ``M ü + C u̇ + K u = F(t)`` over ``n_steps`` steps of
    size ``dt``. ``F(t)`` defaults to the F-staged nodal forces held
    constant; pass a callable ``t -> (N,)`` for time-varying loads or
    a fixed array to override the staged values. ``beta`` / ``gamma``
    are the Newmark parameters (defaults give the unconditionally
    stable average-acceleration scheme).
    """
    from femorph_solver.solvers.transient import solve_transient as _solve_transient

    stiffness = self.stiffness_matrix()
    mass = self.mass_matrix(lumped=lumped)
    fixed = self._prescribed_dofs()
    forcing = load if load is not None else self._load_vector()
    return _solve_transient(
        stiffness,
        mass,
        forcing,
        dt=dt,
        n_steps=n_steps,
        prescribed=fixed,
        C=damping,
        u0=u0,
        v0=v0,
        beta=beta,
        gamma=gamma,
    )
[docs]
def harmonic_solve(
    self,
    *,
    frequencies: np.ndarray | Sequence[float],
    n_modes: int = 10,
    load: np.ndarray | None = None,
    modal_damping_ratio: float | Sequence[float] | np.ndarray | None = None,
    rayleigh: tuple[float, float] | None = None,
    lumped: bool = False,
    eigen_solver: str = "arpack",
    linear_solver: str = "auto",
    tol: float = 1e-12,
    modal_result: ModalResult | None = None,
) -> HarmonicResult:
    """Frequency-response (harmonic) analysis by modal superposition.

    Solves ``(K - ω² M + i ω C) u(ω) = F`` at every excitation
    frequency by projecting onto the lowest ``n_modes``
    mass-orthonormal eigenmodes of the undamped system; ``C`` enters
    through per-mode damping ratios — see
    :func:`femorph_solver.solvers.harmonic.solve_harmonic_modal` for
    the full superposition / damping math.

    Parameters
    ----------
    frequencies
        ``(n_freq,)`` excitation frequencies [Hz].
    n_modes
        Modes retained in the superposition; ignored when
        ``modal_result`` is supplied.
    load
        ``(N,)`` complex (or real) DOF-indexed load. Defaults to the
        F-staged nodal forces from :meth:`apply_force`.
    modal_damping_ratio
        Scalar (uniform) or per-mode damping ratio ζ_n ∈ [0, 1).
    rayleigh
        ``(α, β)`` Rayleigh constants with
        ``ζ_n = α/(2 ω_n) + β ω_n / 2``; combinable with
        ``modal_damping_ratio``.
    lumped, eigen_solver, linear_solver, tol
        Forwarded to the underlying :meth:`modal_solve` when
        ``modal_result`` is not supplied.
    modal_result
        Reuse a precomputed :class:`ModalResult` — useful for
        parametric forcing sweeps where the eigenproblem is fixed.

    Returns
    -------
    HarmonicResult
        Frequency-domain response; ``.displacement`` is
        ``(n_freq, N)`` complex.
    """
    from femorph_solver.solvers.harmonic import solve_harmonic_modal as _solve

    stiffness = self.stiffness_matrix()
    mass = self.mass_matrix(lumped=lumped)
    fixed = self._prescribed_dofs()
    forcing = load if load is not None else self._load_vector()
    return _solve(
        stiffness,
        mass,
        forcing,
        frequencies=frequencies,
        n_modes=n_modes,
        prescribed=fixed,
        modal_damping_ratio=modal_damping_ratio,
        rayleigh=rayleigh,
        eigen_solver=eigen_solver,
        linear_solver=linear_solver,
        tol=tol,
        modal_result=modal_result,
    )
[docs]
def cyclic_modal_solve(
    self,
    *,
    low_face: np.ndarray,
    high_face: np.ndarray,
    n_sectors: int,
    n_modes: int = 10,
    sector_angle: float | None = None,
    pair_rotation: np.ndarray | None = None,
    harmonic_indices: Sequence[int] | None = None,
    lumped: bool = False,
    linear_solver: str = "auto",
) -> list[CyclicModalResult]:
    """Cyclic-symmetry modal analysis on a single base sector.

    Front door to :func:`femorph_solver.solvers.cyclic.solve_cyclic_modal`
    that supplies the Model's ``K`` / ``M`` and translates node
    selections into DOF arrays — so callers work in node space, the
    same space used by :meth:`fix` / :meth:`apply_force`.

    Parameters
    ----------
    low_face, high_face : ndarray
        Nodes on the rotor's two cyclic faces; either 1-D node-number
        arrays or boolean masks of length ``Model.n_nodes`` indexed in
        grid-point order (the forms :meth:`fix` accepts). Both faces
        must list the same number of paired nodes in matching order.
    n_sectors : int
        Number of repetitions closing the rotor (``N``).
    n_modes : int, default 10
        Lowest modes returned per harmonic index.
    sector_angle : float, optional
        Sector opening angle [rad]; defaults to ``2π / n_sectors``.
        Only used to derive ``pair_rotation`` when that is omitted.
        Override for non-uniform sectors, or pass both as ``None``
        when the faces already sit in per-sector local frames.
    pair_rotation : (3, 3) array, optional
        Spatial rotation R(α) mapping sector j's local frame onto
        sector j+1's. Derived from ``sector_angle`` as a +Z-axis
        rotation when ``None``; supply an explicit ``R`` for
        off-axis rotors.
    harmonic_indices : sequence of int, optional
        Harmonic indices to solve; defaults to ``0, 1, …, N//2``.
    lumped : bool, default False
        Row-summed lumped mass instead of consistent mass.
    linear_solver : str, default ``"auto"``
        Backend identifier registered in
        :mod:`femorph_solver.solvers.linear`.

    Returns
    -------
    list[CyclicModalResult]
        One entry per harmonic index, each carrying ``omega_sq``,
        ``frequency``, ``mode_shapes`` and the harmonic index ``k``.
    """
    from femorph_solver.solvers.cyclic import (
        solve_cyclic_modal as _solve_cyclic_modal,
    )

    stiffness = self.stiffness_matrix()
    mass = self.mass_matrix(lumped=lumped)
    low_dofs = self._face_node_dofs(low_face)
    high_dofs = self._face_node_dofs(high_face)
    if low_dofs.shape != high_dofs.shape:
        raise ValueError(
            f"cyclic_modal_solve: low_face has {low_dofs.shape[0]} paired nodes "
            f"but high_face has {high_dofs.shape[0]} — they must match."
        )
    if pair_rotation is None:
        # No explicit rotation supplied: build R(α) about +Z, taking
        # the uniform-sector angle when none was given.
        if sector_angle is None:
            sector_angle = 2.0 * np.pi / int(n_sectors)
        cos_a, sin_a = np.cos(sector_angle), np.sin(sector_angle)
        pair_rotation = np.array(
            [[cos_a, -sin_a, 0.0], [sin_a, cos_a, 0.0], [0.0, 0.0, 1.0]],
            dtype=np.float64,
        )
    fixed = self._prescribed_dofs() or None
    return _solve_cyclic_modal(
        stiffness,
        mass,
        low_dofs,
        high_dofs,
        n_sectors=int(n_sectors),
        n_modes=int(n_modes),
        pair_rotation=pair_rotation,
        harmonic_indices=harmonic_indices,
        prescribed=fixed,
        linear_solver=linear_solver,
    )
def _face_node_dofs(self, face: np.ndarray) -> np.ndarray:
    """Translate a node selector into a ``(P, d)`` DOF-index array.

    ``face`` accepts the same forms as :meth:`fix` — a boolean mask of
    length ``Model.n_nodes`` (grid-point order) or an array of node
    numbers. Per-node DOFs come from :meth:`_node_dof_labels`; every
    selected node must declare the same DOF set so the result is
    rectangular, as :func:`solve_cyclic_modal` requires.
    """
    selector = np.asarray(face)
    if selector.dtype == bool:
        if selector.shape != (self.grid.n_points,):
            raise ValueError(
                f"cyclic_modal_solve: boolean `face` must have shape "
                f"({self.grid.n_points},), got {selector.shape}"
            )
        all_node_nums = np.asarray(self.grid.point_data["ansys_node_num"], dtype=np.int64)
        face_nodes = all_node_nums[selector]
    else:
        face_nodes = selector.astype(np.int64, copy=False)
    # One-shot (node, dof_idx) → global-row lookup table.
    row_of = {(int(n), int(d)): i for i, (n, d) in enumerate(self.dof_map())}
    labels_by_node = self._node_dof_labels()
    # The first face node fixes the canonical DOF layout; all others
    # must match it exactly for the output to stay rectangular.
    ref_labels = labels_by_node.get(int(face_nodes[0])) if face_nodes.size else None
    if ref_labels is None:
        raise ValueError(
            "cyclic_modal_solve: cannot resolve DOF labels for the "
            "first low-face node — make sure the model has been "
            "fully assembled (call stiffness_matrix() once)."
        )
    ref_dof_idx = [self._DOF_LABELS[lbl] for lbl in ref_labels]
    out_rows: list[list[int]] = []
    for num in face_nodes:
        labels = labels_by_node.get(int(num))
        # Nodes missing from the label cache fall back to the
        # reference layout (matching the canonical set by definition).
        dof_idx = ref_dof_idx if labels is None else [self._DOF_LABELS[lbl] for lbl in labels]
        if dof_idx != ref_dof_idx:
            raise ValueError(
                f"cyclic_modal_solve: node {int(num)} has DOF set "
                f"{labels} but the face's first node has "
                f"{ref_labels}; every paired-face node must declare "
                f"the same DOFs."
            )
        out_rows.append([row_of[(int(num), d)] for d in dof_idx])
    return np.asarray(out_rows, dtype=np.int64)
# ---------------------------------------------------------------------
# Post-processing
# ---------------------------------------------------------------------
[docs]
def eel(
    self,
    u: np.ndarray,
    *,
    nodal_avg: bool = True,
) -> np.ndarray | dict[int, np.ndarray]:
    """Elastic strain recovered from a displacement vector.

    Strain is evaluated at each element's own nodes by
    ε(ξ_node) = B(ξ_node)·u_e — direct nodal evaluation rather than
    Gauss extrapolation; the two agree exactly for uniform-strain
    fields and converge under mesh refinement. Output is 6-component
    Voigt with engineering shears ``[εxx, εyy, εzz, γxy, γyz, γxz]``.

    Parameters
    ----------
    u : (N,) float64
        Global displacement vector indexed by :meth:`dof_map` — e.g.
        :attr:`StaticResult.displacement` from :meth:`solve` or a mode
        shape from :meth:`modal_solve`.
    nodal_avg : bool, default True
        ``True`` averages values at shared nodes over every element
        touching them and returns a ``(n_nodes_global, 6)`` array
        indexed by :attr:`node_numbers`. ``False`` returns the raw
        per-element dict ``{elem_num: (n_nodes_in_elem, 6)}`` —
        unaveraged, like ``PLESOL``.
    """
    _ = self.grid  # force grid materialisation before layout lookup
    node_nums_sorted, layout, N = self._dof_layout()
    disp = np.ascontiguousarray(np.asarray(u, dtype=np.float64).ravel())
    if disp.shape[0] != N:
        raise ValueError(f"u has shape {disp.shape}, expected ({N},) DOFs")
    if N == 0 or self._grid.n_cells == 0:
        return np.zeros((0, 6), dtype=np.float64) if nodal_avg else {}
    (
        elem_nums_arr,
        n_nodes_per_elem,
        offsets,
        conn,
        et_ids,
        _mat_ids,
        _real_ids,
        pt_nums,
        points,
    ) = self._grid_element_arrays()
    # Bucket by (et_id, n_nodes): material / real ids don't influence
    # strain recovery, but degenerate topologies need their own kernel.
    buckets: dict[tuple[int, int], list[int]] = {}
    for pos in range(et_ids.size):
        buckets.setdefault((int(et_ids[pos]), int(n_nodes_per_elem[pos])), []).append(pos)
    # Per bucket: (element numbers, node numbers per strain row,
    # strain batch) — the node numbers drive nodal averaging below.
    group_results: list[tuple[np.ndarray, np.ndarray, np.ndarray]] = []
    for (et_id, node_count), members in buckets.items():
        kernel = self._kernel_for(et_id, node_count)
        batch_fn = getattr(kernel, "eel_batch", None)
        if batch_fn is None:
            raise NotImplementedError(f"{kernel.name}.eel_batch is not implemented")
        nn_per_elem = kernel.n_nodes
        ndof_pn = kernel.n_dof_per_node
        cell_idx = np.asarray(members, dtype=np.int32)
        local_cols = np.arange(nn_per_elem, dtype=np.int32)
        elem_vtk_ids = conn[offsets[cell_idx][:, None] + local_cols[None, :]]
        coords_arr = points[elem_vtk_ids]
        node_nums = pt_nums[elem_vtk_ids]  # (n_grp, nn_per_elem)
        # Resolve global DOF rows once per distinct node, then scatter
        # back to elements through the unique-inverse mapping.
        uniq_vtk, inverse = np.unique(elem_vtk_ids.ravel(), return_inverse=True)
        uniq_nums = pt_nums[uniq_vtk]
        dof_rows = np.empty((uniq_vtk.size, ndof_pn), dtype=np.int32)
        for row, num in enumerate(uniq_nums):
            base, labels = layout[int(num)]
            for k, lbl in enumerate(kernel.dof_labels):
                dof_rows[row, k] = base + labels.index(lbl)
        gather = dof_rows[inverse].reshape(cell_idx.size, nn_per_elem * ndof_pn)
        strains = batch_fn(coords_arr, disp[gather])
        if strains is None:
            raise RuntimeError(
                f"C++ extension unavailable — {kernel.name} has no Python strain fallback"
            )
        group_results.append((elem_nums_arr[cell_idx], node_nums, np.asarray(strains)))
    if not nodal_avg:
        by_elem: dict[int, np.ndarray] = {}
        for enums, _nns, strains in group_results:
            for i, en in enumerate(enums):
                by_elem[int(en)] = strains[i]
        return by_elem
    # Nodal averaging: accumulate per-node sums and contribution
    # counts, then divide where any element touched the node.
    sums = np.zeros((node_nums_sorted.size, 6), dtype=np.float64)
    counts = np.zeros(node_nums_sorted.size, dtype=np.int64)
    for _enums, nns, strains in group_results:
        rows = np.searchsorted(node_nums_sorted, nns.ravel())
        np.add.at(sums, rows, strains.reshape(-1, 6))
        np.add.at(counts, rows, 1)
    averaged = np.zeros_like(sums)
    touched = counts > 0
    averaged[touched] = sums[touched] / counts[touched, None]
    return averaged