# femorph_solver.solvers.eigen._arpack

"""ARPACK Implicitly Restarted Lanczos via ``scipy.sparse.linalg.eigsh``.

Shift-invert mode at σ = 0 finds the ``n_modes`` eigenvalues closest to
zero — exactly the lowest natural frequencies we want for modal
analysis.  The inner ``(K - σM)^-1`` apply goes through the pluggable
femorph-solver linear-solver registry so callers can pick SuperLU (default),
CHOLMOD, Pardiso, or anything else registered.

K·v and M·v inside ARPACK's generalized-eigenvalue iteration are
wrapped in a custom OpenMP parallel CSR matvec (``_core.csr_matvec``)
when the matrix is sufficiently large — scipy's built-in
``csr_matvec`` is single-threaded and costs ~13 % of wall time on a
600 k-DOF system.

References
----------
- Lehoucq, R. B. & Sorensen, D. C.  "Deflation Techniques for an
  Implicitly Restarted Arnoldi Iteration."  SIAM J. Matrix Anal.
  Appl. 17 (4), 789-821 (1996).  [implicit-restart Arnoldi
  algorithm at the core of ARPACK's ``eigsh``]
- Lehoucq, R. B., Sorensen, D. C., & Yang, C.  *ARPACK Users'
  Guide: Solution of Large-Scale Eigenvalue Problems with
  Implicitly Restarted Arnoldi Methods.*  SIAM (1998).
  [shipped reference for the ARPACK library scipy wraps,
  including the shift-invert mode used here]
- Ericsson, T. & Ruhe, A.  "The Spectral Transformation Lanczos
  Method for the Numerical Solution of Large Sparse Generalized
  Symmetric Eigenvalue Problems."  Math. Comp. 35 (152), 1251-1268
  (1980).  [spectral-transformation theory justifying the
  ``(K - σM)^-1`` shift-invert applied on top of Lanczos]
"""

from __future__ import annotations

from typing import ClassVar

import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as spla

from ..linear import get_linear_solver
from ._base import EigenSolverBase

#: Problems with fewer non-zeros than this fall back to scipy's
#: single-threaded matvec — the thread-spawn overhead of our OpenMP
#: matvec dominates below ~1 M nnz.
#: Compared against ``A.nnz`` in :func:`_maybe_parallel_matvec`.
_PARALLEL_MATVEC_NNZ_THRESHOLD: int = 1_000_000


def _csr_components_f64(
    A: sp.spmatrix | sp.sparray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Return ``(indptr, indices, data)`` of ``A`` ready for the C++ kernels.

    ``indptr`` / ``indices`` are C-contiguous int32, ``data`` is
    C-contiguous float64.  Converts to CSR only when ``A`` is not already
    CSR, and copies ``data`` only when its dtype or layout differ —
    ``np.ascontiguousarray`` returns its input unchanged when dtype and
    contiguity already match, so the common case (everything the reduced
    assembler produces via ``_core.scatter_into_csr``) is zero-copy.
    The previous unconditional ``.astype(np.float64)`` always copied — on
    a 160×160×2 HEX8 M_ff that was an extra 168 MB of peak RSS per call.
    """
    A_csr = A if sp.isspmatrix_csr(A) or isinstance(A, sp.csr_array) else A.tocsr()
    indptr = np.ascontiguousarray(A_csr.indptr, dtype=np.int32)
    indices = np.ascontiguousarray(A_csr.indices, dtype=np.int32)
    data = A_csr.data
    if data.dtype != np.float64 or not data.flags.c_contiguous:
        data = np.ascontiguousarray(data, dtype=np.float64)
    return indptr, indices, data


def _maybe_parallel_matvec(
    A: sp.spmatrix | sp.sparray,
) -> spla.LinearOperator | sp.spmatrix | sp.sparray:
    """Wrap ``A`` in a LinearOperator that uses our parallel CSR matvec.

    Returns ``A`` unchanged (NOT a LinearOperator) for small non-triu
    matrices where our OpenMP fork/join dominates — scipy's own matvec
    handles those fine.  Callers only rely on the ``@`` protocol, which
    both return types support.

    When ``A`` has the ``femorph_triu`` tag it's assumed to be a
    symmetric matrix stored as its upper triangle.  The returned
    operator then computes ``A_sym @ x`` via the single-pass C++
    kernel :func:`_core.csr_sym_matvec_triu` which touches each triu
    nnz exactly once — roughly 2× faster than the previous
    ``A @ x + A.T @ x - diag * x`` three-pass Python expression since
    the latter walked the same sparse storage three times.  Older
    ``_core`` builds without that kernel fall back to the three-pass
    scipy expression.

    The indptr / indices / data references are pinned once up front (via
    :func:`_csr_components_f64`, which avoids copying when dtype and
    layout already match) so every matvec skips the attribute lookup and
    dtype coercion on the hot path.
    """
    from femorph_solver import _core

    n = A.shape[0]
    is_triu = bool(getattr(A, "femorph_triu", False))

    if is_triu and hasattr(_core, "csr_sym_matvec_triu"):
        # Single-pass C++ symmetric matvec from triu storage.
        indptr, indices, data = _csr_components_f64(A)

        def _sym_matvec(x: np.ndarray) -> np.ndarray:
            x = np.ascontiguousarray(x, dtype=np.float64).ravel()
            y = np.empty(n, dtype=np.float64)
            _core.csr_sym_matvec_triu(indptr, indices, data, x, y)
            return y

        return spla.LinearOperator((n, n), matvec=_sym_matvec, dtype=np.float64)

    if is_triu:
        # Fallback for older _core builds without the C++ sym kernel:
        # scipy CSR + CSC matvec + diag subtract.
        A_csr = A if sp.isspmatrix_csr(A) or isinstance(A, sp.csr_array) else A.tocsr()
        A_csc = A_csr.T  # O(1) view on csr_array / csc_array
        diag = A_csr.diagonal()

        def _sym_matvec_fallback(x: np.ndarray) -> np.ndarray:
            x = np.ascontiguousarray(x, dtype=np.float64).ravel()
            # triu @ x + triu.T @ x double-counts the diagonal; subtract once.
            y = A_csr @ x
            y += A_csc @ x
            y -= diag * x
            return y

        return spla.LinearOperator((n, n), matvec=_sym_matvec_fallback, dtype=np.float64)

    if not hasattr(_core, "csr_matvec") or A.nnz < _PARALLEL_MATVEC_NNZ_THRESHOLD:
        return A  # scipy handles its own matvec — fine for small systems.

    # Full (non-triu) matrix, large enough for the OpenMP kernel.
    indptr, indices, data = _csr_components_f64(A)

    def _matvec(x: np.ndarray) -> np.ndarray:
        x = np.ascontiguousarray(x, dtype=np.float64).ravel()
        y = np.empty(n, dtype=np.float64)
        _core.csr_matvec(indptr, indices, data, x, y)
        return y

    return spla.LinearOperator((n, n), matvec=_matvec, dtype=np.float64)


class ArpackSolver(EigenSolverBase):
    """scipy.sparse.linalg.eigsh with shift-invert, pluggable inner solver."""

    name: ClassVar[str] = "arpack"
    kind: ClassVar[str] = "sparse-lanczos"
    spd_only: ClassVar[bool] = False
    install_hint: ClassVar[str] = "bundled with SciPy — no install needed"

    @staticmethod
    def solve(
        K: sp.spmatrix | sp.sparray,
        M: sp.spmatrix | sp.sparray,
        n_modes: int,
        *,
        sigma: float = 0.0,
        linear_solver: str = "auto",
        tol: float = 0.0,
        mixed_precision: bool | None = False,
        v0: np.ndarray | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Solve ``K v = w M v`` for the ``n_modes`` eigenpairs nearest ``sigma``.

        Parameters
        ----------
        K, M : sparse matrices
            Stiffness and mass matrices.  ``K`` may carry the
            ``femorph_triu`` tag (upper-triangle-only storage), but only
            when ``sigma == 0``.
        n_modes : int
            Number of eigenpairs to compute.
        sigma : float
            Shift for shift-invert; σ=0 targets the lowest frequencies.
        linear_solver : str
            Key into the femorph-solver linear-solver registry for the
            inner ``(K - σM)^-1`` factorisation.
        tol : float
            ARPACK convergence tolerance (0 = machine precision).
        mixed_precision : bool | None
            Forwarded to the inner linear solver.
        v0 : ndarray | None
            Optional starting vector (e.g. a previous mode shape).

        Returns
        -------
        (w, v)
            Eigenvalues sorted ascending and the matching eigenvector
            columns.

        Raises
        ------
        ValueError
            If ``sigma != 0`` with a triu-tagged ``K``, or ``v0`` has the
            wrong length.
        """
        # Skip the full-matrix ``.tocsc()`` round-trip that the previous
        # implementation did — the Pardiso / CHOLMOD / umfpack SPD
        # factorisers all accept CSR and our CSR is already f64 +
        # contiguous from ``_core.csr_submatrix_free``.  Eliding the
        # CSC copy saves ~1.2 GB on a 1.2 M-DOF problem.
        k_is_triu = bool(getattr(K, "femorph_triu", False))

        if sigma != 0.0:
            if k_is_triu:
                # ``K - sigma * M`` on triu-only storage would silently
                # produce a wrong (non-symmetric-complete) shifted matrix.
                raise ValueError(
                    "ArpackSolver: sigma != 0 requires a full symmetric K; "
                    "callers that built K_ff as triu (``femorph_triu=True``) "
                    "must use sigma=0 shift-invert."
                )
            A = K - sigma * M
        else:
            A = K

        Solver = get_linear_solver(linear_solver, n_dof=A.shape[0])
        lu = Solver(A, assume_spd=True, mixed_precision=mixed_precision)
        # Drop the local ``A`` reference so the CSR triu kept by
        # ``lu._A`` is the only surviving copy; for sigma != 0 this
        # frees the ``K - σM`` temporary early (~1 GB).  For sigma = 0
        # this is a no-op since ``A is K``.
        del A

        OPinv = spla.LinearOperator(K.shape, matvec=lu.solve, dtype=np.float64)

        if sigma == 0.0:
            # Shift-invert at σ=0 never calls ``A @ x`` (verified against
            # scipy.sparse.linalg.eigsh's internal path).  Passing a zero
            # LinearOperator lets us use a triu-only K for Pardiso without
            # accidentally computing triu(K) @ x when something else in the
            # call path tries to — and saves wrapping K in the parallel
            # matvec LinearOperator.
            n = K.shape[0]
            K_op = spla.LinearOperator(
                (n, n),
                matvec=lambda x: np.zeros_like(x, dtype=np.float64),
                dtype=np.float64,
            )
        else:
            K_op = _maybe_parallel_matvec(K)

        M_op = _maybe_parallel_matvec(M)

        # ARPACK's Lanczos starts from a random vector by default; passing
        # a non-trivial ``v0`` (typically a previous mode shape from a
        # parametric sweep) can cut convergence iterations by 1.5–3×
        # when the new eigenvectors are close to the old ones.
        eigsh_kwargs = dict(
            k=n_modes,
            M=M_op,
            sigma=sigma,
            which="LM",
            OPinv=OPinv,
            tol=tol,
        )
        if v0 is not None:
            v0_flat = np.ascontiguousarray(v0, dtype=np.float64).ravel()
            if v0_flat.size != K.shape[0]:
                raise ValueError(f"v0 must have length {K.shape[0]} (shape 1-D), got {v0.shape}")
            eigsh_kwargs["v0"] = v0_flat

        w, v = spla.eigsh(K_op, **eigsh_kwargs)

        # ARPACK returns eigenvalues in no guaranteed order; sort ascending.
        order = np.argsort(w)
        return w[order], v[:, order]