# Source code for femorph_solver.model

"""femorph-solver model — a thin wrapper around a ``pyvista.UnstructuredGrid``.

The grid is the canonical mesh representation:

* ``point_data["ansys_node_num"]``          — 1-based MAPDL node numbers
* ``cell_data["ansys_elem_num"]``           — 1-based MAPDL element numbers
* ``cell_data["ansys_elem_type_num"]``      — MAPDL ET id per element
* ``cell_data["ansys_material_type"]``      — MAPDL MAT id per element
* ``cell_data["ansys_real_constant"]``      — MAPDL REAL id per element

Convention matches :mod:`mapdl_archive` so femorph-solver models can be round-tripped
through CDB and other MAPDL tools without relabelling.

Global tables that don't belong per-cell live on the Model directly:

* :attr:`etypes`         — ET id → element class name (e.g. ``"LINK180"``)
* :attr:`materials`      — MAT id → ``{"EX": ..., "PRXY": ..., "DENS": ...}``
* :attr:`real_constants` — REAL id → 1-D ``numpy.ndarray``
"""

from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np
import pyvista as pv
import scipy.sparse as _sp
from vtkmodules.util.numpy_support import numpy_to_vtk
from vtkmodules.vtkCommonCore import VTK_UNSIGNED_CHAR, vtkTypeInt32Array
from vtkmodules.vtkCommonDataModel import vtkCellArray

from femorph_solver import _core
from femorph_solver._labels import (
    DOFLabel,
    ElementType,
    ForceLabel,
    MaterialProperty,
    UnitSystem,
    _token,
)
from femorph_solver.elements import get as _get_element
from femorph_solver.elements._base import CANONICAL_DOFS as _CANONICAL_DOFS
from femorph_solver.elements._base import ElementBase

if TYPE_CHECKING:  # pragma: no cover
    from collections.abc import Callable

    from femorph_solver.solvers.modal import ModalResult
    from femorph_solver.solvers.static import StaticResult
    from femorph_solver.solvers.transient import TransientResult


def _canonicalize_grid(grid: pv.UnstructuredGrid) -> pv.UnstructuredGrid:
    """Return a copy of ``grid`` with points sorted by ``ansys_node_num``
    and cells sorted by ``ansys_elem_num``.

    Every :class:`Model` hot path assumes the grid is in MAPDL-sorted
    order — the DOF layout walks points in the order they appear and
    matrix rows inherit that order, which means the same mesh loaded
    via two different paths must iterate points identically or two
    freshly-assembled Ks would differ by a permutation.  Centralising
    the canonicalisation here (rather than re-sorting inside every
    consumer) keeps the invariant local and lets the hot path take
    a zero-copy view of ``point_data`` / ``cell_data``.

    When the grid already happens to be sorted the function is still a
    no-op on content — it shortcuts if both orderings are already
    monotonic — so for meshes produced by ``_rebuild_grid`` (which
    always emits MAPDL-sorted) or by mapdl-archive (which typically
    does as well) there's no copy.
    """
    # NOTE(review): assumes both "ansys_node_num" and "ansys_elem_num"
    # are present — ``from_grid`` stamps them before calling here; confirm
    # for any new caller, since a missing key makes ``np.asarray(None)`` a
    # 0-d object array and the argsort below would fail.
    node_nums = np.asarray(grid.point_data.get("ansys_node_num"))
    elem_nums = np.asarray(grid.cell_data.get("ansys_elem_num"))
    # Stable sort: equal keys keep relative order, so the "already sorted"
    # test below is an exact identity-permutation check.
    pt_order = np.argsort(node_nums, kind="stable")
    cell_order = np.argsort(elem_nums, kind="stable")
    pts_sorted = np.all(pt_order == np.arange(node_nums.size))
    cells_sorted = np.all(cell_order == np.arange(elem_nums.size))
    if pts_sorted and cells_sorted:
        # Fast path: already canonical — return the same object, no copy.
        return grid

    # Rebuild with reordered points + cells.  VTK has no in-place
    # ``ReorderPoints`` that also rewires connectivity, so the cleanest
    # path is to extract → reorder → reassemble.
    new_points = np.asarray(grid.points, dtype=np.float64)[pt_order]
    # Inverse map: old VTK point index -> new VTK point index.
    inverse_pt = np.empty(node_nums.size, dtype=np.int32)
    inverse_pt[pt_order] = np.arange(node_nums.size, dtype=np.int32)

    # Per-cell connectivity in the original VTK index space, reindexed
    # to the new point order.
    old_conn = np.asarray(grid.cell_connectivity, dtype=np.int64)
    old_offset = np.asarray(grid.offset, dtype=np.int64)
    new_conn_full = inverse_pt[old_conn].astype(np.int32, copy=False)

    # Reorder cells.  Each cell's block in ``new_conn_full`` starts at
    # ``old_offset[i]`` and has length ``old_offset[i+1] - old_offset[i]``.
    cell_sizes = np.diff(old_offset)
    new_conn_chunks: list[np.ndarray] = []
    new_offset = [0]
    for i in cell_order:
        start = int(old_offset[i])
        sz = int(cell_sizes[i])
        new_conn_chunks.append(new_conn_full[start : start + sz])
        new_offset.append(new_offset[-1] + sz)
    # Guard the empty-mesh case: np.concatenate([]) raises.
    new_conn = (
        np.concatenate(new_conn_chunks).astype(np.int32)
        if new_conn_chunks
        else np.empty(0, dtype=np.int32)
    )
    # VTK's modern offsets layout: length n_cells + 1, first entry 0.
    new_offset_arr = np.asarray(new_offset, dtype=np.int32)
    new_celltypes = np.asarray(grid.celltypes, dtype=np.uint8)[cell_order]

    # Assemble a fresh grid with int32 cell structure up-front.
    # ``deep=True`` so VTK owns the buffers after the temporaries die.
    vtk_dtype = vtkTypeInt32Array().GetDataType()
    offsets_vtk = numpy_to_vtk(new_offset_arr, deep=True, array_type=vtk_dtype)
    conn_vtk = numpy_to_vtk(new_conn, deep=True, array_type=vtk_dtype)
    cell_array = vtkCellArray()
    cell_array.SetData(offsets_vtk, conn_vtk)
    celltypes_vtk = numpy_to_vtk(new_celltypes, deep=True, array_type=VTK_UNSIGNED_CHAR)

    new_grid = pv.UnstructuredGrid()
    new_grid.SetCells(celltypes_vtk, cell_array)
    new_grid.points = new_points

    # Carry over point/cell data in the new ordering.
    for k, v in grid.point_data.items():
        new_grid.point_data[k] = np.asarray(v)[pt_order]
    for k, v in grid.cell_data.items():
        new_grid.cell_data[k] = np.asarray(v)[cell_order]
    return new_grid


def _force_int32_cells(grid: pv.UnstructuredGrid) -> None:
    """In-place: coerce ``grid.cell_connectivity`` + ``grid.offset`` to int32.

    On 64-bit builds VTK allocates these arrays as int64 even though any
    realistic FEM mesh fits comfortably in 32-bit indices.  Keeping them
    int64 doubles the cell-structure footprint and forces a copy-cast every
    time downstream consumers (scipy CSR, the C++ scatter kernels, ...) ask
    for the int32 view they actually use.  One rebuild of the
    ``vtkCellArray`` with int32 backing makes every later access zero-copy —
    mirrors :func:`femorph.utilities.force_int32_itype`.
    """
    # Nothing to do for an empty mesh or one already in int32 form.
    if grid.n_cells == 0 or grid.offset.dtype == np.int32:
        return
    target_dtype = vtkTypeInt32Array().GetDataType()
    offsets32 = np.ascontiguousarray(grid.offset, dtype=np.int32)
    connectivity32 = np.ascontiguousarray(grid.cell_connectivity, dtype=np.int32)
    rebuilt = vtkCellArray()
    rebuilt.SetData(
        numpy_to_vtk(offsets32, deep=True, array_type=target_dtype),
        numpy_to_vtk(connectivity32, deep=True, array_type=target_dtype),
    )
    grid.SetCells(
        numpy_to_vtk(grid.celltypes, deep=True, array_type=VTK_UNSIGNED_CHAR),
        rebuilt,
    )

class Model:
    """Finite-element model backed by a single ``pyvista.UnstructuredGrid``.

    The grid is the permanent source of truth for geometry and
    connectivity; ET / material / real-constant tables live on the Model
    itself (see the module docstring for the data-array convention).
    """

    def __init__(
        self,
        grid: pv.UnstructuredGrid | None = None,
        *,
        unit_system: UnitSystem | str = UnitSystem.UNSPECIFIED,
    ) -> None:
        self._grid: pv.UnstructuredGrid = pv.UnstructuredGrid() if grid is None else grid
        self._etypes: dict[int, str] = {}
        self._materials: dict[int, dict[str, float]] = {}
        self._real_constants: dict[int, np.ndarray] = {}
        # Bookkeeping-only stamp.  The solver is dimensionless — ``E = 2.1e11``
        # is whatever the caller intends — but every realistic user works in
        # one consistent system.  Stamping the Model lets downstream readers
        # (result files, stress recovery, material libraries) interpret the
        # numbers without guessing.  See :class:`femorph_solver.UnitSystem`.
        self._unit_system: UnitSystem = (
            unit_system
            if isinstance(unit_system, UnitSystem)
            else UnitSystem(unit_system)
        )
        # Current TYPE / MAT / REAL stamps applied to each new element.
        self._current_type: int = 1
        self._current_mat: int = 1
        self._current_real: int = 1
        # Incremental-build staging area for ``N`` / ``E``.  After any grid
        # rebuild these are drained back into the grid and cleared; the grid
        # is the permanent source of truth for geometry + connectivity.
        # (The dicts used to be primary storage — that duplicated every node
        # and element between dict and grid, ~34 MB on a mature 200×200×2 mesh.)
        self._staged_nodes: dict[int, tuple[float, float, float]] = {}
        # elem_num -> (node_nums, etype_id, mat_id, real_id)
        self._staged_elements: dict[int, tuple[list[int], int, int, int]] = {}
        self._dirty: bool = False
        # Cached _dof_layout() result — reused across K/M assembly once the
        # grid is clean.  Invalidated by any structural mutation (n, e, et).
        self._layout_cache: (
            tuple[np.ndarray, dict[int, tuple[int, tuple[str, ...]]], int] | None
        ) = None
        self._node_dof_labels_cache: dict[int, tuple[str, ...]] | None = None
        # Cached (N, row_ptr, col_idx) CSR pattern — shared between K and M
        # since connectivity is geometry-only.  Invalidated alongside the
        # layout cache on any structural mutation (n / e / et).
        self._csr_pattern_cache: tuple[int, np.ndarray, np.ndarray] | None = None
        # Cached assembled K and M.  Invalidated on any structural or
        # material mutation — see ``_invalidate_matrix_cache``.  Back-to-back
        # modal sweeps / static solves on the same Model hit these for free.
        self._K_cache: _sp.csr_array | None = None
        self._M_cache: _sp.csr_array | None = None
        self._M_lumped_cache: _sp.csr_array | None = None
        # Boundary conditions / loads staged via D / F.  Keyed by (node, dof_idx).
        self._disp_bc: dict[tuple[int, int], float] = {}
        self._force_bc: dict[tuple[int, int], float] = {}

    @property
    def grid(self) -> pv.UnstructuredGrid:
        # Lazily drain any staged N / E additions before handing it out.
        if self._dirty:
            self._rebuild_grid()
        return self._grid

    @property
    def etypes(self) -> dict[int, str]:
        return self._etypes

    @property
    def materials(self) -> dict[int, dict[str, float]]:
        return self._materials

    @property
    def real_constants(self) -> dict[int, np.ndarray]:
        return self._real_constants

    @property
    def unit_system(self) -> UnitSystem:
        """Current :class:`UnitSystem` stamp.

        Defaults to :attr:`UnitSystem.UNSPECIFIED` for legacy Models that
        pre-date this attribute.  Set via :meth:`set_unit_system`, pass via
        the constructor, or let :meth:`Model.from_grid` /
        :func:`femorph_solver.mapdl_api.from_cdb` detect it from the input.
        """
        return self._unit_system
[docs] def set_unit_system(self, unit_system: UnitSystem | str) -> None: """Declare the :class:`UnitSystem` this Model's numeric inputs follow. Bookkeeping only — the solver's math is dimensionless. The stamp travels with the Model through assembly, solve, and the result serialiser so downstream readers can interpret the numbers. """ self._unit_system = ( unit_system if isinstance(unit_system, UnitSystem) else UnitSystem(unit_system) )
[docs] def assert_unit_system(self, expected: UnitSystem | str) -> None: """Raise :class:`ValueError` if this Model's ``unit_system`` is not ``expected``. Use this in scripts that require a specific unit convention — a mismatch between the caller's assumption and the Model's stamp usually means numbers will be silently wrong by a factor of :math:`10^n`. :attr:`UnitSystem.UNSPECIFIED` always passes (no assertion possible on an unstamped Model). """ want = expected if isinstance(expected, UnitSystem) else UnitSystem(expected) if self._unit_system == UnitSystem.UNSPECIFIED: return if self._unit_system != want: raise ValueError( f"Model.unit_system is {self._unit_system.value}, expected {want.value}" )
@property def n_nodes(self) -> int: return int(self.grid.n_points) @property def n_elements(self) -> int: return int(self.grid.n_cells) @property def node_numbers(self) -> np.ndarray: arr = self.grid.point_data.get("ansys_node_num") return np.asarray(arr, dtype=np.int32) if arr is not None else np.empty(0, dtype=np.int32) @property def element_numbers(self) -> np.ndarray: arr = self.grid.cell_data.get("ansys_elem_num") return np.asarray(arr, dtype=np.int32) if arr is not None else np.empty(0, dtype=np.int32) def __repr__(self) -> str: return ( f"<femorph_solver.Model nodes={self.n_nodes} elements={self.n_elements} " f"etypes={len(self._etypes)} materials={len(self._materials)} " f"reals={len(self._real_constants)}>" )
[docs] @classmethod def from_grid(cls, grid: pv.UnstructuredGrid) -> Model: """Wrap an existing ``pyvista.UnstructuredGrid`` as a Model. The grid is expected to carry the MAPDL-style cell/point data arrays (``ansys_node_num``, ``ansys_elem_num``, ``ansys_elem_type_num``, ``ansys_material_type``, ``ansys_real_constant``). Missing arrays are auto-filled with sequential 1-based ids so meshes built directly in pyvista (e.g. a ``StructuredGrid`` cast to unstructured) are usable without the caller stamping every field themselves. The grid becomes the Model's sole source of truth for geometry and connectivity — the staging dicts stay empty for the lifetime of the Model unless the caller later mutates via :meth:`n` / :meth:`e`. Two incidental passes happen here: * ``cell_connectivity`` / ``offset`` are coerced to int32 if they came in as int64 (VTK's default on 64-bit builds) — halves the cell-structure footprint and removes a copy-cast the hot path would otherwise do on every assembly call. * Points and cells are reordered so ``ansys_node_num`` and ``ansys_elem_num`` are strictly increasing. Every downstream hot path assumes MAPDL-sorted order to produce a deterministic DOF layout; canonicalising here keeps the invariant local to Model construction rather than leaking into every consumer. 
""" m = cls() n_points = int(grid.n_points) n_cells = int(grid.n_cells) node_nums_arr = grid.point_data.get("ansys_node_num") if node_nums_arr is None: node_nums = np.arange(1, n_points + 1, dtype=np.int32) grid.point_data["ansys_node_num"] = node_nums else: node_nums = np.asarray(node_nums_arr, dtype=np.int32) if node_nums.dtype != np.int32: grid.point_data["ansys_node_num"] = node_nums if grid.cell_data.get("ansys_elem_num") is None: grid.cell_data["ansys_elem_num"] = np.arange(1, n_cells + 1, dtype=np.int32) if grid.cell_data.get("ansys_elem_type_num") is None: grid.cell_data["ansys_elem_type_num"] = np.ones(n_cells, dtype=np.int32) if grid.cell_data.get("ansys_material_type") is None: grid.cell_data["ansys_material_type"] = np.ones(n_cells, dtype=np.int32) if grid.cell_data.get("ansys_real_constant") is None: grid.cell_data["ansys_real_constant"] = np.ones(n_cells, dtype=np.int32) # Canonicalise + int32: ensures every hot path reads the grid # in MAPDL-sorted order with zero-copy numpy views. grid = _canonicalize_grid(grid) _force_int32_cells(grid) m._grid = grid m._dirty = False return m
# --------------------------------------------------------------------- # Preprocessor commands — names chosen to match MAPDL's APDL exactly. # --------------------------------------------------------------------- def _invalidate_topology_caches(self) -> None: """Invalidate every derived state that depends on mesh connectivity.""" self._layout_cache = None self._node_dof_labels_cache = None self._csr_pattern_cache = None self._K_cache = None self._M_cache = None self._M_lumped_cache = None def _invalidate_matrix_cache(self) -> None: """Invalidate only the assembled K / M caches — materials/real changed but connectivity is unchanged so the DOF layout / CSR pattern stay valid. """ self._K_cache = None self._M_cache = None self._M_lumped_cache = None
[docs] def release_matrix_cache(self) -> None: """Drop the cached assembled K / M matrices. Each cached matrix is roughly ``nnz × 12 bytes`` — on a 1.2 M-DOF plate with ~100 M nnz that's ~1.2 GB each, so releasing both saves ~2.4 GB. Call after a ``modal_solve`` / ``static_solve`` finishes and the caller is done with further analyses on this Model. Subsequent ``stiffness_matrix()`` or ``mass_matrix()`` calls will re-assemble. """ self._K_cache = None self._M_cache = None self._M_lumped_cache = None
[docs] def n(self, num: int, x: float = 0.0, y: float = 0.0, z: float = 0.0) -> None: """MAPDL ``N`` — define or overwrite a node.""" self._staged_nodes[int(num)] = (float(x), float(y), float(z)) self._dirty = True self._invalidate_topology_caches()
[docs] def e(self, *node_nums: int, num: int | None = None) -> int: """MAPDL ``E`` — define an element from node numbers. Uses the current TYPE / MAT / REAL stamps. Returns the element number (auto-assigned to ``max(existing) + 1`` when ``num`` is omitted). """ if num is not None: elem_num = int(num) else: # Max over existing grid + any already-staged elements. The # grid is authoritative once ``_rebuild_grid`` has run, so a # lone ``staged`` lookup is insufficient. existing_max = 0 if self._grid.n_cells > 0: existing_max = int(self._grid.cell_data["ansys_elem_num"].max()) staged_max = max(self._staged_elements, default=0) elem_num = max(existing_max, staged_max) + 1 self._staged_elements[elem_num] = ( [int(n) for n in node_nums], self._current_type, self._current_mat, self._current_real, ) self._dirty = True self._invalidate_topology_caches() return elem_num
[docs] def et(self, et_id: int, name: str | ElementType) -> None: """MAPDL ``ET`` — declare an element type. ``name`` accepts either a raw string (``"SOLID185"``) or an :class:`~femorph_solver.ElementType` member (``ElementType.SOLID185``). """ name_str = _token(name).upper() # Validate early: registered element classes only. _get_element(name_str) self._etypes[int(et_id)] = name_str self._invalidate_topology_caches()
[docs] def type(self, et_id: int) -> None: """MAPDL ``TYPE`` — set the current element-type stamp for ``E``.""" self._current_type = int(et_id)
[docs] def mat(self, mat_id: int) -> None: """MAPDL ``MAT`` — set the current material stamp for ``E``.""" self._current_mat = int(mat_id)
[docs] def real(self, real_id: int) -> None: """MAPDL ``REAL`` — set the current real-constant stamp for ``E``.""" self._current_real = int(real_id)
[docs] def mp(self, prop: str | MaterialProperty, mat_id: int, value: float) -> None: """MAPDL ``MP`` — set a scalar material property. ``prop`` accepts either a raw string (``"EX"``) or a :class:`~femorph_solver.MaterialProperty` member (``MaterialProperty.EX``). Unknown names are rejected to catch typos like ``"EXX"`` before they silently mask a zero default in an element kernel. """ prop_str = _token(prop).upper() # Validate at call time so silent misses (typos) can't turn # into NaN / zero surprises deep inside assembly. Leading # underscore bypasses the whitelist so elements can stash # private config (e.g. PLANE182's ``_PLANE_MODE``) here. if not prop_str.startswith("_"): try: MaterialProperty(prop_str) except ValueError as exc: allowed = ", ".join(m.value for m in MaterialProperty) raise ValueError( f"unknown material property {prop!r}; expected one of: {allowed}" ) from exc self._materials.setdefault(int(mat_id), {})[prop_str] = float(value) self._invalidate_matrix_cache()
[docs] def r(self, real_id: int, *values: float) -> None: """MAPDL ``R`` — set the real-constant set.""" self._real_constants[int(real_id)] = np.asarray(values, dtype=np.float64) self._invalidate_matrix_cache()
# --------------------------------------------------------------------- # Loads / boundary conditions # --------------------------------------------------------------------- _DOF_LABELS: dict[str, int] = { "UX": 0, "UY": 1, "UZ": 2, "ROTX": 3, "ROTY": 4, "ROTZ": 5, } _FORCE_LABELS: dict[str, int] = { "FX": 0, "FY": 1, "FZ": 2, "MX": 3, "MY": 4, "MZ": 5, }
[docs] def d(self, node: int, label: str | DOFLabel, value: float = 0.0) -> None: """MAPDL ``D`` — constrain a nodal DOF. ``label`` accepts a raw string, a :class:`~femorph_solver.DOFLabel` member, or the sentinel string ``"ALL"``, which expands to every DOF active at that node (as determined by the elements that touch it). """ lbl = _token(label).upper() if lbl == "ALL": labels = self._node_dof_labels().get(int(node), _CANONICAL_DOFS[:3]) for active in labels: self._disp_bc[(int(node), self._DOF_LABELS[active])] = float(value) return if lbl not in self._DOF_LABELS: raise ValueError(f"unknown displacement DOF label {label!r}") self._disp_bc[(int(node), self._DOF_LABELS[lbl])] = float(value)
[docs] def f(self, node: int, label: str | ForceLabel, value: float) -> None: """MAPDL ``F`` — apply a nodal force or moment. ``label`` accepts a raw string (``"FZ"``) or a :class:`~femorph_solver.ForceLabel` member. """ lbl = _token(label).upper() if lbl not in self._FORCE_LABELS: raise ValueError(f"unknown force DOF label {label!r}") self._force_bc[(int(node), self._FORCE_LABELS[lbl])] = float(value)
# --------------------------------------------------------------------- # Grid materialisation # --------------------------------------------------------------------- def _rebuild_grid(self) -> None: """Merge the existing grid with any staged ``N`` / ``E`` additions and emit a fresh MAPDL-sorted grid with int32 cell structure. The grid is the Model's sole permanent storage for geometry and connectivity; the ``_staged_*`` dicts only hold the deltas since the last rebuild and are cleared here. Merging via dicts (keyed on MAPDL number) honours overwrite semantics — a later ``n()`` with an existing number replaces the grid's coordinate. """ # Pull existing grid rows back into dicts, then overlay staged # additions (overwrite wins). Cheap because the grid is already # numpy; zip into a {int: tuple} dict for the bulk merge path. nodes: dict[int, tuple[float, float, float]] = {} if self._grid.n_points > 0: pts = np.asarray(self._grid.points, dtype=np.float64) nums = np.asarray(self._grid.point_data["ansys_node_num"], dtype=np.int32) for nn, pt in zip(nums, pts): nodes[int(nn)] = (float(pt[0]), float(pt[1]), float(pt[2])) nodes.update(self._staged_nodes) elements: dict[int, tuple[list[int], int, int, int]] = {} if self._grid.n_cells > 0: elem_nums_existing = np.asarray(self._grid.cell_data["ansys_elem_num"], dtype=np.int32) et_existing = np.asarray(self._grid.cell_data["ansys_elem_type_num"], dtype=np.int32) mat_existing = np.asarray(self._grid.cell_data["ansys_material_type"], dtype=np.int32) real_existing = np.asarray(self._grid.cell_data["ansys_real_constant"], dtype=np.int32) pt_nums_existing = np.asarray(self._grid.point_data["ansys_node_num"], dtype=np.int32) conn_flat = np.asarray(self._grid.cell_connectivity, dtype=np.int32) offsets = np.asarray(self._grid.offset, dtype=np.int32) for i, en in enumerate(elem_nums_existing): vtk_ids = conn_flat[offsets[i] : offsets[i + 1]] elements[int(en)] = ( [int(pt_nums_existing[j]) for j in vtk_ids], int(et_existing[i]), 
int(mat_existing[i]), int(real_existing[i]), ) elements.update(self._staged_elements) if not nodes: self._grid = pv.UnstructuredGrid() self._staged_nodes.clear() self._staged_elements.clear() self._dirty = False return # MAPDL node numbers are 1-based and may be sparse; map to contiguous # VTK point indices by sorted MAPDL number. node_nums = np.fromiter(sorted(nodes), dtype=np.int32) points = np.array([nodes[n] for n in node_nums], dtype=np.float64) num_to_idx = {n: i for i, n in enumerate(node_nums)} if elements: elem_nums = np.fromiter(sorted(elements), dtype=np.int32) cells_flat: list[int] = [] cell_types: list[int] = [] etype_nums = np.empty(len(elem_nums), dtype=np.int32) mat_ids = np.empty(len(elem_nums), dtype=np.int32) real_ids = np.empty(len(elem_nums), dtype=np.int32) for i, en in enumerate(elem_nums): nns, et_id, mat_id, real_id = elements[int(en)] if et_id not in self._etypes: raise KeyError(f"element {en} uses TYPE={et_id} but no ET is defined for it") cls = self._kernel_for(et_id, len(nns)) cell_types.append(cls.vtk_cell_type) cells_flat.append(len(nns)) for nn in nns: if nn not in num_to_idx: raise KeyError(f"element {en} references undefined node {nn}") cells_flat.append(num_to_idx[nn]) etype_nums[i] = et_id mat_ids[i] = mat_id real_ids[i] = real_id grid = pv.UnstructuredGrid( np.array(cells_flat, dtype=np.int64), np.array(cell_types, dtype=np.uint8), points, ) grid.cell_data["ansys_elem_num"] = elem_nums grid.cell_data["ansys_elem_type_num"] = etype_nums grid.cell_data["ansys_material_type"] = mat_ids grid.cell_data["ansys_real_constant"] = real_ids else: grid = pv.UnstructuredGrid() grid.points = points grid.point_data["ansys_node_num"] = node_nums _force_int32_cells(grid) self._grid = grid self._staged_nodes.clear() self._staged_elements.clear() self._dirty = False # --------------------------------------------------------------------- # Global DOF layout + sparse K/M assembly # 
--------------------------------------------------------------------- # Degenerate-topology aliases for MAPDL ET names that share a single # element code across multiple shapes. mapdl-archive (and VTK) splits # such elements by cell type / node count; this table picks the right # kernel without forcing callers to spell the degenerate name. _DEGENERATE_ALIASES: dict[str, dict[int, str]] = { "SOLID186": {20: "SOLID186", 15: "SOLID186W", 13: "SOLID186P"}, } def _kernel_for(self, et_id: int, n_nodes: int) -> type[ElementBase]: """Resolve the element class for a given ET id + actual node count. A single MAPDL ET (e.g. 186) can correspond to multiple shapes once corner nodes are collapsed; the connectivity length in the grid disambiguates them. """ name = self._etypes[int(et_id)] aliases = self._DEGENERATE_ALIASES.get(name) if aliases is not None: alt = aliases.get(int(n_nodes)) if alt is not None: return _get_element(alt) cls = _get_element(name) if n_nodes != cls.n_nodes: raise ValueError( f"{name} expects {cls.n_nodes} nodes per element but an element " f"with {n_nodes} nodes was recorded; no degenerate kernel known" ) return cls def _sorted_node_nums(self) -> np.ndarray: # After ``from_grid`` / ``_rebuild_grid`` the grid is guaranteed # MAPDL-sorted, so the ``point_data`` view is the answer verbatim # (zero copy). Fall back through ``.grid`` if the model is dirty. g = self.grid if g.n_points == 0: return np.empty(0, dtype=np.int32) return np.asarray(g.point_data["ansys_node_num"], dtype=np.int32)
[docs] def node_coord(self, node_num: int) -> tuple[float, float, float]: """Return ``(x, y, z)`` for the given MAPDL node number. The grid is the only permanent storage, so single-node lookups aren't O(1) the way the old ``_nodes[nn]`` dict lookup was — but callers that do this in a hot loop should lift the full ``grid.points`` array via :meth:`Model.grid` instead. """ g = self.grid nums = np.asarray(g.point_data["ansys_node_num"]) # ``nums`` is MAPDL-sorted after canonicalisation, so searchsorted # is O(log n) and matches the dict's effective lookup cost. idx = np.searchsorted(nums, node_num) if idx >= nums.size or int(nums[idx]) != int(node_num): raise KeyError(f"node {node_num} not in model") pt = g.points[idx] return (float(pt[0]), float(pt[1]), float(pt[2]))
[docs] def element_info(self, elem_num: int) -> tuple[list[int], int, int, int]: """Return ``(node_nums, et_id, mat_id, real_id)`` for an element. Convenience accessor for the stress-recovery / strain-export paths that previously reached into ``model._elements[en]``. O(log n) lookup; lift ``grid.cell_data`` / ``grid.cell_connectivity`` yourself for bulk iteration. """ g = self.grid elem_nums = np.asarray(g.cell_data["ansys_elem_num"]) idx = np.searchsorted(elem_nums, elem_num) if idx >= elem_nums.size or int(elem_nums[idx]) != int(elem_num): raise KeyError(f"element {elem_num} not in model") offsets = np.asarray(g.offset, dtype=np.int32) conn = np.asarray(g.cell_connectivity, dtype=np.int32) pt_nums = np.asarray(g.point_data["ansys_node_num"], dtype=np.int32) vtk_ids = conn[offsets[idx] : offsets[idx + 1]] nns = [int(pt_nums[j]) for j in vtk_ids] return ( nns, int(g.cell_data["ansys_elem_type_num"][idx]), int(g.cell_data["ansys_material_type"][idx]), int(g.cell_data["ansys_real_constant"][idx]), )
def _grid_element_arrays( self, ) -> tuple[ np.ndarray, # elem_nums (n_elem,) int32 np.ndarray, # n_nodes per elem (n_elem,) int32 np.ndarray, # offsets into cell_connectivity (n_elem + 1,) int32 np.ndarray, # cell_connectivity flat (total_nns,) int32 — VTK point indices np.ndarray, # et_ids (n_elem,) int32 np.ndarray, # mat_ids (n_elem,) int32 np.ndarray, # real_ids (n_elem,) int32 np.ndarray, # point_nums (n_points,) int32 MAPDL node number per VTK point np.ndarray, # points (n_points, 3) float64 ]: """Bulk numpy view of the grid for the assembly / strain hot paths. Every array is a zero-copy view onto the grid's internal buffers (assuming the grid has been canonicalised + int32-coerced) — no dict iteration, no per-element Python lookups. Centralised so the hot paths don't each reach into the pyvista API. """ g = self.grid offsets = np.asarray(g.offset, dtype=np.int32) conn = np.asarray(g.cell_connectivity, dtype=np.int32) n_nodes_per_elem = np.diff(offsets).astype(np.int32, copy=False) return ( np.asarray(g.cell_data["ansys_elem_num"], dtype=np.int32), n_nodes_per_elem, offsets, conn, np.asarray(g.cell_data["ansys_elem_type_num"], dtype=np.int32), np.asarray(g.cell_data["ansys_material_type"], dtype=np.int32), np.asarray(g.cell_data["ansys_real_constant"], dtype=np.int32), np.asarray(g.point_data["ansys_node_num"], dtype=np.int32), np.asarray(g.points, dtype=np.float64), ) def _node_dof_labels(self) -> dict[int, tuple[str, ...]]: """Return ``{node: (dof_label, ...)}`` in MAPDL's canonical DOF order. The DOF set for a node is the union of ``dof_labels`` across every element that touches it. Nodes not referenced by any element default to the three translational DOFs so unconnected nodes still fit the global layout. Tolerant of partial state: if a user stages an element referencing an ET that hasn't been declared yet (``m.et(...)``), the element is simply skipped here — ``_rebuild_grid`` is the method that enforces the ET invariant at solve time. 
Walks the grid + the staged buffers directly without forcing a rebuild, so pre-rebuild introspection doesn't crash on intentionally-partial models. """ if self._node_dof_labels_cache is not None: return self._node_dof_labels_cache default = _CANONICAL_DOFS[:3] per_node: dict[int, set[str]] = {} # Walk the grid (authoritative committed state). if self._grid.n_points > 0: pt_nums = np.asarray(self._grid.point_data["ansys_node_num"], dtype=np.int32) for n in pt_nums: per_node[int(n)] = set() if self._grid.n_cells > 0: offsets = np.asarray(self._grid.offset, dtype=np.int32) conn = np.asarray(self._grid.cell_connectivity, dtype=np.int32) et_ids = np.asarray(self._grid.cell_data["ansys_elem_type_num"], dtype=np.int32) pt_nums = np.asarray(self._grid.point_data["ansys_node_num"], dtype=np.int32) for i in range(self._grid.n_cells): et_id = int(et_ids[i]) if et_id not in self._etypes: continue vtk_ids = conn[offsets[i] : offsets[i + 1]] cls = self._kernel_for(et_id, vtk_ids.size) for j in vtk_ids: per_node[int(pt_nums[j])].update(cls.dof_labels) # Overlay staged incremental additions (not yet merged into the grid). for nn in self._staged_nodes: per_node.setdefault(int(nn), set()) for nns, et_id, _, _ in self._staged_elements.values(): if et_id not in self._etypes: continue cls = self._kernel_for(et_id, len(nns)) for nn in nns: per_node.setdefault(int(nn), set()).update(cls.dof_labels) out: dict[int, tuple[str, ...]] = {} for nn, s in per_node.items(): out[nn] = tuple(lbl for lbl in _CANONICAL_DOFS if lbl in s) or default self._node_dof_labels_cache = out return out def _dof_layout( self, ) -> tuple[np.ndarray, dict[int, tuple[int, tuple[str, ...]]], int]: """Return ``(sorted_node_nums, {node: (offset, labels)}, N)``. ``offset`` is the first global DOF index for a node; ``labels`` is its DOF-label tuple in canonical order. Cached across calls; invalidated by any structural mutation (n, e, et). 
""" if self._layout_cache is not None: return self._layout_cache node_nums = self._sorted_node_nums() node_dofs = self._node_dof_labels() layout: dict[int, tuple[int, tuple[str, ...]]] = {} offset = 0 for nn in node_nums: labels = node_dofs[int(nn)] layout[int(nn)] = (offset, labels) offset += len(labels) self._layout_cache = (node_nums, layout, offset) return self._layout_cache
[docs] def dof_map(self) -> np.ndarray: """Return an ``(N, 2)`` array of ``(mapdl_node_num, local_dof_idx)``. Row ``i`` of K/M corresponds to ``dof_map()[i]``. ``local_dof_idx`` is the 0-based position in MAPDL's canonical order ``(UX, UY, UZ, ROTX, ROTY, ROTZ)``. Nodes only get rows for the DOFs the elements at them actually need, so a truss model stays 3-DOF while a beam model gets 6. """ node_nums, layout, total = self._dof_layout() out = np.empty((total, 2), dtype=np.int32) for nn in node_nums: offset, labels = layout[int(nn)] for k, lbl in enumerate(labels): out[offset + k] = (int(nn), self._DOF_LABELS[lbl]) return out
    def _assemble(
        self,
        which: str,
        lumped: bool = False,
        *,
        full_to_free: np.ndarray | None = None,
        triu_only: bool = False,
        pattern: tuple[np.ndarray, np.ndarray] | None = None,
    ) -> _sp.csr_array:
        """Assemble the global K or M matrix.

        ``which`` selects the matrix: ``"K"`` routes through the element
        ``ke`` / ``ke_batch`` kernels; any other value routes through
        ``me`` / ``me_batch`` (``lumped`` is forwarded to the mass kernels
        only).

        ``full_to_free`` (optional): length-N ``int32`` array mapping each
        full-system DOF index to its reduced-system index (-1 for fixed
        DOFs that should be dropped entirely). When supplied, returned
        matrix has shape ``(n_free, n_free)`` and is built directly in the
        reduced numbering — the full-system K/M are never materialised,
        which roughly halves peak assembly RSS on BC-constrained problems.

        ``triu_only`` drops entries below the diagonal during the scatter
        so the result is upper-triangular (diagonal + strictly-upper). Lets
        callers feed MKL Pardiso's SPD Cholesky factor directly without a
        separate ``scipy.sparse.triu`` extraction step — halves the CSR's
        nnz footprint and skips one ``~n_free × 12 B`` transient copy. The
        returned matrix is tagged ``.femorph_triu = True`` so downstream
        solvers can detect the storage convention.

        ``pattern=(row_ptr, col_idx)`` skips the symbolic pattern build and
        scatters into the supplied arrays directly. Lets the caller share
        indptr + indices between K and M (they're both keyed on the same
        element connectivity + reduced-DOF mask, so the sparsity structures
        are identical when both are assembled with the same ``triu_only`` /
        ``full_to_free`` flags). The returned matrix aliases the supplied
        arrays — modifying them after the fact mutates the returned CSR.
        """
        _ = self.grid  # triggers _rebuild_grid if dirty (also validates nodes/types)
        _, layout, N = self._dof_layout()
        if full_to_free is None:
            N_out = N
        else:
            if full_to_free.shape[0] != N:
                raise ValueError(f"full_to_free length {full_to_free.shape[0]} != N={N}")
            # Maximum free index + 1 is the reduced dimension.
            N_out = int(full_to_free.max()) + 1 if N > 0 else 0
        # Degenerate model: no DOFs or no elements — empty matrix.
        if N == 0 or self._grid.n_cells == 0:
            return _sp.csr_array((N_out, N_out), dtype=np.float64)

        # Grid-backed numpy views of the element table. ``conn`` holds
        # VTK point indices (0-based, contiguous after canonicalisation);
        # ``pt_nums`` maps them back to MAPDL node numbers when we need
        # them for DOF-layout lookup. ``points`` is indexed by VTK
        # point index directly, so per-group coord gather is a single
        # fancy-index (no ``searchsorted``).
        (
            _elem_nums,
            n_nodes_per_elem,
            offsets,
            conn,
            et_ids,
            mat_ids,
            real_ids,
            pt_nums,
            points,
        ) = self._grid_element_arrays()

        # Group by (et_id, n_nodes, material-VALUE, real-VALUE) — NOT
        # raw ``mat_id`` / ``real_id``. Decks like ``ptr.cdb`` assign a
        # unique ``mat_id`` per cell even when every entry holds
        # identical material properties; keying on the ID value fans out
        # 5000+ singleton groups and defeats ``ke_batch`` / ``me_batch``.
        # Keying on the hashed property tuple collapses it to the real
        # element-type groups (4 on PTR: 3069 hex + 497 pyramid +
        # 19 wedge + 1581 tet) and restores batching.
        # ``n_nodes`` joins the key so degenerate topologies under a
        # shared MAPDL ET id (e.g. SOLID186 hex / wedge / pyramid) land
        # in distinct groups.
        mat_cache: dict[int, tuple] = {}
        real_cache: dict[int, tuple] = {}

        def _mat_key(mid: int) -> tuple:
            # Hashable value-key for a material id (memoised per id).
            key = mat_cache.get(mid)
            if key is None:
                props = self._materials.get(mid, {})
                # Sort items so dict-ordering doesn't split identical
                # materials across groups.
                key = tuple(sorted(props.items()))
                mat_cache[mid] = key
            return key

        def _real_key(rid: int) -> tuple:
            # Hashable value-key for a real-constant id (memoised per id).
            key = real_cache.get(rid)
            if key is None:
                vals = self._real_constants.get(rid, np.empty(0, dtype=np.float64))
                # ``tobytes`` is O(n_vals) but n_vals is tiny (≤ 10 for
                # any real-constant set).
                key = (vals.shape, np.ascontiguousarray(vals).tobytes())
                real_cache[rid] = key
            return key

        # Key -> (et_id, n_nodes, representative_mat_id,
        # representative_real_id, cell_idx_list). Keep a
        # representative ``mat_id`` / ``real_id`` for each group so we
        # can look up the material dict / real-constant array below.
        groups: dict[tuple, tuple[int, int, int, int, list[int]]] = {}
        for i in range(et_ids.size):
            mid = int(mat_ids[i])
            rid = int(real_ids[i])
            key = (int(et_ids[i]), int(n_nodes_per_elem[i]), _mat_key(mid), _real_key(rid))
            entry = groups.get(key)
            if entry is None:
                groups[key] = (int(et_ids[i]), int(n_nodes_per_elem[i]), mid, rid, [i])
            else:
                entry[4].append(i)

        # Per-group element matrices and DOF maps. ``dofs_per_elem`` is
        # int32 — what the C++ scatter and scipy CSR both want.
        group_dofs: list[np.ndarray] = []
        group_mats: list[np.ndarray] = []
        for et_id, nn_per_elem, mat_id, real_id, cell_idx_list in groups.values():
            cls = self._kernel_for(et_id, nn_per_elem)
            material = self._materials.get(mat_id, {})
            real = self._real_constants.get(real_id, np.empty(0, dtype=np.float64))
            cell_idx = np.asarray(cell_idx_list, dtype=np.int32)
            # (n_grp, nn_per_elem) VTK point indices for each element in
            # the group. ``offsets[cell_idx][:, None] + arange(nn)`` is a
            # vectorised gather over the flat connectivity.
            base = offsets[cell_idx][:, None]
            per_elem_cols = np.arange(nn_per_elem, dtype=np.int32)[None, :]
            elem_vtk_ids = conn[base + per_elem_cols]  # (n_grp, nn_per_elem)
            coords_arr = points[elem_vtk_ids]  # (n_grp, nn_per_elem, 3) — zero dict hits
            if which == "K":
                # Prefer the batched kernel; fall back to per-element ``ke``
                # when the class has no batch path (or it declines with None).
                batch_fn = getattr(cls, "ke_batch", None)
                mats = batch_fn(coords_arr, material, real) if batch_fn else None
                if mats is None:
                    mats = np.stack(
                        [cls.ke(coords_arr[i], material, real) for i in range(cell_idx.size)]
                    )
            else:
                batch_fn = getattr(cls, "me_batch", None)
                mats = batch_fn(coords_arr, material, real, lumped=lumped) if batch_fn else None
                if mats is None:
                    mats = np.stack(
                        [
                            cls.me(coords_arr[i], material, real, lumped=lumped)
                            for i in range(cell_idx.size)
                        ]
                    )
            # Per-element DOF indices. Layout is keyed by MAPDL node
            # number, so translate through ``pt_nums`` once per unique
            # node touched by this group — cheaper than doing it per
            # (elem, node) pair.
            unique_vtk, inverse = np.unique(elem_vtk_ids.ravel(), return_inverse=True)
            unique_mapdl = pt_nums[unique_vtk]
            # Fast path: homogeneous DOF labels across every node in the
            # group. Holds whenever a node is touched only by elements
            # sharing the same ``dof_labels`` — in particular, the common
            # case where the whole model is one element kind (SOLID185,
            # BEAM188, etc.). At 300×300×2 SOLID185 (271k nodes × 3
            # labels per K/M assembly) the vectorised path replaces
            # ~800k Python iterations with two numpy ops, roughly
            # 200 ms → 5 ms per assembly call. When labels vary per
            # node (mixed-element models where a node sits between a
            # SOLID185 and a BEAM188 group) we fall back to the
            # per-node lookup below.
            first_labels = layout[int(unique_mapdl[0])][1]
            homogeneous = True
            for nn in unique_mapdl[1:]:
                if layout[int(nn)][1] != first_labels:
                    homogeneous = False
                    break
            if homogeneous:
                bases = np.fromiter(
                    (layout[int(nn)][0] for nn in unique_mapdl),
                    dtype=np.int32,
                    count=unique_mapdl.size,
                )
                label_offsets = np.array(
                    [first_labels.index(lbl) for lbl in cls.dof_labels],
                    dtype=np.int32,
                )
                node_dofs = bases[:, None] + label_offsets[None, :]
            else:
                node_dofs = np.empty((unique_vtk.size, cls.n_dof_per_node), dtype=np.int32)
                for u_idx, nn in enumerate(unique_mapdl):
                    layout_base, labels = layout[int(nn)]
                    for k, lbl in enumerate(cls.dof_labels):
                        node_dofs[u_idx, k] = layout_base + labels.index(lbl)
            dofs_per_elem = node_dofs[inverse].reshape(
                cell_idx.size, nn_per_elem * cls.n_dof_per_node
            )
            if full_to_free is not None:
                # Remap to reduced numbering; fixed DOFs become -1 and the
                # C++ scatter drops their rows + cols entirely.
                dofs_per_elem = full_to_free[dofs_per_elem]
            group_dofs.append(np.ascontiguousarray(dofs_per_elem, dtype=np.int32))
            group_mats.append(np.ascontiguousarray(mats, dtype=np.float64))

        # Build (or reuse) the symbolic CSR pattern from every group's
        # connectivity. When the pattern is for the full system it's
        # cached for reuse across K and M (geometry-only). For the
        # reduced path we rebuild per call — the mask is a property of
        # the BC set, not the mesh, and the cost is dominated by the
        # factor anyway. ``triu_only`` always bypasses the cache — the
        # triu pattern is K-specific.
        if pattern is not None:
            row_ptr, col_idx = pattern
        elif full_to_free is None and not triu_only:
            row_ptr, col_idx = self._get_or_build_csr_pattern(group_dofs, N)
        else:
            row_ptr, col_idx = self._build_csr_pattern(group_dofs, N_out, triu_only=triu_only)
        data = np.zeros(col_idx.shape[0], dtype=np.float64)
        for dofs_pe, mats in zip(group_dofs, group_mats):
            _core.scatter_into_csr(
                dofs_pe, mats, row_ptr, col_idx, data, N_out, triu_only=triu_only
            )
        # The standard ``_sp.csr_array((data, col_idx, row_ptr), ...)``
        # constructor runs ``check_format`` which copies ``data`` and
        # ``col_idx`` into fresh arrays — a transient 2× spike that
        # shows up as ~90 MB of avoidable peak RSS on a 200×200×2
        # triu SOLID185 plate. We've just populated the arrays with
        # the C++ scatter so we know they're valid (sorted column
        # indices per row from ``build_csr_pattern``, matching dtype,
        # contiguous) — construct the csr_array by assigning to the
        # private attributes directly so scipy skips the copies.
        # NOTE(review): relies on scipy-private attributes (``_shape``,
        # ``_maxprint``) — re-verify against the pinned scipy version
        # on upgrades.
        out = _sp.csr_array.__new__(_sp.csr_array)
        out._shape = (N_out, N_out)
        out.indptr = row_ptr
        out.indices = col_idx
        out.data = data
        # Required by scipy's internal caches / maxprint:
        out._maxprint = 50
        if triu_only:
            # Tag so Pardiso / ARPACK can skip their own triu extraction and
            # avoid calling K @ x (which would only compute the triu-product).
            out.femorph_triu = True
        return out

    def _build_csr_pattern(
        self, group_dofs: list[np.ndarray], N: int, *, triu_only: bool = False
    ) -> tuple[np.ndarray, np.ndarray]:
        """Symbolic CSR pattern from element-DOF groups, no caching.

        Supports ``-1`` sentinels in ``group_dofs`` (the reduced-assembly
        path uses them for filtered-out fixed DOFs); the C++ builder and
        the scipy fallback both ignore them.

        ``triu_only=True`` drops entries below the diagonal so the result
        is upper-triangular — see :meth:`_assemble` for why that's useful.
        """
        # Single group: hand the block straight to the C++ builder.
        if len(group_dofs) == 1:
            return _core.build_csr_pattern(group_dofs[0], N, triu_only=triu_only)
        # Mixed ``ndof_per_elem`` across groups: pad each group's per-row
        # DOF array up to the max with ``-1`` sentinels, stack, then hand
        # the merged block to the single-group C++ builder. The kernel
        # already treats ``-1`` entries as fixed-and-dropped (matches
        # the reduced-assembly contract), so padded slots are ignored
        # with zero extra bookkeeping.
        #
        # The cost of the extra "wasted" cells is small: on ``ptr.cdb``
        # padding 1581 SOLID187 rows (10 nodes × 3 DOF = 30) up to 60
        # plus 497 SOLID186P rows (13 × 3 = 39) adds ~50 k cells to a
        # 5166-row block. In exchange we skip the scipy ``coo → csr``
        # fallback (which materialised ~1.47 M COO entries, sorted them
        # and deduplicated with a Python-level unique) AND pick up the
        # OpenMP row-parallel speedup in the C++ builder. Net on PTR:
        # ~500 ms → ~120 ms pattern build at 4 threads.
        max_ndof = max(int(g.shape[1]) for g in group_dofs)
        n_elem_total = sum(int(g.shape[0]) for g in group_dofs)
        padded = np.full((n_elem_total, max_ndof), -1, dtype=np.int32)
        off = 0
        for g in group_dofs:
            n, nd = g.shape
            padded[off : off + n, :nd] = g
            off += n
        return _core.build_csr_pattern(padded, N, triu_only=triu_only)

    def _get_or_build_csr_pattern(
        self, group_dofs: list[np.ndarray], N: int
    ) -> tuple[np.ndarray, np.ndarray]:
        """Return the cached full-system CSR pattern, building it on a miss.

        Used only by the full-system, non-triu assembly path so K and M can
        share one symbolic pattern.

        NOTE(review): the cache key is ``N`` alone — this assumes
        ``_csr_pattern_cache`` is cleared elsewhere on structural mutation;
        a mesh change that keeps ``N`` constant would otherwise reuse a
        stale pattern. Confirm against the mutation paths.
        """
        cached = self._csr_pattern_cache
        if cached is not None and cached[0] == N:
            return cached[1], cached[2]
        row_ptr, col_idx = self._build_csr_pattern(group_dofs, N)
        self._csr_pattern_cache = (N, row_ptr, col_idx)
        return row_ptr, col_idx
[docs] def stiffness_matrix(self) -> _sp.csr_array: """Assemble the global stiffness matrix as a ``scipy.sparse.csr_array``. Row/column ``i`` corresponds to :meth:`dof_map` row ``i``. Cached on the Model: repeat calls on an unchanged Model skip the ~2 s assembly phase entirely. The cache is invalidated by any structural (``n`` / ``e`` / ``et``) or material (``mp`` / ``r``) mutation. """ if self._K_cache is not None and not self._dirty: return self._K_cache K = self._assemble("K") self._K_cache = K return K
[docs] def mass_matrix(self, lumped: bool = False) -> _sp.csr_array: """Assemble the global mass matrix as a ``scipy.sparse.csr_array``. Same caching semantics as :meth:`stiffness_matrix` — consistent and lumped variants are cached independently. """ if not self._dirty: cached = self._M_lumped_cache if lumped else self._M_cache if cached is not None: return cached M = self._assemble("M", lumped=lumped) if lumped: self._M_lumped_cache = M else: self._M_cache = M return M
# --------------------------------------------------------------------- # Analysis drivers # --------------------------------------------------------------------- def _global_index( self, node: int, dof_canonical: int, layout: dict[int, tuple[int, tuple[str, ...]]], ) -> int: if node not in layout: raise KeyError(f"DOF referenced undefined node {node}") offset, labels = layout[node] label = _CANONICAL_DOFS[dof_canonical] if label not in labels: raise KeyError(f"node {node} has no {label} DOF (active labels: {labels})") return offset + labels.index(label) def _load_vector(self) -> np.ndarray: _, layout, N = self._dof_layout() F = np.zeros(N, dtype=np.float64) for (node, dof), value in self._force_bc.items(): F[self._global_index(int(node), dof, layout)] = value return F def _prescribed_dofs(self) -> dict[int, float]: _, layout, _ = self._dof_layout() out: dict[int, float] = {} for (node, dof), value in self._disp_bc.items(): out[self._global_index(int(node), dof, layout)] = value return out
    def strain(self, displacement: np.ndarray) -> dict[int, np.ndarray]:
        """Compute per-element elastic strain from a global displacement vector.

        Returns a dict mapping MAPDL element number → ``(n_nodes, 6)``
        engineering-shear Voigt strain sampled at each element node.

        Shares the kernel path with :func:`write_static_result` /
        :func:`write_modal_result` but stays in memory — nothing hits disk.
        Use this to recover strain from any displacement field (static
        result, one modal shape, a morphed guess) without round-tripping
        through ``.pv``.

        ``displacement`` must be indexed by :meth:`dof_map` (the same
        layout the solvers produce). Elements whose kernel returns ``None``
        from ``eel_batch`` are skipped.
        """
        _ = self.grid  # triggers the dirty-rebuild path (same pattern as _assemble)
        _, layout, _N = self._dof_layout()
        displacement = np.ascontiguousarray(displacement, dtype=np.float64)
        out: dict[int, np.ndarray] = {}
        # No elements → nothing to sample.
        if self._grid.n_cells == 0:
            return out
        (
            elem_nums_arr,
            n_nodes_per_elem,
            offsets,
            conn,
            et_ids,
            mat_ids,
            real_ids,
            pt_nums,
            points,
        ) = self._grid_element_arrays()
        # Group by (et_id, n_nodes, mat_id, real_id) so each batch calls
        # a single ``eel_batch`` kernel on a contiguous coord buffer.
        groups: dict[tuple[int, int, int, int], list[int]] = {}
        for i in range(et_ids.size):
            key = (int(et_ids[i]), int(n_nodes_per_elem[i]), int(mat_ids[i]), int(real_ids[i]))
            groups.setdefault(key, []).append(i)
        for (et_id, nn_per_elem, _mat, _real), cell_idx_list in groups.items():
            cls = self._kernel_for(et_id, nn_per_elem)
            # Kernels without a strain batch path contribute nothing.
            batch_fn = getattr(cls, "eel_batch", None)
            if batch_fn is None:
                continue
            cell_idx = np.asarray(cell_idx_list, dtype=np.int32)
            # Vectorised gather of each element's VTK point ids over the
            # flat connectivity, then coords by fancy-index.
            base = offsets[cell_idx][:, None]
            per_elem_cols = np.arange(nn_per_elem, dtype=np.int32)[None, :]
            elem_vtk_ids = conn[base + per_elem_cols]  # (n_grp, nn_per_elem)
            coords = points[elem_vtk_ids]
            # Gather each element's global DOFs from the layout, same as
            # the assembly path. Translate VTK → MAPDL once per unique
            # node before the per-unique Python dict hit.
            unique_vtk, inverse = np.unique(elem_vtk_ids.ravel(), return_inverse=True)
            unique_mapdl = pt_nums[unique_vtk]
            node_dofs = np.empty((unique_vtk.size, cls.n_dof_per_node), dtype=np.int64)
            for u_idx, nn in enumerate(unique_mapdl):
                layout_base, labels = layout[int(nn)]
                for k, lbl in enumerate(cls.dof_labels):
                    node_dofs[u_idx, k] = layout_base + labels.index(lbl)
            dofs_per_elem = node_dofs[inverse].reshape(
                cell_idx.size, nn_per_elem * cls.n_dof_per_node
            )
            # Per-element displacement sub-vectors, contiguous for the kernel.
            u_e = np.ascontiguousarray(displacement[dofs_per_elem])
            eps = batch_fn(np.ascontiguousarray(coords), u_e)
            # A batch kernel may also decline at runtime by returning None.
            if eps is None:
                continue
            group_elem_nums = elem_nums_arr[cell_idx]
            for i, en in enumerate(group_elem_nums):
                out[int(en)] = np.asarray(eps[i])
        return out
[docs] def solve( self, *, linear_solver: str = "auto", progress: bool = False, ) -> StaticResult: """Run a linear static analysis (MAPDL ``ANTYPE,STATIC`` equivalent). Uses the K assembled from current elements, the D-staged Dirichlet BCs, and the F-staged nodal forces. Returns a ``StaticResult`` whose ``displacement``/``reaction`` arrays are indexed by :meth:`dof_map`. ``linear_solver`` picks the sparse backend (see :func:`femorph_solver.solvers.linear.list_linear_solvers`). Parameters ---------- linear_solver : str, default ``"auto"`` Sparse linear backend name. progress : bool, default ``False`` Show a per-stage progress display (``rich`` → ``tqdm`` → logger fallback). Off by default for scripted use; the CLI front-end flips it on. """ from femorph_solver._progress import ProgressReporter # noqa: PLC0415 from femorph_solver.solvers.static import solve_static as _solve_static # noqa: PLC0415 with ProgressReporter(enabled=progress) as report: with report.stage("assemble K"): K = self.stiffness_matrix() with report.stage("build load vector"): F = self._load_vector() prescribed = self._prescribed_dofs() with report.stage("factor + solve"): return _solve_static(K, F, prescribed, linear_solver=linear_solver)
[docs] def modal_solve( self, n_modes: int = 10, lumped: bool = False, *, eigen_solver: str = "arpack", linear_solver: str = "auto", tol: float = 1e-12, release_inputs: bool = True, mixed_precision: bool | None = False, progress: bool = False, ) -> ModalResult: """Run a linear modal analysis (MAPDL ``ANTYPE,MODAL`` equivalent). Solves ``K φ = ω² M φ`` with D-staged Dirichlet BCs applied. Returns the lowest ``n_modes`` modes, ordered by frequency. ``eigen_solver`` picks the sparse eigenbackend (``"arpack"``, ``"lobpcg"``, ``"primme"``, ``"dense"``); ``linear_solver`` picks the inner shift-invert factorizer. The default ``"auto"`` picks the fastest SPD direct solver available (pardiso → cholmod → umfpack → superlu); pass an explicit name (``"superlu"``, ``"cholmod"``, …) to override. See :func:`femorph_solver.solvers.eigen.list_eigen_solvers` and :func:`femorph_solver.solvers.linear.list_linear_solvers`. ``release_inputs=True`` (the **default**) drops the Model's cached full K / M the moment the BC-reduced ``K_ff`` / ``M_ff`` are built, and before the Pardiso factor allocates. On a 1.2 M-DOF SOLID186 problem that saves ~2 GB of peak RSS (the full K and M each run ~1 GB + scaling with density). Time cost: negligible on the first call — CPython's refcount frees the full K / M immediately when the BC-reduce helper returns (no ``gc.collect`` involved; the sparse-matrix graph has no cycles so a full GC would be pure overhead). On **repeat calls on the same Model** however the full K / M must be re-assembled from elements (~1-3 s at large sizes) because the cache was released; parametric sweeps should pass ``release_inputs=False`` to keep the warm-assembly cache. ``mixed_precision`` controls the inner Pardiso factor precision (ignored by non-Pardiso backends): * ``False`` (**default**) — factor and solve in double precision. Identical frequencies to MAPDL at machine-epsilon. * ``True`` — factor in single precision and refine with double-precision residuals. 
Halves the factor's permanent footprint when the linked MKL honours the request; the refinement typically recovers ≥13-digit eigenvalue accuracy on well-conditioned SPD stiffness matrices. * ``None`` — let femorph-solver decide: opts in for Pardiso SPD factorisation above ~50 k free DOFs, where the memory saving matters most. Not recommended for production models that are known to be ill-conditioned (e.g. highly anisotropic composites, degenerate meshes with near-zero-Jacobian cells). .. note:: Mixed-precision Pardiso depends on MKL exposing ``iparm(28)`` through pypardiso; some MKL builds silently no-op the request. Check ``lu._solver.get_iparm(28)`` after a factorize if you need to confirm the request was honoured. """ from femorph_solver._progress import ProgressReporter # noqa: PLC0415 with ProgressReporter(enabled=progress) as report: if release_inputs: return self._modal_solve_release( n_modes=n_modes, lumped=lumped, eigen_solver=eigen_solver, linear_solver=linear_solver, tol=tol, mixed_precision=mixed_precision, report=report, ) from femorph_solver.solvers.modal import solve_modal as _solve_modal # noqa: PLC0415 with report.stage("assemble K"): K = self.stiffness_matrix() with report.stage("assemble M"): M = self.mass_matrix(lumped=lumped) prescribed = self._prescribed_dofs() with report.stage("reduce BCs + eigensolve"): return _solve_modal( K, M, prescribed, n_modes=n_modes, eigen_solver=eigen_solver, linear_solver=linear_solver, tol=tol, mixed_precision=mixed_precision, )
    def _modal_solve_release(
        self,
        *,
        n_modes: int,
        lumped: bool,
        eigen_solver: str,
        linear_solver: str,
        tol: float,
        mixed_precision: bool | None = False,
        report: object | None = None,
    ) -> ModalResult:
        """Peak-RSS-minimising modal solve — see ``modal_solve(release_inputs=True)``.

        Goes directly from elements to the BC-reduced ``K_ff`` / ``M_ff``
        via a single element-level scatter pass per matrix (see
        :meth:`_bc_reduce_and_release`).  The full ``K`` / ``M`` are never
        materialised, so peak assembly RSS is bounded by ``K_ff + M_ff``
        rather than ``K + M + K_ff + M_ff``.

        When the resolved linear-solver backend is Pardiso, K_ff is
        assembled as upper-triangular only — halves its CSR storage and
        skips the duplicate ``sp.triu`` extraction Pardiso would
        otherwise make.  Shift-invert ARPACK at σ=0 never calls ``K @ x``
        (verified via scipy's internal path), so the triu storage doesn't
        break the eigensolve.
        """
        from femorph_solver._progress import ProgressReporter  # noqa: PLC0415
        from femorph_solver.solvers.linear import select_default_linear_solver  # noqa: PLC0415
        from femorph_solver.solvers.modal import solve_modal_reduced  # noqa: PLC0415

        # Callers without a live reporter get a disabled one so the
        # ``report.stage`` context managers below are always valid.
        _null = ProgressReporter(enabled=False)
        report = report if report is not None else _null

        prescribed = self._prescribed_dofs()

        # Decide whether to assemble K_ff as triu-only.  Needs:
        #  - the linear solver to accept upper-triangular SPD input (only
        #    Pardiso currently — CHOLMOD's wrapper does its own sp.tril);
        #  - the eigensolver to be an ARPACK-flavour shift-invert at σ=0
        #    (the only mode that doesn't call ``K @ x``).
        _, _, N = self._dof_layout()
        n_free_est = N - len(prescribed)  # upper bound; rigid-body reduction may shrink
        resolved_linear = (
            select_default_linear_solver(spd=True, n_dof=n_free_est)
            if linear_solver == "auto"
            else linear_solver
        )
        triu_k = resolved_linear == "pardiso" and eigen_solver in ("auto", "arpack")

        # M-as-triu is orthogonal to K — it only needs the eigen backend
        # to route M @ x through ``_maybe_parallel_matvec`` (which
        # detects ``femorph_triu`` and applies the symmetric wrapper).
        # The scipy dense path in ``solve_modal_reduced`` also honours
        # the tag via ``la.eigh(..., lower=False)``.  Safe for every
        # eigen backend we ship.
        triu_m = eigen_solver in ("auto", "arpack")

        with report.stage("assemble K_ff + M_ff"):
            K_ff, M_ff, free = self._bc_reduce_and_release(
                prescribed, lumped=lumped, triu_k=triu_k, triu_m=triu_m
            )
        with report.stage("factor + eigensolve"):
            return solve_modal_reduced(
                K_ff,
                M_ff,
                free,
                n_modes=n_modes,
                eigen_solver=eigen_solver,
                linear_solver=linear_solver,
                tol=tol,
                mixed_precision=mixed_precision,
            )

    def _bc_reduce_and_release(
        self,
        prescribed: dict[int, float],
        *,
        lumped: bool,
        triu_k: bool = False,
        triu_m: bool = False,
    ) -> tuple[_sp.csr_array, _sp.csr_array, np.ndarray]:
        """Assemble BC-reduced K_ff / M_ff directly from element contributions.

        Builds the reduced matrices in one element-level scatter pass
        each — the full N×N K / M are never materialised, so peak RSS
        during assembly is roughly ``K_ff + M_ff`` instead of
        ``K + M + K_ff + M_ff``.  On a 500 k-DOF plate that saves
        ~700 MB of peak.

        ``triu_k=True`` assembles K_ff as upper-triangular only
        (diagonal + strictly upper).  Halves K's CSR footprint and lets
        the Pardiso SPD path skip its own ``sp.triu`` extraction.

        ``triu_m=True`` assembles M_ff as upper-triangular only too.
        Halves M's CSR footprint.  Callers that need ``M @ x`` (ARPACK
        generalized shift-invert) must route through
        ``_maybe_parallel_matvec`` which detects the tag and applies
        ``y = M @ x + M.T @ x - diag(M) * x`` — the symmetric product,
        no lower-triangular mirror allocation.  Only safe when M is
        truly symmetric (all consistent / lumped element-mass matrices
        on this codebase are).

        Rigid-body DOFs (zero diagonal stiffness, e.g. LINK180
        transverse) are caught post-assembly via ``K_ff.diagonal()`` and
        dropped with a second ``csr_submatrix_free`` pass — the common
        case (SOLID186 / SOLID185 etc.) has none so the extra pass is
        skipped entirely.

        Returns ``(K_ff, M_ff, free_mask)`` where ``free_mask`` is the
        boolean full-DOF mask reflecting prescribed + rigid removals.
        """
        _, _layout, N = self._dof_layout()

        # Drop any stale cached K / M so the element-level build below
        # doesn't double-allocate alongside them.
        self.release_matrix_cache()

        # Boolean fixed-DOF mask from the prescribed-DOF dict keys.
        mask_fixed = np.zeros(N, dtype=bool)
        for idx in prescribed:
            mask_fixed[int(idx)] = True
        free_pre = ~mask_fixed
        n_free_pre = int(free_pre.sum())
        # Degenerate cases: nothing free, no DOFs, or no cells → empty system.
        if n_free_pre == 0 or N == 0 or self._grid.n_cells == 0:
            empty = _sp.csr_array((0, 0), dtype=np.float64)
            return empty, empty, free_pre

        # full DOF index → reduced (free) index; fixed DOFs map to -1.
        full_to_free = np.full(N, -1, dtype=np.int32)
        full_to_free[free_pre] = np.arange(n_free_pre, dtype=np.int32)

        K_ff = self._assemble("K", full_to_free=full_to_free, triu_only=triu_k)
        # K and M share element connectivity + the reduced-DOF mask, so
        # their sparsity patterns (row_ptr + col_idx) are identical when
        # both are built with the same ``triu_only`` flag.  Pass K's
        # pattern arrays into M's build so M aliases them instead of
        # rebuilding its own copies — saves ~(n_free + nnz) × 4 B
        # (≈30 MB on a 200×200×2 triu SOLID185 plate).
        shared_pattern = (K_ff.indptr, K_ff.indices) if triu_k == triu_m else None
        M_ff = self._assemble(
            "M",
            lumped=lumped,
            full_to_free=full_to_free,
            triu_only=triu_m,
            pattern=shared_pattern,
        )

        # Rigid-body pass — drop rows whose K diagonal is below a relative
        # pivot tolerance.  Matches the tolerance from the old post-hoc
        # ``_bc_reduce`` path so parity with existing tests holds.  The
        # diagonal is present in both full and triu CSR so this check works
        # regardless of ``triu_k``.
        k_diag = K_ff.diagonal()
        scale = float(np.max(np.abs(k_diag))) if k_diag.size else 1.0
        tol_pivot = 1e-12 * scale if scale > 0 else 1e-30
        rigid = np.abs(k_diag) <= tol_pivot
        if rigid.any():
            n_free = int(n_free_pre - rigid.sum())
            reduce_map = np.full(n_free_pre, -1, dtype=np.int32)
            reduce_map[~rigid] = np.arange(n_free, dtype=np.int32)
            K_ff = self._csr_submatrix(K_ff, reduce_map, n_free)
            M_ff = self._csr_submatrix(M_ff, reduce_map, n_free)
            if triu_k:
                # csr_submatrix_free preserves the triu-ness of the input
                # (it only picks rows/cols, no transpose), but the tagging
                # doesn't survive the construction of the new csr_array.
                K_ff.femorph_triu = True
            if triu_m:
                M_ff.femorph_triu = True
            # Propagate the rigid drop back into the full-DOF mask so the
            # free-mask returned to ``solve_modal_reduced`` reflects both
            # prescribed + rigid-body removals.
            pre_free_indices = np.where(free_pre)[0]
            mask_fixed[pre_free_indices[rigid]] = True

        # Drop the per-node DOF layout caches before the caller
        # allocates the Pardiso factor.  These are pure memoization
        # (``{node: (offset, labels)}`` dicts, ~O(n_nodes) with dict
        # overhead) reconstructable from ``self._grid`` /
        # ``self._etypes`` in a few ms.  At 100×100×2 SOLID185 they
        # hold ~18 MB; at 300×300×2, ~160 MB.  Holding them through
        # the 1-3 GB Pardiso factor + eigsh shift-invert is dead
        # weight — the assembly that needed them is finished, and
        # neither Pardiso nor eigsh touches them.  A subsequent
        # ``modal_solve`` on the same Model rebuilds the caches on
        # demand; the only cost is re-walking the grid once per
        # solve, which is a tiny fraction of a second.
        self._layout_cache = None
        self._node_dof_labels_cache = None

        # glibc arena hygiene before the caller (typically
        # ``solve_modal_reduced``) allocates the 1-3 GB Pardiso
        # Cholesky factor on top.  The large transient scratches in
        # assembly (``ke_batch`` / ``me_batch`` at hundreds of MB,
        # ``_build_csr_pattern`` scratch at tens of MB) go through
        # ``mmap`` directly — those are already returned to the OS
        # on ``free()`` and ``malloc_trim`` can't touch them.  What
        # IS sometimes reclaimable: smaller allocations (coords_arr
        # copies, dof-map scratch, scipy CSR temporaries) that land
        # in glibc's arena and sit there as cached chunks.  On a
        # 200×200×2 SOLID185 plate this typically reclaims 5-15 MB
        # post-assembly — a small win but no-cost on platforms
        # where ``malloc_trim`` exists (Linux/glibc) and a no-op
        # everywhere else.  Kept here as cheap hygiene, not a
        # headline optimisation: the real peak-RSS driver is the
        # Pardiso factor itself, which is attacked separately.
        from femorph_solver._malloc import trim_allocator  # noqa: PLC0415

        trim_allocator()
        return K_ff, M_ff, ~mask_fixed

    @staticmethod
    def _csr_submatrix(A: _sp.csr_array, full_to_free: np.ndarray, n_free: int) -> _sp.csr_array:
        """Extract ``A[keep][:, keep]`` in CSR using the C++ parallel path.

        ``full_to_free`` maps each current row/col index to its index in
        the kept submatrix, with ``-1`` marking dropped rows/cols.
        """
        ip, ix, d = _core.csr_submatrix_free(
            np.ascontiguousarray(A.indptr, dtype=np.int32),
            np.ascontiguousarray(A.indices, dtype=np.int32),
            np.ascontiguousarray(A.data, dtype=np.float64),
            full_to_free,
            n_free,
        )
        return _sp.csr_array((d, ix, ip), shape=(n_free, n_free))
[docs] def transient_solve( self, *, dt: float, n_steps: int, load: Callable[[float], np.ndarray] | np.ndarray | None = None, lumped: bool = False, damping: _sp.csr_array | None = None, u0: np.ndarray | None = None, v0: np.ndarray | None = None, beta: float = 0.25, gamma: float = 0.5, ) -> TransientResult: """Run a linear transient analysis (MAPDL ``ANTYPE,TRANS`` equivalent). Integrates ``M ü + C u̇ + K u = F(t)`` with Newmark-β. The load ``F(t)`` defaults to the F-staged nodal forces held constant over the interval; pass a callable ``t -> (N,)`` for time-varying loads or a fixed array to override the staged values. """ from femorph_solver.solvers.transient import solve_transient as _solve_transient K = self.stiffness_matrix() M = self.mass_matrix(lumped=lumped) prescribed = self._prescribed_dofs() F = self._load_vector() if load is None else load return _solve_transient( K, M, F, dt=dt, n_steps=n_steps, prescribed=prescribed, C=damping, u0=u0, v0=v0, beta=beta, gamma=gamma, )
# --------------------------------------------------------------------- # Post-processing # ---------------------------------------------------------------------
    def eel(
        self,
        u: np.ndarray,
        *,
        nodal_avg: bool = True,
    ) -> np.ndarray | dict[int, np.ndarray]:
        """Elastic strain from a displacement vector (MAPDL ``S,EPEL`` equivalent).

        Strain is evaluated at each element's own nodes by
        ε(ξ_node) = B(ξ_node)·u_e — direct nodal evaluation rather than
        Gauss extrapolation, but the two agree exactly for uniform-strain
        fields and converge on mesh refinement.  Output is 6-component
        Voigt with engineering shears ``[εxx, εyy, εzz, γxy, γyz, γxz]``.

        Parameters
        ----------
        u : (N,) float64
            Global displacement vector indexed by :meth:`dof_map` — i.e.
            the :attr:`StaticResult.displacement` from :meth:`solve` or a
            mode shape from :meth:`modal_solve`.
        nodal_avg : bool, default True
            If ``True`` (MAPDL ``PLNSOL`` default), values at shared
            nodes are averaged across every element touching them —
            returns a ``(n_nodes_global, 6)`` array indexed by
            :attr:`node_numbers`.  If ``False``, returns the per-element
            dict ``{elem_num: (n_nodes_in_elem, 6)}`` — unaveraged, like
            ``PLESOL``.

        Raises
        ------
        ValueError
            If ``u`` does not have exactly N entries.
        NotImplementedError
            If an element kernel has no ``eel_batch`` implementation.
        RuntimeError
            If the C++ strain kernel is unavailable for an element type.
        """
        _ = self.grid  # touching .grid forces any lazy grid refresh side effects
        node_nums_sorted, layout, N = self._dof_layout()
        u = np.ascontiguousarray(np.asarray(u, dtype=np.float64).ravel())
        if u.shape[0] != N:
            raise ValueError(f"u has shape {u.shape}, expected ({N},) DOFs")
        # Empty model → empty result in the shape the caller asked for.
        if N == 0 or self._grid.n_cells == 0:
            return np.zeros((0, 6), dtype=np.float64) if nodal_avg else {}

        (
            elem_nums_arr,
            n_nodes_per_elem,
            offsets,
            conn,
            et_ids,
            _mat_ids,
            _real_ids,
            pt_nums,
            points,
        ) = self._grid_element_arrays()

        # Group by (et_id, n_nodes) — material / real don't affect strain but
        # degenerate topologies need their own kernel.
        groups: dict[tuple[int, int], list[int]] = {}
        for i in range(et_ids.size):
            groups.setdefault((int(et_ids[i]), int(n_nodes_per_elem[i])), []).append(i)

        # Per-group (element_nums, nns_mapdl, eps) — nns_mapdl[i] is the
        # MAPDL node numbers for the strain rows in eps[i]; used downstream
        # for nodal averaging.
        per_group: list[tuple[np.ndarray, np.ndarray, np.ndarray]] = []
        for (et_id, _nn), cell_idx_list in groups.items():
            cls = self._kernel_for(et_id, _nn)
            batch_fn = getattr(cls, "eel_batch", None)
            if batch_fn is None:
                raise NotImplementedError(f"{cls.name}.eel_batch is not implemented")
            nn_per_elem = cls.n_nodes
            ndof_pn = cls.n_dof_per_node
            cell_idx = np.asarray(cell_idx_list, dtype=np.int32)

            # Gather per-element connectivity rows: (n_grp, nn_per_elem)
            # VTK point ids via the offset table + per-element column index.
            base = offsets[cell_idx][:, None]
            per_elem_cols = np.arange(nn_per_elem, dtype=np.int32)[None, :]
            elem_vtk_ids = conn[base + per_elem_cols]
            coords_arr = points[elem_vtk_ids]
            nns_mapdl = pt_nums[elem_vtk_ids]  # MAPDL node nums (n_grp, nn_per_elem)

            # Resolve the global DOF index for each (node, dof-label) once
            # per unique node, then scatter back via np.unique's inverse.
            unique_vtk, inverse = np.unique(elem_vtk_ids.ravel(), return_inverse=True)
            unique_mapdl = pt_nums[unique_vtk]
            node_dofs = np.empty((unique_vtk.size, ndof_pn), dtype=np.int32)
            for u_idx, nn in enumerate(unique_mapdl):
                layout_base, labels = layout[int(nn)]
                for k, lbl in enumerate(cls.dof_labels):
                    node_dofs[u_idx, k] = layout_base + labels.index(lbl)
            dofs_per_elem = node_dofs[inverse].reshape(cell_idx.size, nn_per_elem * ndof_pn)
            u_e = u[dofs_per_elem]

            eps = batch_fn(coords_arr, u_e)
            if eps is None:
                raise RuntimeError(
                    f"C++ extension unavailable — {cls.name} has no Python strain fallback"
                )
            per_group.append((elem_nums_arr[cell_idx], nns_mapdl, np.asarray(eps)))

        if not nodal_avg:
            # PLESOL-style: per-element rows keyed by MAPDL element number.
            out: dict[int, np.ndarray] = {}
            for enums, _nns, eps in per_group:
                for i, en in enumerate(enums):
                    out[int(en)] = eps[i]
            return out

        # Nodal-averaged: sum contributions per node, divide by count.
        sums = np.zeros((node_nums_sorted.size, 6), dtype=np.float64)
        counts = np.zeros(node_nums_sorted.size, dtype=np.int64)
        for _enums, nns_arr, eps in per_group:
            flat_nn = nns_arr.ravel()
            flat_eps = eps.reshape(-1, 6)
            # node_nums_sorted is sorted (see _dof_layout), so searchsorted
            # maps MAPDL node numbers to global row indices.
            idx = np.searchsorted(node_nums_sorted, flat_nn)
            np.add.at(sums, idx, flat_eps)
            np.add.at(counts, idx, 1)
        avg = np.zeros_like(sums)
        mask = counts > 0
        avg[mask] = sums[mask] / counts[mask, None]
        return avg