# Source code for femorph_solver.solvers.cyclic

"""Cyclic-symmetry modal analysis for a single base sector.

A rotor with ``N`` identical sectors spanning 360° satisfies, at each
*harmonic index* (nodal diameter) ``k ∈ {0, 1, …, N/2}``,

    u_sector_{j+1}(global) = e^{i k α} · R(α) · u_sector_j(global)

with ``α = 2π/N`` and ``R(α)`` the spatial rotation about the symmetry
axis.  Applied at the base-sector's high-angle face (which is identified
with sector 1's low-angle face), this gives the constraint

    u_high(global) = e^{i k α} · R(α) · u_low(global)

relating paired low-/high-face DOFs.  That constraint lets a modal solve
work on **one sector** and produce the full-rotor spectrum one k at a
time — an N-times speedup relative to meshing the whole rotor.

The reduction here matches MAPDL's CYCLIC approach in its *complex* form.
Base-sector DOFs are partitioned into interior ``I`` and low-face ``L``
/ high-face ``H`` sets, with a one-to-one (low-node, high-node)
correspondence given by the user.  High DOFs are eliminated via the
constraint above; the remaining ``[u_I; u_L]`` vector satisfies a
Hermitian generalised eigenproblem

    K_red φ = ω² M_red φ

with ``K_red = P^H K P`` and ``M_red = P^H M P`` (K, M real SPD; P the
complex constraint matrix that embeds the reduced vector back into the
full one).  For k=0 and k=N/2 (even N) the phase is ±1 and the reduced
problem can be real; for intermediate k it is complex Hermitian.

Per-pair rotation
-----------------
Stiffness / mass are usually assembled in a global XYZ frame, which
means the cyclic phase relation picks up the same rotation that maps
sector j to sector j+1.  Pass ``pair_rotation`` as the ``(d, d)`` matrix
R(α) acting on each node pair's ``d`` translational DOFs (3 for a 3D
solid sweeping about z).  Omit it (``None``) when face DOFs are already
in a cylindrical / per-sector local frame, in which case R(α) reduces to
the identity.

Harmonic-index counting
-----------------------
- k=0            — single real eigenmodes (in-phase family)
- k=N/2 (even N) — single real eigenmodes (anti-phase family)
- 0 < k < N/2    — each eigenvalue corresponds to a degenerate pair in
  the full rotor (travelling-wave directions).  The complex reduced
  solve returns them once per k; the full-rotor multiset is recovered
  by counting them twice.

References
----------
- Thomas, D. L.  "Dynamics of rotationally periodic structures."
  International Journal for Numerical Methods in Engineering 14 (1),
  81-102 (1979).  [the foundational complex-valued cyclic reduction
  applied here — one base sector factored at each harmonic index
  ``k`` recovers the full-rotor spectrum]
- Wildheim, S. J.  "Vibrations of rotationally periodic
  structures."  Ph.D. dissertation, Linköping University (1979).
  [companion treatment of the constraint matrix ``P`` and its
  Hermitian reduction ``P^H K P``]
- Bathe, K.-J.  *Finite Element Procedures*, Prentice Hall (1996),
  §10.3.4 (cyclic symmetry).  [textbook derivation matching the
  partitioned ``[u_I; u_L]`` variable convention used below]
"""

from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING

import numpy as np
import scipy.linalg as la
import scipy.sparse as sp
import scipy.sparse.linalg as spla

from .._threads import limit as _thread_limit
from .linear import get_linear_solver, select_default_linear_solver

if TYPE_CHECKING:
    from femorph_solver.model import Model

# Above this reduced-sector DOF count we switch from LAPACK dense eigh to
# sparse shift-invert eigsh.  The dense complex-Hermitian eigh allocates
# two ``n_keep²`` matrices (~16 GB real / 32 GB complex at n_keep = 45k);
# 1500 DOFs fits comfortably in memory while still leaving ample headroom
# for LAPACK's internal work arrays.  Consumed by ``_solve_reduced_eig``.
_DENSE_CUTOFF = 1500


@dataclass
class CyclicModalResult:
    """Modes for one harmonic index of a cyclic-symmetry modal solve."""

    #: Harmonic index (nodal diameter) ``k`` this family of modes belongs to.
    harmonic_index: int
    #: Number of sectors in the full rotor (same for every result in a sweep).
    n_sectors: int
    #: (n_modes,) — ω² = (2πf)²; numerically negative values clipped to 0.
    omega_sq: np.ndarray
    #: (n_modes,) — cyclic frequencies f [Hz].
    frequency: np.ndarray
    #: (N, n_modes) — complex mode shapes on the base sector, indexed by
    #: the original DOF layout (not the reduced layout).  For k=0 and
    #: k=N/2 the imaginary part is identically zero.
    mode_shapes: np.ndarray
    #: (3,) — point on the rotor rotation axis.  ``None`` means "not
    #: recorded by the caller"; writers default to the origin in that
    #: case.  Populated by :func:`solve_cyclic_modal` when a
    #: ``pair_rotation`` is supplied.
    axis_point: np.ndarray | None = None
    #: (3,) — unit vector along the rotor axis.  ``None`` means "not
    #: recorded"; writers default to ``+Z``.  Populated by
    #: :func:`solve_cyclic_modal` from the supplied ``pair_rotation``.
    axis_dir: np.ndarray | None = None
def save_cyclic_modal_results(
    results: Sequence[CyclicModalResult],
    path: str | Path,
    model: Model,
    *,
    unit_system: str = "SI",
    extra_field_data: dict[str, np.ndarray] | None = None,
) -> Path:
    """Serialise a cyclic-modal sweep to a single disk-backed ``.pv`` file.

    Parameters
    ----------
    results : sequence of CyclicModalResult
        Per-harmonic-index results as returned by
        :func:`solve_cyclic_modal`.  All entries must share the same
        ``n_sectors``.
    path : str or pathlib.Path
        Destination file.
    model : femorph_solver.model.Model
        Model whose K / M matrices this sweep was computed on; supplies
        the mesh and DOF layout.
    unit_system : str, default ``"SI"``
        Unit-system tag forwarded to the material table and the result
        writer.
    extra_field_data : mapping, optional
        Extra ``field_data`` entries (e.g. solver statistics).

    Returns
    -------
    pathlib.Path
        Canonicalised absolute path to the written file.

    Notes
    -----
    The DOF-indexed ``(N, n_modes)`` complex mode shapes on each
    per-``k`` result get re-projected onto per-``(k, mode)``, per-node
    ``(n_modes, n_points, 6)`` complex arrays — the layout
    :func:`~femorph_solver.result.cyclic_modal.write_cyclic_modal_result`
    expects.
    """
    if not results:
        raise ValueError("results must be non-empty")
    n_sectors = {int(r.n_sectors) for r in results}
    if len(n_sectors) != 1:
        raise ValueError(f"all results must share the same n_sectors; saw {sorted(n_sectors)}")
    (n_sectors_value,) = n_sectors

    # Imported here (not at module top) to keep the result-writer stack
    # out of the solver import path.
    from femorph_solver.result._material import MaterialTable
    from femorph_solver.result.cyclic_modal import write_cyclic_modal_result

    grid = model.grid
    dof_map = model.dof_map()
    node_nums = np.asarray(grid.point_data["ansys_node_num"], dtype=np.int64)
    # Map external node numbers → grid point rows, then every DOF row of
    # dof_map → (point row, component) so modes can be scattered per node.
    node_to_row = {int(n): i for i, n in enumerate(node_nums)}
    node_row = np.fromiter(
        (node_to_row[int(n)] for n in dof_map[:, 0]),
        dtype=np.int64,
        count=dof_map.shape[0],
    )
    comp = dof_map[:, 1].astype(np.int64)

    modes_by_k: dict[int, tuple[np.ndarray, np.ndarray]] = {}
    for r in results:
        n_modes = int(r.mode_shapes.shape[1])
        shapes = np.zeros((n_modes, grid.n_points, 6), dtype=np.complex128)
        # Scatter each mode's DOF-indexed vector onto (node, component).
        for k_mode in range(n_modes):
            shapes[k_mode, node_row, comp] = r.mode_shapes[:, k_mode]
        modes_by_k[int(r.harmonic_index)] = (
            np.asarray(r.frequency, dtype=np.float64),
            shapes,
        )

    # Collect the rotor axis stamp.  Every result in a sweep comes
    # from one cyclic solve so their axis info must agree; just take
    # the first one that actually carries the stamp.
    axis_point: np.ndarray | None = None
    axis_dir: np.ndarray | None = None
    for r in results:
        if r.axis_point is not None and axis_point is None:
            axis_point = np.asarray(r.axis_point, dtype=np.float64)
        if r.axis_dir is not None and axis_dir is None:
            axis_dir = np.asarray(r.axis_dir, dtype=np.float64)

    material_table = MaterialTable.from_model(model, unit_system=str(unit_system))
    return write_cyclic_modal_result(
        path,
        grid,
        n_sectors_value,
        modes_by_k,
        axis_point=axis_point,
        axis_dir=axis_dir,
        material_table=material_table,
        unit_system=unit_system,
        extra_field_data=extra_field_data,
    )
def _harmonic_indices(n_sectors: int) -> list[int]: return list(range(n_sectors // 2 + 1))
def solve_cyclic_modal(
    K: sp.spmatrix | sp.sparray,
    M: sp.spmatrix | sp.sparray,
    low_node_dofs: np.ndarray,
    high_node_dofs: np.ndarray,
    n_sectors: int,
    n_modes: int = 10,
    *,
    pair_rotation: np.ndarray | None = None,
    harmonic_indices: Sequence[int] | None = None,
    prescribed: dict[int, float] | None = None,
    thread_limit: int | None = None,
    linear_solver: str = "auto",
) -> list[CyclicModalResult]:
    """Solve the cyclic-symmetry modal problem on a single base sector.

    Parameters
    ----------
    K, M : scipy.sparse
        Global stiffness / mass of the **base sector**, before cyclic or
        Dirichlet reduction.  Assumed real SPD.
    low_node_dofs, high_node_dofs : (P, d) int arrays
        Global DOF indices at ``P`` paired boundary nodes, each with
        ``d`` DOFs (typically 3 for a 3D solid).  ``low_node_dofs[i, :]``
        and ``high_node_dofs[i, :]`` must address the same physical DOFs
        (same labels, same order) at the node pair that's identified by
        the cyclic BC.  A 1-D array is accepted for scalar (``d=1``)
        problems.
    n_sectors : int
        Number of repetitions that close the rotor (``N``).
    n_modes : int, default 10
        Number of lowest modes per harmonic index.
    pair_rotation : (d, d) array, optional
        Spatial rotation R(α) that maps sector j's local frame to sector
        j+1's, applied per node pair.  Pass ``None`` when face DOFs are
        already in a per-sector local frame (then R=I implied).
    harmonic_indices : sequence of int, optional
        Harmonic indices to solve.  Defaults to ``0, 1, …, N//2``.
    prescribed : mapping, optional
        Dirichlet BCs on the sector (global DOF index → 0).  Currently
        only zero-value Dirichlet is supported; if a cyclic-face DOF is
        prescribed its partner must also be prescribed (so the cyclic
        constraint is trivially satisfied at that DOF).
    thread_limit : int or None, optional
        Cap on BLAS / OpenMP threads used inside the per-harmonic dense
        Hermitian eigensolve.  ``None`` defers to the global limit (see
        :func:`femorph_solver.set_thread_limit`).
    linear_solver : str, default ``"auto"``
        Name of the registered linear backend used for the inner
        shift-invert factorisation in the sparse path.  ``"auto"`` picks
        the fastest available SPD direct solver in the priority chain
        ``pardiso → cholmod → umfpack → superlu`` and emits a one-shot
        warning if pardiso is missing on a problem large enough to
        benefit from it.  See
        :func:`femorph_solver.solvers.linear.list_linear_solvers`.

    Returns
    -------
    list[CyclicModalResult]
        One :class:`CyclicModalResult` per requested harmonic index, in
        the order requested.
    """
    if n_sectors < 2:
        raise ValueError(f"n_sectors must be >= 2, got {n_sectors}")
    # Normalise the pairing arrays: 1-D input is promoted to (1, P) /
    # (P, 1)-style 2-D so scalar (d=1) problems share the same code path.
    low = np.atleast_2d(np.asarray(low_node_dofs, dtype=np.int64))
    high = np.atleast_2d(np.asarray(high_node_dofs, dtype=np.int64))
    if low.shape != high.shape:
        raise ValueError(
            f"low_node_dofs and high_node_dofs must have the same shape, "
            f"got {low.shape} vs {high.shape}"
        )
    if low.ndim != 2:
        raise ValueError(f"node-DOF arrays must be 2-D (P, d), got shape {low.shape}")
    P, d = low.shape
    if pair_rotation is None:
        # Face DOFs already live in a per-sector local frame: R(α) = I.
        R = np.eye(d)
    else:
        R = np.asarray(pair_rotation, dtype=np.float64)
        if R.shape != (d, d):
            raise ValueError(f"pair_rotation must be ({d}, {d}), got {R.shape}")
    if harmonic_indices is None:
        harmonic_indices = _harmonic_indices(n_sectors)
    N = K.shape[0]
    if M.shape != K.shape:
        raise ValueError(f"K and M must have the same shape, got {K.shape} vs {M.shape}")

    # Zero-value Dirichlet handling: flag fixed DOFs, reject non-zero
    # prescriptions (not supported by this reduction).
    fixed_mask = np.zeros(N, dtype=bool)
    if prescribed:
        for idx, val in prescribed.items():
            if float(val) != 0.0:
                raise NotImplementedError(
                    "non-zero Dirichlet BCs are not supported by the cyclic "
                    "reduction yet — all prescribed values must be 0"
                )
            fixed_mask[int(idx)] = True
    low_flat = low.ravel()
    high_flat = high.ravel()
    # A cyclic-face DOF may only be fixed if its partner is fixed too —
    # otherwise the cyclic constraint and the Dirichlet BC conflict.
    lo_fixed = fixed_mask[low_flat]
    hi_fixed = fixed_mask[high_flat]
    if (lo_fixed != hi_fixed).any():
        raise ValueError(
            "cyclic face Dirichlet is only allowed when both low and high "
            "partners are fixed to zero"
        )
    face_mask = np.zeros(N, dtype=bool)
    face_mask[high_flat] = True  # eliminated DOFs

    # Reduced DOF ordering: every DOF that is neither fixed nor on the
    # high face keeps an index.  Low-face DOFs are the "masters".
    keep = ~(fixed_mask | face_mask)
    n_keep = int(keep.sum())
    if n_keep == 0:
        raise ValueError("no free DOFs remain after cyclic + Dirichlet reduction")
    red_of_global = -np.ones(N, dtype=np.int64)
    red_of_global[keep] = np.arange(n_keep)

    # Plan the reduction topology once per input matrix.  The per-harmonic
    # cost then collapses to three sparse scalings + two adds (see
    # :class:`_CyclicReductionPlan`) instead of a full Python-level COO walk
    # per k.  On an N-sector sweep this replaces ``2 × (N/2 + 1)`` COO walks
    # with just two, which is an order of magnitude less Python overhead
    # for rotor-scale sectors.
    K_plan = _plan_cyclic_reduction(K.tocoo(), red_of_global, fixed_mask, face_mask, low, high, R)
    M_plan = _plan_cyclic_reduction(M.tocoo(), red_of_global, fixed_mask, face_mask, low, high, R)

    # Extract the rotor axis from the caller's ``pair_rotation`` so
    # the result can travel with its geometry stamp.  For d != 3 this
    # is a no-op (the axis concept is 3D-specific).
    axis_point_stamp: np.ndarray | None = None
    axis_dir_stamp: np.ndarray | None = None
    if pair_rotation is not None and d == 3:
        from femorph_solver.result._cyclic_expand import axis_from_rotation

        axis_dir_stamp, _sector_angle = axis_from_rotation(R.real.astype(np.float64))
        # The axis passes through the origin in the global frame unless
        # the caller has an asymmetric mesh.  We default to origin and
        # let the Model / writer override if needed.
        axis_point_stamp = np.zeros(3, dtype=np.float64)

    results: list[CyclicModalResult] = []
    alpha = 2.0 * np.pi / n_sectors
    with _thread_limit(thread_limit):
        for k in harmonic_indices:
            # Inter-sector phase e^{ikα} and the combined phase·rotation
            # block used to rebuild the eliminated high-face DOFs.
            T = complex(np.exp(1j * k * alpha))
            TR = T * R.astype(np.complex128)
            K_red = K_plan.apply(T)
            M_red = M_plan.apply(T)
            n_req = min(n_modes, n_keep)
            w, v = _solve_reduced_eig(K_red, M_red, n_req, n_keep, linear_solver)
            # Rigid-body / roundoff modes can come back slightly negative;
            # clip so the frequency sqrt below stays real.
            w_clean = np.where(w > 0, w, 0.0)
            phi = np.zeros((N, n_req), dtype=np.complex128)
            phi[keep] = v
            # Expand the eliminated high DOFs back: u_high[i,:] = T R · u_low[i,:].
            u_low_blocks = phi[low_flat].reshape(P, d, n_req)
            u_high_blocks = np.einsum("ab,pbm->pam", TR, u_low_blocks)
            phi[high_flat] = u_high_blocks.reshape(P * d, n_req)
            results.append(
                CyclicModalResult(
                    harmonic_index=int(k),
                    n_sectors=n_sectors,
                    omega_sq=w_clean,
                    frequency=np.sqrt(w_clean) / (2.0 * np.pi),
                    mode_shapes=phi,
                    axis_point=axis_point_stamp,
                    axis_dir=axis_dir_stamp,
                )
            )
    return results
def _is_real_harmonic(A: sp.csr_array, atol: float = 1e-14) -> bool: """Return True if A's sparse data is (numerically) real. For k=0 the phase factor ``T = e^{ikα}`` is ``1`` and K_red / M_red have identically zero imaginary parts. For k=N/2 on even N the phase is ``-1`` — again real. Other harmonics produce complex Hermitian reductions. We look at the actual data rather than the harmonic index so the caller can pass any sparse matrix without having to re-derive the index. """ data = A.data return bool(np.all(np.abs(data.imag) <= atol)) def _hermitian_to_real_augment(A: sp.csr_array) -> sp.csr_array: """Embed a complex-Hermitian sparse ``A`` (n×n) as real-symmetric (2n×2n). For Hermitian ``A = A_re + i A_im`` (A_re symmetric, A_im antisymmetric) the block matrix A_aug = [[A_re, -A_im], [A_im, A_re]] is real-symmetric of size 2n×2n, and its spectrum is ``{λ}`` from A's spectrum but with each eigenvalue repeated twice (eigenvectors ``[Re(v); Im(v)]`` and ``[-Im(v); Re(v)]``). This lets us solve Hermitian generalised eigenproblems with ``scipy.sparse.linalg.eigsh`` (real-symmetric only) without pulling in PRIMME. """ # Build the 2×2 block via ``sp.bmat`` — ~2× faster than the previous # 4-way COO concatenation + bool-mask filter on typical # cyclic-reduced nnz (the inner concatenation doubled every COO # entry; bmat keeps one buffer per block and reuses the input # indptr/indices verbatim). A_re and A_im inherit A's CSR pattern # directly — no tocoo round-trip — so the only real work is the # 2×2 block stitching inside scipy. A_re = sp.csr_array((A.data.real, A.indices, A.indptr), shape=A.shape) A_im = sp.csr_array((A.data.imag, A.indices, A.indptr), shape=A.shape) aug = sp.bmat([[A_re, -A_im], [A_im, A_re]], format="csr") # Drop explicit zeros so the factor doesn't walk dead slots. This is # relevant at diagonal of Re(A) (imag part structurally zero at # diagonals for Hermitian A), producing ~n explicit zeros in each # cross-diagonal block. 
aug.eliminate_zeros() return aug def _shift_invert_opinv( A: sp.csr_array | sp.csc_array, linear_solver: str, *, assume_spd: bool ) -> spla.LinearOperator: """Build the ``(K - σM)^-1`` operator used by ``eigsh`` shift-invert. Routes the inner factorisation through the registered linear-solver chain (pardiso → cholmod → umfpack → superlu for ``"auto"``), matching what ``ArpackSolver`` does for plain modal. Warns once per process if pardiso is missing and the problem is large enough to benefit. ``assume_spd`` controls the factorisation path: pass ``True`` only when the caller knows the reduced operator is strictly SPD (e.g. Dirichlet-constrained, sigma < 0). With ``False`` the auto-chain resolves to a general-purpose backend (pardiso/umfpack/superlu) that handles indefinite / semi-definite operators via LU — needed for free-free cyclic (K_red has a rigid-body null-space at k=0 and σ=0) and for the complex-Hermitian real augmentation. """ A_csc = A.tocsc() if linear_solver == "auto": resolved = select_default_linear_solver(spd=assume_spd, n_dof=A_csc.shape[0]) Solver = get_linear_solver(resolved) else: Solver = get_linear_solver(linear_solver, n_dof=A_csc.shape[0]) lu = Solver(A_csc, assume_spd=assume_spd) return spla.LinearOperator(A_csc.shape, matvec=lu.solve, dtype=A_csc.dtype) def _complex_as_real_augmented_matvec(A: sp.csr_array) -> spla.LinearOperator: """Wrap a complex Hermitian ``A`` (n×n) as the 2n real augmentation matvec. The real 2n augmentation ``[[Re(A), -Im(A)], [Im(A), Re(A)]]`` acts on ``x = [u; v]`` (two real length-n halves) as ``[[Re(A), -Im(A)], [Im(A), Re(A)]] · [u; v] = [Re(A · (u + iv)); Im(A · (u + iv))]`` So ``eigsh`` gets the same 2n operator action from one complex CSR matvec on ``A`` — no need to materialise the 4-block real CSR. On a 45k-DOF rotor sector this saves ~200 MB of peak RSS per k (the augmentation has ~4× K_red's nnz) and the ``sp.bmat`` build time per matrix per harmonic. 
The complex matvec itself is ~2× faster than the real 2n matvec at equal triplet count, since it touches n complex numbers (2n floats) instead of 2n real rows × 2 blocks. """ n = A.shape[0] def _matvec(x: np.ndarray) -> np.ndarray: u = x[:n] v = x[n:] w = A @ (u + 1j * v) return np.concatenate((w.real, w.imag)) return spla.LinearOperator((2 * n, 2 * n), matvec=_matvec, dtype=np.float64) def _complex_opinv_for_augmented(K_red: sp.csr_array) -> spla.LinearOperator: """Build an OPinv on the real 2n augmentation via a *complex* factor of K_red. The ``eigsh`` shift-invert loop on the augmented pair ``([[Re,-Im],[Im,Re]], …)`` at ``σ=0`` needs ``K_aug^{-1} · x`` for each iteration. Writing ``x = [u; v]`` (two real length-n halves) and ``Z = K_red`` (complex Hermitian), a little algebra on the block form gives K_aug^{-1} · [u; v] = [Re(Z^{-1} (u + i v)); Im(Z^{-1} (u + i v))]. So one complex ``splu`` on K_red (size n) replaces a real ``splu`` on the full augmentation (size 2n) — cheaper to compute and apply. scipy's ``splu`` handles complex CSC directly. We stay on SuperLU here (rather than routing through our pluggable linear-solver chain) because: * most registered backends — pardiso, cholmod, umfpack — are real-only; * the eigenvalue pair-dedup loop downstream is calibrated against SuperLU's convergence, and switching backends previously produced cross-Python-version mismatches. """ n = K_red.shape[0] lu = spla.splu(K_red.tocsc()) def _matvec(x: np.ndarray) -> np.ndarray: # x is (2n,) real from eigsh's Lanczos loop. u = x[:n] v = x[n:] w = lu.solve(u + 1j * v) return np.concatenate((w.real, w.imag)) return spla.LinearOperator((2 * n, 2 * n), matvec=_matvec, dtype=np.float64) def _solve_reduced_eig( K_red: sp.csr_array, M_red: sp.csr_array, n_req: int, n_keep: int, linear_solver: str, ) -> tuple[np.ndarray, np.ndarray]: """Lowest ``n_req`` modes of ``K_red φ = ω² M_red φ`` (complex Hermitian). 
Small problems (``n_keep <= _DENSE_CUTOFF``) go through LAPACK ``eigh`` directly for maximum accuracy. Larger sectors use a sparse shift-invert path whose inner ``K^-1`` solve is routed via :func:`get_linear_solver` so pardiso is the default when installed (same chain as plain modal; warns once if missing on large problems): * For real harmonics (k=0 / k=N/2) the reduced operators are strictly real-symmetric and ``scipy.sparse.linalg.eigsh`` with ``sigma=0`` returns the lowest modes via shift-invert. * For complex harmonics we embed the Hermitian pair ``(K_red, M_red)`` into a 2n-sized real-symmetric pair via :func:`_hermitian_to_real_augment`, run ``eigsh`` on that, and deduplicate each eigenvalue (the embedding doubles them). The complex eigenvector ``v`` is reconstructed from the augmented eigenvector ``[a; b]`` as ``v = a + i b``. """ if n_keep <= _DENSE_CUTOFF: K_dense = K_red.toarray() M_dense = M_red.toarray() K_dense = 0.5 * (K_dense + K_dense.conj().T) M_dense = 0.5 * (M_dense + M_dense.conj().T) w, v = la.eigh(K_dense, M_dense, subset_by_index=[0, n_req - 1]) return w, v.astype(np.complex128, copy=False) if _is_real_harmonic(K_red) and _is_real_harmonic(M_red): # Pure real-symmetric sparse path. K_red can be semi-definite at # k=0 when the sector has no Dirichlet BCs (rigid-body nullspace), # so ``assume_spd=False`` keeps the factorisation on LU rather than # Cholesky. K_r = sp.csr_array((K_red.data.real, K_red.indices, K_red.indptr), shape=K_red.shape) M_r = sp.csr_array((M_red.data.real, M_red.indices, M_red.indptr), shape=M_red.shape) K_r = 0.5 * (K_r + K_r.T) M_r = 0.5 * (M_r + M_r.T) OPinv = _shift_invert_opinv(K_r, linear_solver, assume_spd=False) w, v = spla.eigsh(K_r, k=n_req, M=M_r, sigma=0.0, which="LM", OPinv=OPinv) order = np.argsort(w) return w[order], v[:, order].astype(np.complex128, copy=False) # Complex Hermitian → 2n real-symmetric augmentation. 
The augmented # matrix shares the spectrum of the Hermitian original (each eigenvalue # doubled). # # Shift-invert at σ=0 needs ``K_aug^-1 · x`` inside the Lanczos loop. # Instead of factoring the 2n real augmentation directly (which was # scipy's default when we passed ``OPinv=None``), exploit the block # identity # # [[Re(Z), -Im(Z)], [Im(Z), Re(Z)]]^{-1} · [u; v] # = [Re(Z^{-1}(u+iv)); Im(Z^{-1}(u+iv))] # # with ``Z = K_red`` (complex Hermitian, size n). A single complex # ``splu`` factor on ``K_red`` is ~3× cheaper than a real ``splu`` on # the 2n augmentation on rotor-scale sectors — the augmentation has # ~4× more nnz (four blocks of K_red's pattern) and factor cost grows # super-linearly with problem size. # Skip materialising the 2n real ``K_aug`` / ``M_aug`` CSRs entirely — # wrap the complex K_red / M_red as real 2n LinearOperators via the # block-matvec identity (see :func:`_complex_as_real_augmented_matvec`). # On a rotor-scale sector this drops ~200 MB of peak RSS per k (the # augmentation has ~4× K_red's nnz for each of K_aug / M_aug) and # skips a ``sp.bmat`` build per matrix per harmonic. eigsh in # shift-invert mode drives the Lanczos loop through OPinv and the M # matvec only — A itself is inspected for shape/dtype but never # matvec'd — so the K LinearOperator here is purely a shape handle. OPinv = _complex_opinv_for_augmented(K_red) K_op = _complex_as_real_augmented_matvec(K_red) M_op = _complex_as_real_augmented_matvec(M_red) # Each true eigenvalue appears twice in the augmented spectrum; ask # for 2·n_req and then dedupe. Slight padding (min(2·n_req+4, 2n_keep-1)) # absorbs clustered pairs that land just outside the first 2·n_req # convergence block. n_aug = 2 * n_keep n_request = min(2 * n_req + 4, n_aug - 1) w_aug, v_aug = spla.eigsh(K_op, k=n_request, M=M_op, sigma=0.0, which="LM", OPinv=OPinv) order = np.argsort(w_aug) w_aug = w_aug[order] v_aug = v_aug[:, order] # Merge consecutive duplicates. 
Two eigenvectors of the augmented # matrix span the same complex eigenmode; Gram-Schmidt on the first # of each pair gives the complex ``v`` up to a global phase. w_out: list[float] = [] v_out: list[np.ndarray] = [] i = 0 while i < len(w_aug) and len(w_out) < n_req: # Take the current augmented eigenvector as the complex mode. a = v_aug[:n_keep, i] b = v_aug[n_keep:, i] v_complex = a + 1j * b nrm = np.linalg.norm(v_complex) if nrm > 0: v_complex = v_complex / nrm w_out.append(w_aug[i]) v_out.append(v_complex) # Skip the paired partner (same eigenvalue, 90° phase-rotated). # The partner is expected within a tiny relative tolerance. j = i + 1 if j < len(w_aug) and abs(w_aug[j] - w_aug[i]) <= 1e-8 * max(abs(w_aug[i]), 1.0): i = j + 1 else: i = j w_final = np.array(w_out[:n_req], dtype=np.float64) v_final = np.stack(v_out[:n_req], axis=1).astype(np.complex128) return w_final, v_final @dataclass class _CyclicReductionPlan: """Cached ``P^H A P`` topology for an N-sector cyclic sweep. The reduced triplets partition by phase exponent ``e ∈ {-1, 0, +1}`` — a function of where each input ``(row, col)`` lives relative to the cyclic boundary, independent of the harmonic index ``k``. Once per input matrix we bucket the reduced triplets into three real CSRs (``A_zero``, ``A_pos``, ``A_neg``); per-harmonic evaluation is then A_red(k) = A_zero + e^{ikα} · A_pos + e^{-ikα} · A_neg with ``|T|² = 1`` cancellation making the high-×-high block phase-independent (see :func:`_plan_cyclic_reduction` for the derivation). For a PTR-scale rotor this replaces ~36 Python-heavy COO walks (2 matrices × 18 harmonics) with just 2 walks plus 36 O(nnz) sparse scalings — roughly an order of magnitude less Python overhead on the cyclic sweep hot path. 
""" A_zero: sp.csr_array A_pos: sp.csr_array A_neg: sp.csr_array def apply(self, T: complex) -> sp.csr_array: # Real-harmonic fast paths: k=0 and k=N/2 stay purely real so the # downstream sparse eigsh path can take its real-symmetric branch # without a ``.real`` copy. if T == 1.0 + 0.0j: return self.A_zero + self.A_pos + self.A_neg if T == -1.0 + 0.0j: return self.A_zero - self.A_pos - self.A_neg return self.A_zero + T * self.A_pos + T.conjugate() * self.A_neg def _plan_cyclic_reduction( A_coo: sp.coo_matrix | sp.coo_array, red_of_global: np.ndarray, fixed_mask: np.ndarray, face_mask: np.ndarray, low: np.ndarray, # (P, d) high: np.ndarray, # (P, d) R: np.ndarray, # (d, d) real rotation — phase-independent ) -> _CyclicReductionPlan: """Plan ``P^H A P`` and return a per-harmonic evaluator. ``A`` (real SPD) is walked once. For each (r, c, v) triplet four cases arise by ``(row_is_hi, col_is_hi)``: * ``(False, False)`` — non-face × non-face, emits one triplet with coefficient ``v``. Phase exponent ``0``. * ``(True, False)`` — high row: expands into ``d`` triplets, one per low-partner DOF ``b``, each weighted by ``R[a_r, b]``. Phase factor in the full evaluation is ``conj(T·R[a_r, b]) = conj(T)·R``, so the base coefficient is the real part ``v·R[a_r, b]`` and the phase exponent is ``-1``. * ``(False, True)`` — high col: mirror of the above, phase exponent ``+1``, base coefficient ``v·R[a_c, b']``. * ``(True, True)`` — high × high: ``d²`` triplets. The two phase factors ``conj(T·R[a_r, b])`` and ``T·R[a_c, b']`` multiply to ``|T|² · R[a_r, b] · R[a_c, b'] = R[a_r, b] · R[a_c, b']`` — the unit-modulus phase cancels. Phase exponent ``0``. The three phase buckets become three real CSRs with duplicates summed within each bucket. :meth:`_CyclicReductionPlan.apply` then combines them per-k with the scalar phase factor. 
""" rows = A_coo.row cols = A_coo.col data = np.asarray(A_coo.data, dtype=np.float64) # A is real SPD P, d = low.shape n_total = A_coo.shape[0] n_keep = int((red_of_global >= 0).sum()) # Lookup: map each high-face global DOF to its (pair-index p, a-index) # so we can fetch the rotation-row that rewrites that DOF. Dense # ``(N,)`` sentinels — only high DOFs matter, rest stay at -1. high_p = -np.ones(n_total, dtype=np.int64) high_a = -np.ones(n_total, dtype=np.int64) p_idx = np.broadcast_to(np.arange(P)[:, None], (P, d)).ravel() a_idx = np.broadcast_to(np.arange(d)[None, :], (P, d)).ravel() high_p[high.ravel()] = p_idx high_a[high.ravel()] = a_idx # Drop fixed rows/cols up front. keep_entry = ~(fixed_mask[rows] | fixed_mask[cols]) rows = rows[keep_entry] cols = cols[keep_entry] data = data[keep_entry] is_row_hi = face_mask[rows] is_col_hi = face_mask[cols] zero_rows: list[np.ndarray] = [] zero_cols: list[np.ndarray] = [] zero_vals: list[np.ndarray] = [] # --- Case A: (non-face, non-face) — phase exponent 0 ----------------- m = ~is_row_hi & ~is_col_hi # ``red_of_global`` is -1 for face AND fixed rows; fixed are already # dropped and ~is_row_hi / ~is_col_hi rules out face, so the mapped # indices here are guaranteed >= 0. zero_rows.append(red_of_global[rows[m]]) zero_cols.append(red_of_global[cols[m]]) zero_vals.append(data[m]) # --- Case B: (high row, non-face col) — phase exponent -1 ------------ m = is_row_hi & ~is_col_hi r_sub = rows[m] c_sub = cols[m] v_sub = data[m] p_r = high_p[r_sub] a_r = high_a[r_sub] red_c_b = red_of_global[c_sub] # Expand the ``d`` low-partner contributions per entry in one batch. 
b_arange = np.arange(d) low_lookup = low[p_r[:, None], b_arange[None, :]] # (n, d) red_r_b = red_of_global[low_lookup] # (n, d); -1 when partner is fixed R_row = R[a_r[:, None], b_arange[None, :]] # (n, d) real coef_b = v_sub[:, None] * R_row # (n, d) real valid_b = red_r_b >= 0 # red_c_b guaranteed >= 0 cols_b_full = np.broadcast_to(red_c_b[:, None], red_r_b.shape) neg_rows = red_r_b[valid_b] neg_cols = cols_b_full[valid_b] neg_vals = coef_b[valid_b] # --- Case C: (non-face row, high col) — phase exponent +1 ------------ m = ~is_row_hi & is_col_hi r_sub = rows[m] c_sub = cols[m] v_sub = data[m] red_r_c = red_of_global[r_sub] p_c = high_p[c_sub] a_c = high_a[c_sub] bp_arange = np.arange(d) low_lookup = low[p_c[:, None], bp_arange[None, :]] # (n, d) red_c_c = red_of_global[low_lookup] R_col = R[a_c[:, None], bp_arange[None, :]] coef_c = v_sub[:, None] * R_col valid_c = red_c_c >= 0 rows_c_full = np.broadcast_to(red_r_c[:, None], red_c_c.shape) pos_rows = rows_c_full[valid_c] pos_cols = red_c_c[valid_c] pos_vals = coef_c[valid_c] # --- Case D: (high row, high col) — phase exponent 0 ----------------- m = is_row_hi & is_col_hi r_sub = rows[m] c_sub = cols[m] v_sub = data[m] p_r = high_p[r_sub] a_r = high_a[r_sub] p_c = high_p[c_sub] a_c = high_a[c_sub] # Full (n, d, d) expansion. R-weights combine as R[a_r, b] · R[a_c, b']. 
low_r = low[p_r[:, None, None], b_arange[None, :, None]] # (n, d, 1) low_c = low[p_c[:, None, None], bp_arange[None, None, :]] # (n, 1, d) low_r = np.broadcast_to(low_r, (r_sub.size, d, d)) low_c = np.broadcast_to(low_c, (r_sub.size, d, d)) red_rr = red_of_global[low_r] red_cc = red_of_global[low_c] Rr = R[a_r[:, None, None], b_arange[None, :, None]] Rc = R[a_c[:, None, None], bp_arange[None, None, :]] coef_hh = v_sub[:, None, None] * Rr * Rc # (n, d, d) valid_hh = (red_rr >= 0) & (red_cc >= 0) zero_rows.append(red_rr[valid_hh]) zero_cols.append(red_cc[valid_hh]) zero_vals.append(coef_hh[valid_hh]) shape = (n_keep, n_keep) def _to_csr(rs: np.ndarray, cs: np.ndarray, vs: np.ndarray) -> sp.csr_array: if rs.size == 0: return sp.csr_array(shape, dtype=np.float64) return sp.coo_array( ( vs.astype(np.float64, copy=False), (rs.astype(np.int64, copy=False), cs.astype(np.int64, copy=False)), ), shape=shape, ).tocsr() A_zero = _to_csr( np.concatenate(zero_rows), np.concatenate(zero_cols), np.concatenate(zero_vals) ) A_pos = _to_csr(pos_rows, pos_cols, pos_vals) A_neg = _to_csr(neg_rows, neg_cols, neg_vals) return _CyclicReductionPlan(A_zero=A_zero, A_pos=A_pos, A_neg=A_neg)