Source code for femorph_solver.solvers.cyclic

"""Cyclic-symmetry modal analysis for a single base sector.

A rotor with ``N`` identical sectors spanning 360° satisfies, at each
*harmonic index* (nodal diameter) ``k ∈ {0, 1, …, N/2}``,

    u_sector_{j+1}(global) = e^{i k α} · R(α) · u_sector_j(global)

with ``α = 2π/N`` and ``R(α)`` the spatial rotation about the symmetry
axis.  Applied at the base-sector's high-angle face (which is identified
with sector 1's low-angle face), this gives the constraint

    u_high(global) = e^{i k α} · R(α) · u_low(global)

relating paired low-/high-face DOFs.  That constraint lets a modal solve
work on **one sector** and produce the full-rotor spectrum one k at a
time — an N-times speedup relative to meshing the whole rotor.

The reduction here mirrors the standard cyclic-symmetry reduction in its *complex* form.
Base-sector DOFs are partitioned into interior ``I`` and low-face ``L``
/ high-face ``H`` sets, with a one-to-one (low-node, high-node)
correspondence given by the user.  High DOFs are eliminated via the
constraint above; the remaining ``[u_I; u_L]`` vector satisfies a
Hermitian generalised eigenproblem

    K_red φ = ω² M_red φ

with ``K_red = P^H K P`` and ``M_red = P^H M P`` (K, M real SPD; P the
complex constraint matrix that embeds the reduced vector back into the
full one).  For k=0 and k=N/2 (even N) the phase is ±1 and the reduced
problem can be real; for intermediate k it is complex Hermitian.

Per-pair rotation
-----------------
Stiffness / mass are usually assembled in a global XYZ frame, which
means the cyclic phase relation picks up the same rotation that maps
sector j to sector j+1.  Pass ``pair_rotation`` as the ``(d, d)`` matrix
R(α) acting on each node pair's ``d`` translational DOFs (3 for a 3D
solid sweeping about z).  Omit it (``None``) when face DOFs are already
in a cylindrical / per-sector local frame, in which case R(α) reduces to
the identity.

Harmonic-index counting
-----------------------
- k=0            — single real eigenmodes (in-phase family)
- k=N/2 (even N) — single real eigenmodes (anti-phase family)
- 0 < k < N/2    — each eigenvalue corresponds to a degenerate pair in
  the full rotor (travelling-wave directions).  The complex reduced
  solve returns them once per k; the full-rotor multiset is recovered
  by counting them twice.

References
----------
- Thomas, D. L.  "Dynamics of rotationally periodic structures."
  International Journal for Numerical Methods in Engineering 14 (1),
  81-102 (1979).  [the foundational complex-valued cyclic reduction
  applied here — one base sector factored at each harmonic index
  ``k`` recovers the full-rotor spectrum]
- Wildheim, S. J.  "Vibrations of rotationally periodic
  structures."  Ph.D. dissertation, Linköping University (1979).
  [companion treatment of the constraint matrix ``P`` and its
  Hermitian reduction ``P^H K P``]
- Bathe, K.-J.  *Finite Element Procedures*, Prentice Hall (1996),
  §10.3.4 (cyclic symmetry).  [textbook derivation matching the
  partitioned ``[u_I; u_L]`` variable convention used below]
"""

from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np
import scipy.linalg as la
import scipy.sparse as sp
import scipy.sparse.linalg as spla

from .._threads import limit as _thread_limit
from .linear import (
    LinearSolverBase,
    SolverUnavailableError,
    get_linear_solver,
    select_default_linear_solver,
)

if TYPE_CHECKING:
    from femorph_solver.model import Model

# Above this reduced-sector DOF count we switch from LAPACK dense eigh to
# sparse shift-invert eigsh.  The dense complex-Hermitian eigh allocates
# two ``n_keep²`` matrices (~16 GB real / 32 GB complex at n_keep = 45k);
# 1500 DOFs fits comfortably in memory while still leaving ample headroom
# for LAPACK's internal work arrays.
_DENSE_CUTOFF = 1500


def _have_primme() -> bool:
    """Return True if the PRIMME Python bindings can be imported.

    PRIMME exposes a Krylov-Schur / JDQMR-style eigen_solver that
    handles complex Hermitian generalised problems natively — no
    2n real augmentation needed.  On rotor-scale K_red this saves
    the ~2× factor in inner-product / matvec work that the scipy
    ARPACK augmented path pays per Lanczos iteration.
    """
    try:
        import primme  # noqa: F401, PLC0415

        return True
    except Exception:
        return False


_PRIMME_FALLBACK_WARNED = False


def _resolve_eigen_solver(eigen_solver: str) -> str:
    """Map ``"auto" | "scipy" | "primme"`` to a concrete backend name."""
    global _PRIMME_FALLBACK_WARNED
    if eigen_solver not in ("auto", "scipy", "primme"):
        raise ValueError(f"eigen_solver must be 'auto', 'scipy', or 'primme', got {eigen_solver!r}")
    if eigen_solver == "scipy":
        return "scipy"
    if eigen_solver == "primme":
        if not _have_primme():
            raise ImportError(
                "eigen_solver='primme' requested but the 'primme' package is "
                "not installed.  Install via `pip install primme` (or use the "
                "shell-primme.nix env in the repo root for NixOS) and retry."
            )
        return "primme"
    # auto
    if _have_primme():
        return "primme"
    if not _PRIMME_FALLBACK_WARNED:
        import warnings  # noqa: PLC0415

        warnings.warn(
            "PRIMME not installed — falling back to scipy ARPACK for the "
            "complex Hermitian cyclic path.  Install `primme` for ~2× fewer "
            "matvecs per harmonic at rotor scale.",
            RuntimeWarning,
            stacklevel=4,
        )
        _PRIMME_FALLBACK_WARNED = True
    return "scipy"


[docs] @dataclass(frozen=True, slots=True) class CyclicModalResult: """Modes for one harmonic index of a cyclic-symmetry modal solve.""" harmonic_index: int #: Number of sectors in the full rotor (same for every result in a sweep). n_sectors: int #: (n_modes,) — ω² = (2πf)²; numerically negative values clipped to 0. omega_sq: np.ndarray #: (n_modes,) — cyclic frequencies f [Hz]. frequency: np.ndarray #: (N, n_modes) — complex mode shapes on the base sector, indexed by #: the original DOF layout (not the reduced layout). For k=0 and #: k=N/2 the imaginary part is identically zero. mode_shapes: np.ndarray #: (3,) — point on the rotor rotation axis. ``None`` means "not #: recorded by the caller"; writers default to the origin in that #: case. Populated by :func:`solve_cyclic` when a #: ``pair_rotation`` is supplied. axis_point: np.ndarray | None = None #: (3,) — unit vector along the rotor axis. ``None`` means "not #: recorded"; writers default to ``+Z``. Populated by #: :func:`solve_cyclic` from the supplied ``pair_rotation``. axis_dir: np.ndarray | None = None
[docs] def save_cyclic_modal_results( results: Sequence[CyclicModalResult], path: str | Path, model: Model, *, unit_system: str = "SI", extra_field_data: dict[str, np.ndarray] | None = None, ) -> Path: """Serialise a cyclic-modal sweep to a single disk-backed ``.pv`` file. Parameters ---------- results : sequence of CyclicModalResult Per-harmonic-index results as returned by :func:`solve_cyclic`. All entries must share the same ``n_sectors``. path : str or pathlib.Path Destination file. model : femorph_solver.model.Model Model whose K / M matrices this sweep was computed on; supplies the mesh and DOF layout. extra_field_data : mapping, optional Extra ``field_data`` entries (e.g. solver statistics). Returns ------- pathlib.Path Canonicalised absolute path to the written file. Notes ----- The DOF-indexed ``(N, n_modes)`` complex mode shapes on each per-``k`` result get re-projected onto per-``(k, mode)``, per-node ``(n_modes, n_points, 6)`` complex arrays — the layout :func:`~femorph_solver.result.cyclic_modal.write_cyclic_modal_result` expects. """ if not results: raise ValueError("results must be non-empty") n_sectors = {int(r.n_sectors) for r in results} if len(n_sectors) != 1: raise ValueError(f"all results must share the same n_sectors; saw {sorted(n_sectors)}") (n_sectors_value,) = n_sectors from femorph_solver.result._material import MaterialTable from femorph_solver.result.cyclic_modal import write_cyclic_modal_result grid = model.grid dof_map = model.dof_map() node_nums = np.asarray(grid.point_data["ansys_node_num"], dtype=np.int64) node_to_row = {int(n): i for i, n in enumerate(node_nums)} node_row = np.fromiter( (node_to_row[int(n)] for n in dof_map[:, 0]), dtype=np.int64, count=dof_map.shape[0], ) comp = dof_map[:, 1].astype(np.int64) modes_by_k: dict[int, tuple[np.ndarray, np.ndarray]] = {} for r in results: n_modes = int(r.mode_shapes.shape[1]) shapes = np.zeros((n_modes, grid.n_points, 6), dtype=np.complex128) # Scatter each mode's DOF-indexed vector onto (node, component). for k_mode in range(n_modes): shapes[k_mode, node_row, comp] = r.mode_shapes[:, k_mode] modes_by_k[int(r.harmonic_index)] = ( np.asarray(r.frequency, dtype=np.float64), shapes, ) # Collect the rotor axis stamp. Every result in a sweep comes # from one cyclic solve so their axis info must agree; just take # the first one that actually carries the stamp. axis_point: np.ndarray | None = None axis_dir: np.ndarray | None = None for r in results: if r.axis_point is not None and axis_point is None: axis_point = np.asarray(r.axis_point, dtype=np.float64) if r.axis_dir is not None and axis_dir is None: axis_dir = np.asarray(r.axis_dir, dtype=np.float64) material_table = MaterialTable.from_model(model, unit_system=str(unit_system)) return write_cyclic_modal_result( path, grid, n_sectors_value, modes_by_k, axis_point=axis_point, axis_dir=axis_dir, material_table=material_table, unit_system=unit_system, extra_field_data=extra_field_data, )
def _harmonic_indices(n_sectors: int) -> list[int]: return list(range(n_sectors // 2 + 1)) # Module-global container that ``_dispatch_parallel_harmonics`` populates # *before* the worker pool forks. POSIX ``fork`` snapshots the parent's # address space copy-on-write, so children see the populated dict without # needing any pickle round-trip on K, M, or the cyclic-reduction plans. # Workers read from this container in :func:`_parallel_worker_solve_subset`. _PARALLEL_HARMONIC_STATE: dict = {} def _dispatch_parallel_harmonics( *, K, # noqa: ANN001 — scipy sparse passthrough M, # noqa: ANN001 low_node_dofs, # noqa: ANN001 high_node_dofs, # noqa: ANN001 n_sectors, n_modes, pair_rotation, harmonic_indices, prescribed, thread_limit, linear_solver, report, parallel_harmonics, eigen_solver: str = "auto", ): """Distribute the harmonic sweep across worker processes. Each worker runs ``solve_cyclic`` (serial path) on a contiguous subset of harmonic indices. Subsets are interleaved (round-robin) so adjacent-k harmonics tend to land in the same worker, preserving the warm-start v0 chain *within* each worker's run. The current process must be on a fork-capable platform (Linux); elsewhere we fall back to the serial path with a warning. """ import multiprocessing as mp # noqa: PLC0415 import platform # noqa: PLC0415 import warnings # noqa: PLC0415 from concurrent import futures as _cf # noqa: PLC0415 if platform.system() != "Linux": warnings.warn( "parallel_harmonics ignored: process pool requires POSIX fork", RuntimeWarning, stacklevel=3, ) # Fall through to serial by recursing without the parallel arg. return solve_cyclic( K, M, low_node_dofs, high_node_dofs, n_sectors, n_modes, pair_rotation=pair_rotation, harmonic_indices=harmonic_indices, prescribed=prescribed, thread_limit=thread_limit, linear_solver=linear_solver, report=report, eigen_solver=eigen_solver, ) if harmonic_indices is None: harmonic_indices = _harmonic_indices(n_sectors) k_list = list(harmonic_indices) n_workers = min(int(parallel_harmonics), len(k_list)) if n_workers <= 1: return solve_cyclic( K, M, low_node_dofs, high_node_dofs, n_sectors, n_modes, pair_rotation=pair_rotation, harmonic_indices=k_list, prescribed=prescribed, thread_limit=thread_limit, linear_solver=linear_solver, report=report, eigen_solver=eigen_solver, ) # Threads-per-worker budget: split ``thread_limit`` across the pool # so the apples-to-apples thread count stays at ``thread_limit``. # ``None`` means "no global cap"; default to 4 to match the typical # MAPDL ``-np 4`` configuration we benchmark against. total_threads = thread_limit if thread_limit is not None else 4 threads_per_worker = max(1, total_threads // n_workers) # Memory headroom check (best effort — ``psutil`` is optional). # Empirical model from the pbs-sector-hd reference rotor (488 k # complex DOFs, K.nnz ≈ 75 M): Pardiso ``mtype=-4`` factor peak + # working memory + ARPACK basis runs ~20 GiB per worker, dominated # by the L factor's fill-in (~17 × nnz_triu × complex128 entries). # Linearise to ~280 B per K.nnz + a 4 GiB Python/state floor. # Conservative — we'd rather warn early than OOM mid-sweep. try: import psutil # noqa: PLC0415 est_per_worker_gb = (K.nnz * 280) / (1024**3) + 4.0 avail_gb = psutil.virtual_memory().available / (1024**3) est_total_gb = est_per_worker_gb * n_workers if est_total_gb > 0.85 * avail_gb: warnings.warn( f"parallel_harmonics={n_workers} estimated peak " f"~{est_total_gb:.1f} GiB ({est_per_worker_gb:.1f} GiB / worker) " f"vs {avail_gb:.1f} GiB available — may OOM. Lower " f"parallel_harmonics or run serially (parallel_harmonics=None).", RuntimeWarning, stacklevel=3, ) except Exception: pass # psutil missing — skip the soft check # Round-robin partition: groups[w] = [k_list[w], k_list[w + n_workers], ...] # Keeps adjacent-k clustering roughly even and lets each worker's # internal sweep see warm-startable transitions. groups: list[list[int]] = [[] for _ in range(n_workers)] for i, k in enumerate(k_list): groups[i % n_workers].append(k) # Pre-populate the fork-globals so workers see K/M/etc without # paying the pickle cost. These objects are read-only in the # workers; the COW copy-on-fork keeps physical pages shared. _PARALLEL_HARMONIC_STATE.update( K=K, M=M, low_node_dofs=low_node_dofs, high_node_dofs=high_node_dofs, n_sectors=n_sectors, n_modes=n_modes, pair_rotation=pair_rotation, prescribed=prescribed, thread_limit=threads_per_worker, linear_solver=linear_solver, eigen_solver=eigen_solver, ) try: ctx = mp.get_context("fork") with _cf.ProcessPoolExecutor( max_workers=n_workers, mp_context=ctx, initializer=_parallel_worker_init, initargs=(threads_per_worker,), ) as pool: futs = {pool.submit(_parallel_worker_solve_subset, g): g for g in groups} collected: dict[int, CyclicModalResult] = {} for fut in _cf.as_completed(futs): for r in fut.result(): collected[int(r.harmonic_index)] = r finally: _PARALLEL_HARMONIC_STATE.clear() return [collected[int(k)] for k in k_list] def _parallel_worker_init(threads_per_worker: int) -> None: """Per-worker MKL/OMP thread cap (replicated in each forked child).""" import os # noqa: PLC0415 os.environ["MKL_NUM_THREADS"] = str(threads_per_worker) os.environ["OMP_NUM_THREADS"] = str(threads_per_worker) os.environ["OPENBLAS_NUM_THREADS"] = str(threads_per_worker) def _parallel_worker_solve_subset(harmonic_subset: list[int]) -> list[CyclicModalResult]: """Worker entry point — runs the serial cyclic-modal solver on a subset.""" state = _PARALLEL_HARMONIC_STATE return solve_cyclic( state["K"], state["M"], state["low_node_dofs"], state["high_node_dofs"], state["n_sectors"], state["n_modes"], pair_rotation=state["pair_rotation"], harmonic_indices=harmonic_subset, prescribed=state["prescribed"], thread_limit=state["thread_limit"], linear_solver=state["linear_solver"], report=None, # progress reporters don't cross process boundaries eigen_solver=state.get("eigen_solver", "auto"), )
[docs] def solve_cyclic( K: sp.spmatrix | sp.sparray, M: sp.spmatrix | sp.sparray, low_node_dofs: np.ndarray, high_node_dofs: np.ndarray, n_sectors: int, n_modes: int = 10, *, pair_rotation: np.ndarray | None = None, harmonic_indices: Sequence[int] | None = None, prescribed: dict[int, float] | None = None, thread_limit: int | None = None, linear_solver: str = "auto", report: Any | None = None, parallel_harmonics: int | None = None, eigen_solver: str = "auto", eigensolver: str | None = None, ) -> list[CyclicModalResult]: """Solve the cyclic-symmetry modal problem on a single base sector. Parameters ---------- K, M : scipy.sparse Global stiffness / mass of the **base sector**, before cyclic or Dirichlet reduction. Assumed real SPD. low_node_dofs, high_node_dofs : (P, d) int arrays Global DOF indices at ``P`` paired boundary nodes, each with ``d`` DOFs (typically 3 for a 3D solid). ``low_node_dofs[i, :]`` and ``high_node_dofs[i, :]`` must address the same physical DOFs (same labels, same order) at the node pair that's identified by the cyclic BC. A 1-D array is accepted for scalar (``d=1``) problems. n_sectors : int Number of repetitions that close the rotor (``N``). n_modes : int, default 10 Number of lowest modes per harmonic index. pair_rotation : (d, d) array, optional Spatial rotation R(α) that maps sector j's local frame to sector j+1's, applied per node pair. Pass ``None`` when face DOFs are already in a per-sector local frame (then R=I implied). harmonic_indices : sequence of int, optional Harmonic indices to solve. Defaults to ``0, 1, …, N//2``. prescribed : mapping, optional Dirichlet BCs on the sector (global DOF index → 0). Currently only zero-value Dirichlet is supported; if a cyclic-face DOF is prescribed its partner must also be prescribed (so the cyclic constraint is trivially satisfied at that DOF). thread_limit : int or None, optional Cap on BLAS / OpenMP threads used inside the per-harmonic dense Hermitian eigensolve. ``None`` defers to the global limit (see :func:`femorph_solver.set_thread_limit`). linear_solver : str, default ``"auto"`` Name of the registered linear backend used for the inner shift-invert factorisation in the sparse path. ``"auto"`` picks the fastest available SPD direct solver in the priority chain ``pardiso → cholmod → umfpack → superlu`` and emits a one-shot warning if pardiso is missing on a problem large enough to benefit from it. See :func:`femorph_solver.solvers.linear.list_linear_solvers`. report : optional In-band :class:`femorph_solver._progress.ProgressReporter`-like object (anything exposing a ``stage(name)`` context manager). Set by :meth:`CyclicModel.solve_modal` so this lower-level entry point can announce its planning + per-harmonic stages on the same reporter the caller already opened; pass ``None`` (the default) to disable progress at this layer. parallel_harmonics : int or None, optional When > 1, distribute the harmonic sweep across this many worker processes (POSIX ``fork`` start method). Each worker runs a contiguous subset of harmonics serially and gets a fair share of the ``thread_limit`` budget for its inner MKL Pardiso solve (``threads_per_worker = max(1, thread_limit // parallel_harmonics)``). Trades the per-sweep symbolic-factor reuse (Pardiso phase 11 is no longer shared across harmonics in different workers) and warm-start v0 chaining across the worker boundary for parallelism; a net win only when the eigsh time per harmonic dominates the analysis cost (typically true at rotor scale). ``None`` (default) runs the sweep serially in the current process. Memory: each worker holds its own Pardiso factor (~10 GB at 488 k complex DOFs). Callers should size ``parallel_harmonics`` so that ``parallel_harmonics × peak_factor_RSS`` fits in available RAM with margin — the dispatcher emits a warning when the headroom looks tight but does not refuse to run. Linux only; on other platforms the parameter is ignored (no ``fork``). eigen_solver : {"auto", "scipy", "primme"}, default "auto" Inner sparse eigen-backend for the per-harmonic reduced eigenproblem ``K_red φ = ω² M_red φ``. Same kwarg name as :meth:`~femorph_solver.Model.solve_modal` and :func:`solve_modal`. * ``"scipy"`` — scipy ARPACK shift-invert. For complex Hermitian harmonics this routes through a 2n real augmentation (the only way scipy's eigsh handles complex matrices), which doubles the matvec work and the basis storage per Lanczos iteration. * ``"primme"`` — PRIMME's Krylov-Schur / JDQMR solver, which handles complex Hermitian generalised problems natively without augmentation. ~2× fewer matvecs and ~30% fewer OPinv (Pardiso) calls vs scipy ARPACK at rotor scale — the biggest single algorithmic lever for closing the gap to MAPDL on multi-harmonic rotor sweeps. Requires the optional ``primme`` Python package; fall back to scipy with a one-shot RuntimeWarning if missing. * ``"auto"`` (default) — use PRIMME when available, scipy otherwise. Real harmonics (k=0, k=N/2) always use scipy since their real-symmetric path doesn't benefit. eigensolver : str, optional Deprecated alias for ``eigen_solver``. Accepted with a ``DeprecationWarning`` for one minor release. Returns ------- list[CyclicModalResult] One :class:`CyclicModalResult` per requested harmonic index, in the order requested. """ if eigensolver is not None: import warnings # noqa: PLC0415 warnings.warn( "The ``eigensolver=`` kwarg on solve_cyclic_modal is deprecated; " "use ``eigen_solver=`` to match the rest of the modal-family API. " "Will be removed in the next minor release.", DeprecationWarning, stacklevel=2, ) eigen_solver = eigensolver if n_sectors < 2: raise ValueError(f"n_sectors must be >= 2, got {n_sectors}") if parallel_harmonics is not None and parallel_harmonics > 1: return _dispatch_parallel_harmonics( K=K, M=M, low_node_dofs=low_node_dofs, high_node_dofs=high_node_dofs, n_sectors=n_sectors, n_modes=n_modes, pair_rotation=pair_rotation, harmonic_indices=harmonic_indices, prescribed=prescribed, thread_limit=thread_limit, linear_solver=linear_solver, report=report, parallel_harmonics=int(parallel_harmonics), eigen_solver=eigen_solver, ) # Resolve the eigen_solver choice. ``"auto"`` prefers PRIMME for the # complex Hermitian path (~2× fewer matvecs vs scipy's augmentation) # but falls back to scipy when PRIMME isn't installed — keep the # warning to a one-shot to avoid spamming long sweeps. eigen_solver_resolved = _resolve_eigen_solver(eigen_solver) low = np.atleast_2d(np.asarray(low_node_dofs, dtype=np.int64)) high = np.atleast_2d(np.asarray(high_node_dofs, dtype=np.int64)) if low.shape != high.shape: raise ValueError( f"low_node_dofs and high_node_dofs must have the same shape, " f"got {low.shape} vs {high.shape}" ) if low.ndim != 2: raise ValueError(f"node-DOF arrays must be 2-D (P, d), got shape {low.shape}") P, d = low.shape if pair_rotation is None: R = np.eye(d) else: R = np.asarray(pair_rotation, dtype=np.float64) if R.shape != (d, d): raise ValueError(f"pair_rotation must be ({d}, {d}), got {R.shape}") if harmonic_indices is None: harmonic_indices = _harmonic_indices(n_sectors) N = K.shape[0] if M.shape != K.shape: raise ValueError(f"K and M must have the same shape, got {K.shape} vs {M.shape}") fixed_mask = np.zeros(N, dtype=bool) if prescribed: for idx, val in prescribed.items(): if float(val) != 0.0: raise NotImplementedError( "non-zero Dirichlet BCs are not supported by the cyclic " "reduction yet — all prescribed values must be 0" ) fixed_mask[int(idx)] = True low_flat = low.ravel() high_flat = high.ravel() lo_fixed = fixed_mask[low_flat] hi_fixed = fixed_mask[high_flat] if (lo_fixed != hi_fixed).any(): raise ValueError( "cyclic face Dirichlet is only allowed when both low and high " "partners are fixed to zero" ) face_mask = np.zeros(N, dtype=bool) face_mask[high_flat] = True # eliminated DOFs # Reduced DOF ordering: every DOF that is neither fixed nor on the # high face keeps an index. Low-face DOFs are the "masters". keep = ~(fixed_mask | face_mask) n_keep = int(keep.sum()) if n_keep == 0: raise ValueError("no free DOFs remain after cyclic + Dirichlet reduction") red_of_global = -np.ones(N, dtype=np.int64) red_of_global[keep] = np.arange(n_keep) # Precompute the per-pair dense block ``T · R`` as a function of phase. # The constraint is u_high[i, :] = T · R · u_low[i, :] (vector of d). # That's equivalent to building an auxiliary (L_flat × n_keep) sparse # matrix S with entries S[high_dof, reduced-index-of-low-dof] = T·R[a,b]. # Plan the reduction topology once per input matrix. The per-harmonic # cost then collapses to three sparse scalings + two adds (see # :class:`_CyclicReductionPlan`) instead of a full Python-level COO walk # per k. On an N-sector sweep this replaces ``2 × (N/2 + 1)`` COO walks # with just two, which is an order of magnitude less Python overhead # for rotor-scale sectors. from femorph_solver._progress import ProgressReporter # noqa: PLC0415 _stage_owner: ProgressReporter | None = None if report is None: _stage_owner = ProgressReporter(enabled=False) _stage_owner.__enter__() report = _stage_owner with report.stage( f"plan cyclic reduction (P={P} pairs, keep={n_keep:,}/{N:,} DOFs, K nnz={K.nnz:,})" ): K_plan = _plan_cyclic_reduction( K.tocoo(), red_of_global, fixed_mask, face_mask, low, high, R ) M_plan = _plan_cyclic_reduction( M.tocoo(), red_of_global, fixed_mask, face_mask, low, high, R ) # Extract the rotor axis from the caller's ``pair_rotation`` so # the result can travel with its geometry stamp. For d != 3 this # is a no-op (the axis concept is 3D-specific). axis_point_stamp: np.ndarray | None = None axis_dir_stamp: np.ndarray | None = None if pair_rotation is not None and d == 3: from femorph_solver.result._cyclic_expand import axis_from_rotation axis_dir_stamp, _sector_angle = axis_from_rotation(R.real.astype(np.float64)) # The axis passes through the origin in the global frame unless # the caller has an asymmetric mesh. We default to origin and # let the Model / writer override if needed. axis_point_stamp = np.zeros(3, dtype=np.float64) results: list[CyclicModalResult] = [] alpha = 2.0 * np.pi / n_sectors k_list = list(harmonic_indices) n_k = len(k_list) # Persist the LU across same-dtype harmonics so its symbolic factor # (Pardiso phase 11) is reused — only the numeric values change. # We track a separate cache slot per dtype so a real → complex → # real sweep can reuse each side's analysis. When Pardiso isn't # installed the complex path falls back to ``spla.splu`` and # returns ``None`` (no symbolic reuse hook in scipy). cached_lu: LinearSolverBase | None = None # Warm-start: previous complex-harmonic's first eigenvector, # projected onto the (2n,) real augmentation that ARPACK works # on. Cyclic eigenmodes vary continuously with k, so seeding the # next complex eigsh with the previous one's mode 0 saves outer # restart cycles. Reset to ``None`` whenever we cross a real ↔ # complex boundary (different vector shapes / no useful relation). prev_v0_complex_aug: np.ndarray | None = None # Multi-vector warm-start for PRIMME: full ``(n_keep, n_req)`` complex # eigenbasis from the previous complex harmonic. PRIMME's # ``primme.eigsh(v0=...)`` accepts a 2-D seed array and uses the # full subspace as initial guesses, which converges in dramatically # fewer iterations than seeding from a single mode. The scipy # augmented path can only consume a 1-D v0, so it falls through to # the existing single-vector ``prev_v0_complex_aug`` chain. prev_v0_complex_full: np.ndarray | None = None try: get_linear_solver("pardiso") pardiso_available = True except SolverUnavailableError: # Pardiso registered but not installed — the complex path will # fall back to ``spla.splu``. pardiso_available = False with _thread_limit(thread_limit): for kpos, k in enumerate(k_list, start=1): T = complex(np.exp(1j * k * alpha)) is_real = int(k) == 0 or (n_sectors % 2 == 0 and int(k) == n_sectors // 2) # Snap to exact ±1 for real harmonics so _CyclicReductionPlan.apply() # hits its fast-path equality checks and K_red stays purely real # (floating-point noise in exp(iπ) would otherwise amplify to # ~1e-7 imaginary parts in a GPa stiffness matrix, causing # _is_real_harmonic to fail and routing k=N/2 through the slow # complex SuperLU path). if is_real: T = complex(round(T.real), 0.0) TR = T * R.astype(np.complex128) phase_kind = "real (k=0)" if int(k) == 0 else "real (k=N/2)" if is_real else "complex" # Drop the cached factor when its dtype no longer matches # the harmonic about to run — keeping a wrong-dtype solver # alive wastes MKL's factor handle. if cached_lu is not None: cache_is_complex = bool(getattr(cached_lu, "_complex", False)) if cache_is_complex == is_real: # mismatch if hasattr(cached_lu, "release"): cached_lu.release() cached_lu = None n_req = min(n_modes, n_keep) if not is_real and not pardiso_available: cache_tag = "n/a (complex SuperLU)" elif cached_lu is None: cache_tag = "fresh" else: cache_tag = "reuse-symbolic" with report.stage( f"k={int(k)} [{kpos}/{n_k}] reduce + eigsh " f"(n_modes={n_req}, phase={phase_kind}, " f"linear={linear_solver}, cache={cache_tag})" ): # On the complex Pardiso + scipy ARPACK fast-path, # K_red full materialisation is dead weight: K_triu # drives the factor and scipy mode 3 ARPACK never # calls A.matvec on K_op (see :func:`_unreachable_matvec`). # PRIMME, in contrast, *does* matvec the system matrix # (K_red itself, not the augmentation), so we must # materialise K_red when ``eigen_solver_resolved == "primme"``. # Small problems (n_keep <= _DENSE_CUTOFF) take the # dense LAPACK path which always needs the full K_red. skip_K_red = ( not is_real and pardiso_available and n_keep > _DENSE_CUTOFF and eigen_solver_resolved != "primme" ) K_red = None if skip_K_red else K_plan.apply(T) M_red = M_plan.apply(T) # For complex Hermitian harmonics that route to Pardiso # mtype=-4, build the upper-triu K_input via the plan's # pre-computed triu buckets — saves an O(nnz) sp.triu # pass per harmonic on a 75 M-nnz K_red. Real harmonics # take the real-symmetric branch which doesn't need triu. K_triu = None if is_real or not pardiso_available else K_plan.apply_triu(T) # Hand the previous complex harmonic's eigenvector to # ARPACK as v0 — only on complex k, only when the # previous run was also complex (vector shape matches). v0_seed = None if is_real else prev_v0_complex_aug v0_full_seed = None if is_real else prev_v0_complex_full w, v, lu_out = _solve_reduced_eig( K_red, M_red, n_req, n_keep, linear_solver, cached_lu=cached_lu, K_triu=K_triu, v0_complex_aug=v0_seed, v0_complex_full=v0_full_seed, eigen_solver=eigen_solver_resolved, ) if lu_out is not None: cached_lu = lu_out # Stash the projected first eigenvector for the next # iteration if we're staying on the complex path. if not is_real and v.size > 0: v0_complex = v[:, 0] v0_aug = np.concatenate([v0_complex.real, v0_complex.imag]).astype( np.float64, copy=False ) nrm = float(np.linalg.norm(v0_aug)) prev_v0_complex_aug = v0_aug / nrm if nrm > 0 else None # Save the full (n_keep, n_req) complex basis for # the PRIMME path's multi-vector v0 warm-start. prev_v0_complex_full = v.astype(np.complex128, copy=False) else: prev_v0_complex_aug = None prev_v0_complex_full = None w_clean = np.where(w > 0, w, 0.0) phi = np.zeros((N, n_req), dtype=np.complex128) phi[keep] = v # Expand the eliminated high DOFs back: u_high[i,:] = T R · u_low[i,:]. u_low_blocks = phi[low_flat].reshape(P, d, n_req) u_high_blocks = np.einsum("ab,pbm->pam", TR, u_low_blocks) phi[high_flat] = u_high_blocks.reshape(P * d, n_req) results.append( CyclicModalResult( harmonic_index=int(k), n_sectors=n_sectors, omega_sq=w_clean, frequency=np.sqrt(w_clean) / (2.0 * np.pi), mode_shapes=phi, axis_point=axis_point_stamp, axis_dir=axis_dir_stamp, ) ) # Release MKL's factor handle (Pardiso holds the factor on a 64-slot # ``pt`` array that survives Python GC; ``release()`` runs phase -1). if cached_lu is not None and hasattr(cached_lu, "release"): cached_lu.release() if _stage_owner is not None: _stage_owner.__exit__(None, None, None) return results
def _is_real_harmonic(A: sp.csr_array, atol: float = 1e-14) -> bool: """Return True if A's sparse data is (numerically) real. For k=0 the phase factor ``T = e^{ikα}`` is ``1`` and K_red / M_red have identically zero imaginary parts. For k=N/2 on even N the phase is ``-1`` — again real. Other harmonics produce complex Hermitian reductions. We look at the actual data rather than the harmonic index so the caller can pass any sparse matrix without having to re-derive the index. """ data = A.data return bool(np.all(np.abs(data.imag) <= atol)) def _hermitian_to_real_augment(A: sp.csr_array) -> sp.csr_array: """Embed a complex-Hermitian sparse ``A`` (n×n) as real-symmetric (2n×2n). For Hermitian ``A = A_re + i A_im`` (A_re symmetric, A_im antisymmetric) the block matrix A_aug = [[A_re, -A_im], [A_im, A_re]] is real-symmetric of size 2n×2n, and its spectrum is ``{λ}`` from A's spectrum but with each eigenvalue repeated twice (eigenvectors ``[Re(v); Im(v)]`` and ``[-Im(v); Re(v)]``). This lets us solve Hermitian generalised eigenproblems with ``scipy.sparse.linalg.eigsh`` (real-symmetric only) without pulling in PRIMME. """ # Build the 2×2 block via ``sp.bmat`` — ~2× faster than the previous # 4-way COO concatenation + bool-mask filter on typical # cyclic-reduced nnz (the inner concatenation doubled every COO # entry; bmat keeps one buffer per block and reuses the input # indptr/indices verbatim). A_re and A_im inherit A's CSR pattern # directly — no tocoo round-trip — so the only real work is the # 2×2 block stitching inside scipy. A_re = sp.csr_array((A.data.real, A.indices, A.indptr), shape=A.shape) A_im = sp.csr_array((A.data.imag, A.indices, A.indptr), shape=A.shape) aug = sp.bmat([[A_re, -A_im], [A_im, A_re]], format="csr") # Drop explicit zeros so the factor doesn't walk dead slots. This is # relevant at diagonal of Re(A) (imag part structurally zero at # diagonals for Hermitian A), producing ~n explicit zeros in each # cross-diagonal block. aug.eliminate_zeros() return aug def _shift_invert_opinv( A: sp.csr_array | sp.csc_array, linear_solver: str, *, assume_spd: bool, cached_lu: LinearSolverBase | None = None, ) -> tuple[spla.LinearOperator, LinearSolverBase]: """Build the ``(K - σM)^-1`` operator used by ``eigsh`` shift-invert. Routes the inner factorisation through the registered linear-solver chain (pardiso → cholmod → umfpack → superlu for ``"auto"``), matching what ``ArpackSolver`` does for plain modal. Warns once per process if pardiso is missing and the problem is large enough to benefit. ``assume_spd`` controls the factorisation path: pass ``True`` only when the caller knows the reduced operator is strictly SPD (e.g. Dirichlet-constrained, sigma < 0). With ``False`` the auto-chain resolves to a general-purpose backend (pardiso/umfpack/superlu) that handles indefinite / semi-definite operators via LU — needed for free-free cyclic (K_red has a rigid-body null-space at k=0 and σ=0) and for the complex-Hermitian real augmentation. ``cached_lu`` lets sweep-style callers (cyclic harmonic loops, parametric modal sweeps where only ``A.data`` changes) hand back the previous solver so the analysis (Pardiso phase 11) is reused. Returned as the second element of the tuple — the caller is responsible for passing it back on the next call. """ A_csc = A.tocsc() if cached_lu is not None: cached_lu.refactor(A_csc) lu = cached_lu else: if linear_solver == "auto": resolved = select_default_linear_solver(spd=assume_spd, n_dof=A_csc.shape[0]) Solver = get_linear_solver(resolved) else: Solver = get_linear_solver(linear_solver, n_dof=A_csc.shape[0]) lu = Solver(A_csc, assume_spd=assume_spd) return ( spla.LinearOperator(A_csc.shape, matvec=lu.solve, dtype=A_csc.dtype), lu, ) def _complex_as_real_augmented_matvec(A: sp.csr_array) -> spla.LinearOperator: """Wrap a complex Hermitian ``A`` (n×n) as the 2n real augmentation matvec. The real 2n augmentation ``[[Re(A), -Im(A)], [Im(A), Re(A)]]`` acts on ``x = [u; v]`` (two real length-n halves) as ``[[Re(A), -Im(A)], [Im(A), Re(A)]] · [u; v] = [Re(A · (u + iv)); Im(A · (u + iv))]`` So ``eigsh`` gets the same 2n operator action from one complex CSR matvec on ``A`` — no need to materialise the 4-block real CSR. On a 45k-DOF rotor sector this saves ~200 MB of peak RSS per k (the augmentation has ~4× K_red's nnz) and the ``sp.bmat`` build time per matrix per harmonic. When ``femorph_solver._mkl`` finds ``mkl_rt`` (typically present via the ``pardiso`` extra), the complex matvec routes through MKL Sparse BLAS via :class:`~femorph_solver._mkl.ComplexSparseMatVec` — ~2.24× faster than scipy's pure-C complex sparse matmul at rotor scale (488 k DOFs / 25 M nnz). Without MKL the path falls back to ``A @ x``, so the SuperLU + scipy install keeps working. Pack/unpack uses the interleaved-view memcpy from ``_mkl.pack_complex`` / ``unpack_complex`` to avoid the ``u + 1j * v`` / ``np.concatenate([w.real, w.imag])`` temporaries. """ from femorph_solver import _mkl # noqa: PLC0415 n = A.shape[0] sparse_op = _mkl.ComplexSparseMatVec(A, expected_calls=128) z_buf = np.empty(n, dtype=np.complex128) sp_out_buf = np.empty(n, dtype=np.complex128) out_buf = np.empty(2 * n, dtype=np.float64) def _matvec(x: np.ndarray) -> np.ndarray: _mkl.pack_complex(x[:n], x[n:], out=z_buf) w = sparse_op.dot(z_buf, out=sp_out_buf) return _mkl.unpack_complex(w, out=out_buf) return spla.LinearOperator((2 * n, 2 * n), matvec=_matvec, dtype=np.float64) def _unreachable_matvec(_x: np.ndarray) -> np.ndarray: """Sentinel matvec for the K-side LinearOperator handed to scipy eigsh in shift-invert mode 3. scipy explicitly forbids a real matvec for ``A`` in this mode (only OPinv + M_matvec are touched). If this function ever fires, scipy's ARPACK dispatch has changed and the cyclic Pardiso path needs a real K matvec; raising loudly is better than silently corrupting eigenvalues. """ raise RuntimeError( "K_op matvec was called inside scipy eigsh — the cyclic complex " "Pardiso path assumes mode 3 ARPACK never touches A.matvec. " "Either scipy's ARPACK dispatch changed or this function was " "passed a non-mode-3 problem; rebuild K_red and use " "_complex_as_real_augmented_matvec." ) def _complex_opinv_for_augmented( K_red: sp.csr_array, *, cached_lu: LinearSolverBase | None = None, K_triu: sp.csr_array | None = None, ) -> tuple[spla.LinearOperator, LinearSolverBase | object]: """Build an OPinv on the real 2n augmentation via a *complex* factor of K_red. The ``eigsh`` shift-invert loop on the augmented pair ``([[Re,-Im],[Im,Re]], …)`` at ``σ=0`` needs ``K_aug^{-1} · x`` for each iteration. Writing ``x = [u; v]`` (two real length-n halves) and ``Z = K_red`` (complex Hermitian), a little algebra on the block form gives K_aug^{-1} · [u; v] = [Re(Z^{-1} (u + i v)); Im(Z^{-1} (u + i v))]. So one complex factor on K_red (size n) replaces a real factor on the full augmentation (size 2n) — cheaper to compute and apply. Backend dispatch: * **Pardiso when available** (``mtype=4`` complex Hermitian indefinite, Bunch-Kaufman pivoting on the upper-triangular CSR copy of K_red) — METIS column ordering, ~2-3× less fill than SuperLU's MMD on 3D-FEM patterns, half the data + analysis cost vs the unsymmetric ``mtype=13`` branch this code used before #700, and supports symbolic-factor reuse via :meth:`~femorph_solver.solvers.linear.LinearSolverBase.refactor` (MKL phase 22). Sweep-style callers (cyclic harmonics) can hand the previous solver back as ``cached_lu`` and skip the analysis cost on subsequent harmonics — only the numeric factor runs. * **Otherwise SuperLU** via :func:`scipy.sparse.linalg.splu`. No symbolic-factor reuse hook in scipy for complex CSCs, so every harmonic pays the full factorisation cost. Returns ``cached_lu`` as ``None`` so the caller knows not to thread anything forward. """ # K_red may be None on the complex Pardiso fast-path — the caller # skipped K_plan.apply(T) because K_triu drives the factor and # ARPACK mode 3 doesn't matvec K_op. In that case derive shape # from K_triu instead. if K_red is not None: n = K_red.shape[0] elif K_triu is not None: n = K_triu.shape[0] else: raise ValueError("_complex_opinv_for_augmented needs at least one of K_red / K_triu") try: PardisoSolver = get_linear_solver("pardiso") except SolverUnavailableError: PardisoSolver = None if PardisoSolver is not None: # K_red is Hermitian by construction (Wildheim's P^H K P # reduction). Hand Pardiso the upper-triangular CSR + the # ``assume_hermitian=True`` hint so it routes via ``mtype=-4`` # (Bunch-Kaufman pivoting on Hermitian indefinite). # ``femorph_triu`` skips the duplicate ``sp.triu`` that # PardisoSolver._factor would otherwise do. if K_triu is not None: # Caller built triu via K_plan.apply_triu(T) — skip the # ~3 s sp.triu pass per harmonic on a 75 M-nnz K_red. K_input = K_triu.astype(np.complex128, copy=False) else: K_input = sp.triu(K_red, format="csr").astype(np.complex128, copy=False) K_input.femorph_triu = True if cached_lu is not None and getattr(cached_lu, "_complex", False): cached_lu.refactor(K_input) lu = cached_lu else: if cached_lu is not None and hasattr(cached_lu, "release"): cached_lu.release() lu = PardisoSolver(K_input, assume_hermitian=True) # Pack the (2n,) real ARPACK Lanczos vector into a complex # (n,) for Pardiso, then split the complex solution back. # ``femorph_solver._mkl.pack_complex`` / ``unpack_complex`` use # numpy's interleaved complex128 layout to skip the temp # multiply / concat that ``u + 1j*v`` and # ``np.concatenate([w.real, w.imag])`` allocate per call — # ~1.7-1.8× faster at rotor-scale (488 k DOFs) and amortises # across ~50 ARPACK iterations per complex harmonic. from femorph_solver import _mkl # noqa: PLC0415 # Pre-allocated scratch — pack_complex output (the RHS Pardiso # consumes) and unpack_complex output (the (2n,) ARPACK return). # Pardiso allocates its own ``np.empty_like(b)`` per solve (we # can't share that buffer because the caller may keep the # result), but the pack/unpack scratch lives entirely inside # this closure so reuse is safe. Saves ~2 small alloc/free # pairs per ARPACK iter — at rotor scale the pack alloc is # 8 MB and the unpack alloc is 8 MB, ~7 GB of churn over the # 9-harmonic sweep. z_buf = np.empty(n, dtype=np.complex128) out_buf = np.empty(2 * n, dtype=np.float64) def _matvec(x: np.ndarray) -> np.ndarray: _mkl.pack_complex(x[:n], x[n:], out=z_buf) w = lu.solve(z_buf) return _mkl.unpack_complex(w, out=out_buf) return ( spla.LinearOperator((2 * n, 2 * n), matvec=_matvec, dtype=np.float64), lu, ) # SuperLU fallback path needs a CSC. K_csc = K_red.tocsc() # SuperLU fallback — no symbolic-factor reuse for complex CSCs. if cached_lu is not None and hasattr(cached_lu, "release"): cached_lu.release() lu_su = spla.splu(K_csc) from femorph_solver import _mkl # noqa: PLC0415 def _matvec_su(x: np.ndarray) -> np.ndarray: z = _mkl.pack_complex(x[:n], x[n:]) w = lu_su.solve(z) return _mkl.unpack_complex(w) return ( spla.LinearOperator((2 * n, 2 * n), matvec=_matvec_su, dtype=np.float64), None, ) # Deterministic seed for the ARPACK / PRIMME ``v0`` Lanczos starting # vector on the first complex harmonic (no warm-start chain available) # and the real-harmonic eigsh. Without an explicit ``v0``, scipy and # PRIMME both consume from ``np.random`` global state - which is # process-global and therefore depends on whatever other tests ran # first under pytest-xdist. On near-degenerate eigenvalues (cyclic # modal often has them) that ordering dependence shows up as # occasional mode-pair swaps in the parity tests against MAPDL. Using # a fixed-seed local RNG eliminates the cross-test order coupling # while still giving ARPACK a "random-enough" starting vector to # break degeneracy. _DETERMINISTIC_V0_SEED = 0 def _seeded_real_v0(n: int) -> np.ndarray: """Deterministic real-valued Lanczos start vector for ARPACK eigsh.""" rng = np.random.default_rng(_DETERMINISTIC_V0_SEED) return rng.standard_normal(n).astype(np.float64) def _seeded_complex_v0(n: int) -> np.ndarray: """Deterministic complex-valued Lanczos start vector for PRIMME.""" rng = np.random.default_rng(_DETERMINISTIC_V0_SEED) return (rng.standard_normal(n) + 1j * rng.standard_normal(n)).astype(np.complex128) def _solve_reduced_eig( K_red: sp.csr_array, M_red: sp.csr_array, n_req: int, n_keep: int, linear_solver: str, cached_lu: LinearSolverBase | None = None, K_triu: sp.csr_array | None = None, v0_complex_aug: np.ndarray | None = None, v0_complex_full: np.ndarray | None = None, eigen_solver: str = "scipy", ) -> tuple[np.ndarray, np.ndarray, LinearSolverBase | None]: """Lowest ``n_req`` modes of ``K_red φ = ω² M_red φ`` (complex Hermitian). Small problems (``n_keep <= _DENSE_CUTOFF``) go through LAPACK ``eigh`` directly for maximum accuracy. Larger sectors use a sparse shift-invert path whose inner ``K^-1`` solve is routed via :func:`get_linear_solver` so pardiso is the default when installed (same chain as plain modal; warns once if missing on large problems): * For real harmonics (k=0 / k=N/2) the reduced operators are strictly real-symmetric and ``scipy.sparse.linalg.eigsh`` with ``sigma=0`` returns the lowest modes via shift-invert. * For complex harmonics we embed the Hermitian pair ``(K_red, M_red)`` into a 2n-sized real-symmetric pair via :func:`_hermitian_to_real_augment`, run ``eigsh`` on that, and deduplicate each eigenvalue (the embedding doubles them). The complex eigenvector ``v`` is reconstructed from the augmented eigenvector ``[a; b]`` as ``v = a + i b``. """ # ``K_red is None`` means the caller skipped K_plan.apply(T) on the # complex Pardiso fast-path because K_triu drives the factor and # ARPACK mode 3 doesn't touch K_op.matvec. In that case we must be # on the complex-Hermitian sparse branch — skip the dense and # real-harmonic guards below (they need the full K_red). if K_red is not None and n_keep <= _DENSE_CUTOFF: K_dense = K_red.toarray() M_dense = M_red.toarray() K_dense = 0.5 * (K_dense + K_dense.conj().T) M_dense = 0.5 * (M_dense + M_dense.conj().T) w, v = la.eigh(K_dense, M_dense, subset_by_index=[0, n_req - 1]) return w, v.astype(np.complex128, copy=False), None if K_red is not None and _is_real_harmonic(K_red) and _is_real_harmonic(M_red): # Pure real-symmetric sparse path. K_red can be semi-definite at # k=0 when the sector has no Dirichlet BCs (rigid-body nullspace), # so ``assume_spd=False`` keeps the factorisation on LU rather than # Cholesky. K_r = sp.csr_array((K_red.data.real, K_red.indices, K_red.indptr), shape=K_red.shape) M_r = sp.csr_array((M_red.data.real, M_red.indices, M_red.indptr), shape=M_red.shape) K_r = 0.5 * (K_r + K_r.T) M_r = 0.5 * (M_r + M_r.T) # Reuse the cached factor only when its dtype matches — flipping # between real (k=0, k=N/2) and complex harmonics in the same # sweep would corrupt MKL's analysis. reuse = cached_lu if (cached_lu is not None and cached_lu._A.dtype == np.float64) else None OPinv, lu = _shift_invert_opinv(K_r, linear_solver, assume_spd=False, cached_lu=reuse) # Tighter ARPACK basis (see complex-harmonic comment below). eigsh_ncv = min(max(2 * n_req + 1, n_req + 8), n_keep - 1) w, v = spla.eigsh( K_r, k=n_req, M=M_r, sigma=0.0, which="LM", OPinv=OPinv, ncv=eigsh_ncv, v0=_seeded_real_v0(K_r.shape[0]), ) order = np.argsort(w) return w[order], v[:, order].astype(np.complex128, copy=False), lu # ---------------------------------------------------------------- # Complex Hermitian harmonic # ---------------------------------------------------------------- # Two backends: # * eigen_solver == "primme" → PRIMME native complex Hermitian # generalised eigsh (no augmentation, no dedupe, ~2× fewer # matvecs and ~30% fewer Pardiso solves at rotor scale). # * eigen_solver == "scipy" → scipy ARPACK on the 2n real # augmentation (legacy path; only choice when PRIMME isn't # installed). if eigen_solver == "primme": return _primme_complex_hermitian( K_red, M_red, n_req, n_keep, cached_lu=cached_lu, K_triu=K_triu, v0_complex_aug=v0_complex_aug, v0_complex_full=v0_complex_full, ) # Complex Hermitian → 2n real-symmetric augmentation. The augmented # matrix shares the spectrum of the Hermitian original (each eigenvalue # doubled). # # Shift-invert at σ=0 needs ``K_aug^-1 · x`` inside the Lanczos loop. # Instead of factoring the 2n real augmentation directly (which was # scipy's default when we passed ``OPinv=None``), exploit the block # identity # # [[Re(Z), -Im(Z)], [Im(Z), Re(Z)]]^{-1} · [u; v] # = [Re(Z^{-1}(u+iv)); Im(Z^{-1}(u+iv))] # # with ``Z = K_red`` (complex Hermitian, size n). A single complex # ``splu`` factor on ``K_red`` is ~3× cheaper than a real ``splu`` on # the 2n augmentation on rotor-scale sectors — the augmentation has # ~4× more nnz (four blocks of K_red's pattern) and factor cost grows # super-linearly with problem size. # Skip materialising the 2n real ``K_aug`` / ``M_aug`` CSRs entirely — # wrap the complex K_red / M_red as real 2n LinearOperators via the # block-matvec identity (see :func:`_complex_as_real_augmented_matvec`). # On a rotor-scale sector this drops ~200 MB of peak RSS per k (the # augmentation has ~4× K_red's nnz for each of K_aug / M_aug) and # skips a ``sp.bmat`` build per matrix per harmonic. eigsh in # shift-invert mode drives the Lanczos loop through OPinv and the M # matvec only — A itself is inspected for shape/dtype but never # matvec'd — so the K LinearOperator here is purely a shape handle. # Reuse the cached complex factor only when its dtype matches — # crossing real ↔ complex would corrupt the analysis. reuse = cached_lu if (cached_lu is not None and getattr(cached_lu, "_complex", False)) else None OPinv, complex_lu = _complex_opinv_for_augmented(K_red, cached_lu=reuse, K_triu=K_triu) # Build K_op: scipy's eigsh in mode 3 (sigma=0 + M provided) # explicitly *forbids* a matvec on A — see scipy/sparse/linalg/ # _eigen/arpack/arpack.py:506 ("matvec must not be specified for # mode=3"). Pass a stub LinearOperator that satisfies the # shape/dtype probe but raises if eigsh ever does call its # matvec. Lets the caller skip materialising the full K_red in # the complex Pardiso path entirely once the upper-triu copy is # the only thing the factor needs. if K_red is None: n = K_triu.shape[0] K_op = spla.LinearOperator( (2 * n, 2 * n), matvec=_unreachable_matvec, dtype=np.float64, ) else: K_op = _complex_as_real_augmented_matvec(K_red) M_op = _complex_as_real_augmented_matvec(M_red) # Each true eigenvalue appears twice in the augmented spectrum; ask # for 2·n_req and then dedupe. Slight padding (min(2·n_req+4, 2n_keep-1)) # absorbs clustered pairs that land just outside the first 2·n_req # convergence block. n_aug = 2 * n_keep n_request = min(2 * n_req + 4, n_aug - 1) # ARPACK convergence: scipy's default ``ncv=max(2k+1, 20)`` is too # loose for shift-invert mode 3 on rotor-scale K_red — local probe # at 488 k DOFs / n_request=12 shows the *opposite* of the # intuitive "more basis = fewer restarts": ``ncv=34`` (the old # ``max(2k+10, k+20)`` formula) takes 47 OPinv calls / 104 s while # ``ncv=25`` (``max(2k+1, k+8)``) takes 40 calls / 89 s. The # quadratic per-restart cost in ncv outweighs the basis-slack # benefit at this scale, so we tighten the formula and pay the # ~7 fewer Pardiso solves per harmonic — ~15 s × 9 complex k on # the bench rotor. Keep ``tol=0`` (machine precision); looser # values broke the small-sector MAPDL parity tests via # near-cluster mode mixing. eigsh_ncv = min(max(2 * n_request + 1, n_request + 8), n_aug - 1) # Warm-start: when the caller supplies the previous complex # harmonic's first eigenvector (projected onto the (2n,) real # augmentation), seed ARPACK with it. Cyclic eigenmodes vary # continuously with k so the previous mode is a good Lanczos # initial vector — typically cuts 5-15 outer restart cycles per # harmonic on rotor-scale K_red. Only safe when the dtype matches # the current harmonic (caller filters real ↔ complex transitions). v0 = ( v0_complex_aug if (v0_complex_aug is not None and v0_complex_aug.shape == (2 * n_keep,)) else _seeded_real_v0(2 * n_keep) ) w_aug, v_aug = spla.eigsh( K_op, k=n_request, M=M_op, sigma=0.0, which="LM", OPinv=OPinv, ncv=eigsh_ncv, v0=v0, ) order = np.argsort(w_aug) w_aug = w_aug[order] v_aug = v_aug[:, order] # Merge consecutive duplicates. Two eigenvectors of the augmented # matrix span the same complex eigenmode; Gram-Schmidt on the first # of each pair gives the complex ``v`` up to a global phase. w_out: list[float] = [] v_out: list[np.ndarray] = [] i = 0 while i < len(w_aug) and len(w_out) < n_req: # Take the current augmented eigenvector as the complex mode. a = v_aug[:n_keep, i] b = v_aug[n_keep:, i] v_complex = a + 1j * b nrm = np.linalg.norm(v_complex) if nrm > 0: v_complex = v_complex / nrm w_out.append(w_aug[i]) v_out.append(v_complex) # Skip the paired partner (same eigenvalue, 90° phase-rotated). # The partner is expected within a tiny relative tolerance. j = i + 1 if j < len(w_aug) and abs(w_aug[j] - w_aug[i]) <= 1e-8 * max(abs(w_aug[i]), 1.0): i = j + 1 else: i = j w_final = np.array(w_out[:n_req], dtype=np.float64) v_final = np.stack(v_out[:n_req], axis=1).astype(np.complex128) return w_final, v_final, complex_lu def _primme_complex_hermitian( K_red: sp.csr_array, M_red: sp.csr_array, n_req: int, n_keep: int, *, cached_lu: LinearSolverBase | None = None, K_triu: sp.csr_array | None = None, v0_complex_aug: np.ndarray | None = None, v0_complex_full: np.ndarray | None = None, ) -> tuple[np.ndarray, np.ndarray, LinearSolverBase | None]: """Solve ``K_red φ = ω² M_red φ`` for the lowest ``n_req`` eigenpairs with PRIMME's native complex Hermitian generalised solver. Avoids scipy's 2n real augmentation entirely. Inner ``K^-1`` solves still go through Pardiso (mtype=-4 Hermitian indefinite) so the factor-reuse-across-harmonics machinery is unchanged — only the Krylov outer loop swaps backends. PRIMME's ``which="SA"`` (smallest algebraic) returns the lowest eigenvalues directly; ``OPinv=K^-1`` accelerates convergence exactly the way scipy ARPACK uses it as a preconditioner. """ import primme # noqa: PLC0415 # Build the inner Pardiso factor on K_red (complex Hermitian). # Reuse cached_lu when its complex flag matches; otherwise build # fresh. Same logic as :func:`_complex_opinv_for_augmented`. try: PardisoSolver = get_linear_solver("pardiso") except SolverUnavailableError: PardisoSolver = None if PardisoSolver is not None: if K_triu is not None: K_input = K_triu.astype(np.complex128, copy=False) else: K_input = sp.triu(K_red, format="csr").astype(np.complex128, copy=False) K_input.femorph_triu = True if cached_lu is not None and getattr(cached_lu, "_complex", False): cached_lu.refactor(K_input) lu = cached_lu else: if cached_lu is not None and hasattr(cached_lu, "release"): cached_lu.release() lu = PardisoSolver(K_input, assume_hermitian=True) n = K_input.shape[0] else: # No Pardiso → fall back to scipy splu on K_red (CSC). if cached_lu is not None and hasattr(cached_lu, "release"): cached_lu.release() K_csc = K_red.tocsc().astype(np.complex128, copy=False) lu_su = spla.splu(K_csc) class _SuperLUBackedLU: # quack-typed mini-wrapper _complex = True _A = K_csc def solve(self, b): return lu_su.solve(b.astype(np.complex128)) def release(self): pass lu = _SuperLUBackedLU() n = K_csc.shape[0] # OPinv as a complex (n, n) LinearOperator — PRIMME treats it as a # preconditioner; an exact inverse is the strongest possible. # Supports both 1-D and 2-D (block) RHS so PRIMME's ``maxBlockSize`` # parameter (batched matvec) can route through Pardiso's native # multi-RHS phase 33 in a single call instead of N sequential solves. def _opinv(b: np.ndarray) -> np.ndarray: b = np.ascontiguousarray(b, dtype=np.complex128) if b.ndim == 2: # Pardiso solves a column-major dense block directly; route # each column through the cached path and stack. When the # underlying solver supports multi-RHS we'd hand the matrix # in one shot — pypardiso's solve() takes (n, nrhs) but the # bypassed _call_pardiso path needs a 2D b too, which it # already supports via the ``b.shape[1]`` nrhs computation. return lu.solve(b) return lu.solve(b) OPinv = spla.LinearOperator( (n, n), matvec=_opinv, matmat=_opinv, dtype=np.complex128, ) # Warm-start: PRIMME accepts a (N, k_seed) 2-D array of initial # guesses. Counter-intuitive but measured: passing the full # ``(n_keep, n_req)`` previous-harmonic basis as v0 *triples* the # matvec count vs no warm-start (k=2 went from ~135 to ~350 # matvecs in a one-harmonic stats probe). PRIMME's GD+k method # treats v0 columns as expansion vectors, and a near-linearly- # dependent set (which adjacent-k modes are) wastes iterations on # redundant search directions. Pass *just* the first eigenvector # — best of both worlds: useful initial direction, no basis bloat. v0 = None if v0_complex_full is not None and v0_complex_full.shape == (n_keep, n_req): v_first = v0_complex_full[:, 0] nrm = np.linalg.norm(v_first) if nrm > 0: v0 = (v_first / nrm)[:, None] elif v0_complex_aug is not None and v0_complex_aug.shape == (2 * n_keep,): v0_complex = (v0_complex_aug[:n_keep] + 1j * v0_complex_aug[n_keep:]).astype(np.complex128) nrm = np.linalg.norm(v0_complex) if nrm > 0: v0 = (v0_complex / nrm)[:, None] if v0 is None: # Cold-start: deterministic Lanczos seed (see top-of-file # ``_DETERMINISTIC_V0_SEED`` rationale). Without this, PRIMME # consumes from ``np.random`` global state and the cyclic-modal # parity test occasionally ends up with mode-pair swaps when # other tests reorder under xdist. v0 = _seeded_complex_v0(n_keep)[:, None] # Solve. ``which="SA"`` = smallest algebraic eigenvalues (lowest # modes). ``method="PRIMME_DEFAULT_MIN_MATVECS"`` selects PRIMME's # GD+k variant, which minimises the number of matvec calls — the # key metric for our Pardiso-bound inner loop where each matvec # is a sparse Hermitian solve. ``tol=1e-8`` (relative residual # bound) gives ~8-digit eigenvalue accuracy — comfortably above # the ~1e-3 frequency tolerance the MAPDL parity tests require, # and on the bench rotor lands the sweep at ~2.0× MAPDL wall (vs # ~1.95× at tol=1e-10 and ~1.90× at tol=0 / machine epsilon). # NB: tried ``maxBlockSize=4`` — regressed badly on the bench # rotor (per-worker wall jumped from ~135 s to >40 min for the # par4+PRIMME sweep before we killed it). The block-Krylov path # apparently needs many more total matvecs to converge on our # rotor-scale K with this method/OPinv pairing, swamping the # per-RHS cache benefit. Stay at the default block size 1. w, v = primme.eigsh( K_red.astype(np.complex128, copy=False), k=n_req, M=M_red.astype(np.complex128, copy=False), which="SA", OPinv=OPinv, v0=v0, tol=1e-8, method="PRIMME_DEFAULT_MIN_MATVECS", return_eigenvectors=True, return_unconverged=False, raise_for_unconverged=True, ) order = np.argsort(w.real) return w.real[order], v[:, order].astype(np.complex128, copy=False), lu @dataclass class _CyclicReductionPlan: """Cached ``P^H A P`` topology for an N-sector cyclic sweep. The reduced triplets partition by phase exponent ``e ∈ {-1, 0, +1}`` — a function of where each input ``(row, col)`` lives relative to the cyclic boundary, independent of the harmonic index ``k``. Once per input matrix we bucket the reduced triplets into three real CSRs (``A_zero``, ``A_pos``, ``A_neg``); per-harmonic evaluation is then A_red(k) = A_zero + e^{ikα} · A_pos + e^{-ikα} · A_neg with ``|T|² = 1`` cancellation making the high-×-high block phase-independent (see :func:`_plan_cyclic_reduction` for the derivation). For a PTR-scale rotor this replaces ~36 Python-heavy COO walks (2 matrices × 18 harmonics) with just 2 walks plus 36 O(nnz) sparse scalings — roughly an order of magnitude less Python overhead on the cyclic sweep hot path. :meth:`apply_triu` returns the upper-triangular projection of ``A_red(T)`` without an O(nnz) ``sp.triu`` per harmonic — the three buckets' triu copies are precomputed lazily on first use, so the Hermitian-Pardiso path (``mtype=-4``) only pays the triu cost once per matrix instead of once per complex harmonic. On the pbs-sector-hd reference that's ~3 s × 9 complex harmonics ≈ 27 s saved across the full sweep. """ A_zero: sp.csr_array A_pos: sp.csr_array A_neg: sp.csr_array _triu_zero: sp.csr_array | None = None _triu_pos: sp.csr_array | None = None _triu_neg: sp.csr_array | None = None def apply(self, T: complex) -> sp.csr_array: # Real-harmonic fast paths: k=0 and k=N/2 stay purely real so the # downstream sparse eigsh path can take its real-symmetric branch # without a ``.real`` copy. if T == 1.0 + 0.0j: return self.A_zero + self.A_pos + self.A_neg if T == -1.0 + 0.0j: return self.A_zero - self.A_pos - self.A_neg return self.A_zero + T * self.A_pos + T.conjugate() * self.A_neg def _ensure_triu_buckets(self) -> None: if self._triu_zero is None: # ``sp.triu`` once per bucket; cache so subsequent # complex-harmonic ``apply_triu(T)`` calls just do three # scalar-scaled CSR adds against the already-triu storage. self._triu_zero = sp.triu(self.A_zero, format="csr") self._triu_pos = sp.triu(self.A_pos, format="csr") self._triu_neg = sp.triu(self.A_neg, format="csr") def apply_triu(self, T: complex) -> sp.csr_array: """Return ``triu(A_red(T))`` for direct hand-off to a Hermitian solver (Pardiso ``mtype=-4`` / ``mtype=2``). Same combination as :meth:`apply` but on pre-computed triu buckets — saves an ``sp.triu`` pass at every harmonic. """ self._ensure_triu_buckets() if T == 1.0 + 0.0j: return self._triu_zero + self._triu_pos + self._triu_neg if T == -1.0 + 0.0j: return self._triu_zero - self._triu_pos - self._triu_neg return self._triu_zero + T * self._triu_pos + T.conjugate() * self._triu_neg def _plan_cyclic_reduction( A_coo: sp.coo_matrix | sp.coo_array, red_of_global: np.ndarray, fixed_mask: np.ndarray, face_mask: np.ndarray, low: np.ndarray, # (P, d) high: np.ndarray, # (P, d) R: np.ndarray, # (d, d) real rotation — phase-independent ) -> _CyclicReductionPlan: """Plan ``P^H A P`` and return a per-harmonic evaluator. ``A`` (real SPD) is walked once. For each (r, c, v) triplet four cases arise by ``(row_is_hi, col_is_hi)``: * ``(False, False)`` — non-face × non-face, emits one triplet with coefficient ``v``. Phase exponent ``0``. * ``(True, False)`` — high row: expands into ``d`` triplets, one per low-partner DOF ``b``, each weighted by ``R[a_r, b]``. Phase factor in the full evaluation is ``conj(T·R[a_r, b]) = conj(T)·R``, so the base coefficient is the real part ``v·R[a_r, b]`` and the phase exponent is ``-1``. * ``(False, True)`` — high col: mirror of the above, phase exponent ``+1``, base coefficient ``v·R[a_c, b']``. * ``(True, True)`` — high × high: ``d²`` triplets. The two phase factors ``conj(T·R[a_r, b])`` and ``T·R[a_c, b']`` multiply to ``|T|² · R[a_r, b] · R[a_c, b'] = R[a_r, b] · R[a_c, b']`` — the unit-modulus phase cancels. Phase exponent ``0``. The three phase buckets become three real CSRs with duplicates summed within each bucket. :meth:`_CyclicReductionPlan.apply` then combines them per-k with the scalar phase factor. """ rows = A_coo.row cols = A_coo.col data = np.asarray(A_coo.data, dtype=np.float64) # A is real SPD P, d = low.shape n_total = A_coo.shape[0] n_keep = int((red_of_global >= 0).sum()) # Lookup: map each high-face global DOF to its (pair-index p, a-index) # so we can fetch the rotation-row that rewrites that DOF. Dense # ``(N,)`` sentinels — only high DOFs matter, rest stay at -1. high_p = -np.ones(n_total, dtype=np.int64) high_a = -np.ones(n_total, dtype=np.int64) p_idx = np.broadcast_to(np.arange(P)[:, None], (P, d)).ravel() a_idx = np.broadcast_to(np.arange(d)[None, :], (P, d)).ravel() high_p[high.ravel()] = p_idx high_a[high.ravel()] = a_idx # Drop fixed rows/cols up front. keep_entry = ~(fixed_mask[rows] | fixed_mask[cols]) rows = rows[keep_entry] cols = cols[keep_entry] data = data[keep_entry] is_row_hi = face_mask[rows] is_col_hi = face_mask[cols] zero_rows: list[np.ndarray] = [] zero_cols: list[np.ndarray] = [] zero_vals: list[np.ndarray] = [] # --- Case A: (non-face, non-face) — phase exponent 0 ----------------- m = ~is_row_hi & ~is_col_hi # ``red_of_global`` is -1 for face AND fixed rows; fixed are already # dropped and ~is_row_hi / ~is_col_hi rules out face, so the mapped # indices here are guaranteed >= 0. zero_rows.append(red_of_global[rows[m]]) zero_cols.append(red_of_global[cols[m]]) zero_vals.append(data[m]) # --- Case B: (high row, non-face col) — phase exponent -1 ------------ m = is_row_hi & ~is_col_hi r_sub = rows[m] c_sub = cols[m] v_sub = data[m] p_r = high_p[r_sub] a_r = high_a[r_sub] red_c_b = red_of_global[c_sub] # Expand the ``d`` low-partner contributions per entry in one batch. b_arange = np.arange(d) low_lookup = low[p_r[:, None], b_arange[None, :]] # (n, d) red_r_b = red_of_global[low_lookup] # (n, d); -1 when partner is fixed R_row = R[a_r[:, None], b_arange[None, :]] # (n, d) real coef_b = v_sub[:, None] * R_row # (n, d) real valid_b = red_r_b >= 0 # red_c_b guaranteed >= 0 cols_b_full = np.broadcast_to(red_c_b[:, None], red_r_b.shape) neg_rows = red_r_b[valid_b] neg_cols = cols_b_full[valid_b] neg_vals = coef_b[valid_b] # --- Case C: (non-face row, high col) — phase exponent +1 ------------ m = ~is_row_hi & is_col_hi r_sub = rows[m] c_sub = cols[m] v_sub = data[m] red_r_c = red_of_global[r_sub] p_c = high_p[c_sub] a_c = high_a[c_sub] bp_arange = np.arange(d) low_lookup = low[p_c[:, None], bp_arange[None, :]] # (n, d) red_c_c = red_of_global[low_lookup] R_col = R[a_c[:, None], bp_arange[None, :]] coef_c = v_sub[:, None] * R_col valid_c = red_c_c >= 0 rows_c_full = np.broadcast_to(red_r_c[:, None], red_c_c.shape) pos_rows = rows_c_full[valid_c] pos_cols = red_c_c[valid_c] pos_vals = coef_c[valid_c] # --- Case D: (high row, high col) — phase exponent 0 ----------------- m = is_row_hi & is_col_hi r_sub = rows[m] c_sub = cols[m] v_sub = data[m] p_r = high_p[r_sub] a_r = high_a[r_sub] p_c = high_p[c_sub] a_c = high_a[c_sub] # Full (n, d, d) expansion. R-weights combine as R[a_r, b] · R[a_c, b']. low_r = low[p_r[:, None, None], b_arange[None, :, None]] # (n, d, 1) low_c = low[p_c[:, None, None], bp_arange[None, None, :]] # (n, 1, d) low_r = np.broadcast_to(low_r, (r_sub.size, d, d)) low_c = np.broadcast_to(low_c, (r_sub.size, d, d)) red_rr = red_of_global[low_r] red_cc = red_of_global[low_c] Rr = R[a_r[:, None, None], b_arange[None, :, None]] Rc = R[a_c[:, None, None], bp_arange[None, None, :]] coef_hh = v_sub[:, None, None] * Rr * Rc # (n, d, d) valid_hh = (red_rr >= 0) & (red_cc >= 0) zero_rows.append(red_rr[valid_hh]) zero_cols.append(red_cc[valid_hh]) zero_vals.append(coef_hh[valid_hh]) shape = (n_keep, n_keep) def _to_csr(rs: np.ndarray, cs: np.ndarray, vs: np.ndarray) -> sp.csr_array: if rs.size == 0: return sp.csr_array(shape, dtype=np.float64) return sp.coo_array( ( vs.astype(np.float64, copy=False), (rs.astype(np.int64, copy=False), cs.astype(np.int64, copy=False)), ), shape=shape, ).tocsr() A_zero = _to_csr( np.concatenate(zero_rows), np.concatenate(zero_cols), np.concatenate(zero_vals) ) A_pos = _to_csr(pos_rows, pos_cols, pos_vals) A_neg = _to_csr(neg_rows, neg_cols, neg_vals) return _CyclicReductionPlan(A_zero=A_zero, A_pos=A_pos, A_neg=A_neg)