"""Cyclic-symmetry modal analysis for a single base sector.
A rotor with ``N`` identical sectors spanning 360° satisfies, at each
*harmonic index* (nodal diameter) ``k ∈ {0, 1, …, N/2}``,
u_sector_{j+1}(global) = e^{i k α} · R(α) · u_sector_j(global)
with ``α = 2π/N`` and ``R(α)`` the spatial rotation about the symmetry
axis. Applied at the base-sector's high-angle face (which is identified
with sector 1's low-angle face), this gives the constraint
u_high(global) = e^{i k α} · R(α) · u_low(global)
relating paired low-/high-face DOFs. That constraint lets a modal solve
work on **one sector** and produce the full-rotor spectrum one k at a
time — an N-times speedup relative to meshing the whole rotor.
The reduction here matches MAPDL's CYCLIC approach in its *complex* form.
Base-sector DOFs are partitioned into interior ``I`` and low-face ``L``
/ high-face ``H`` sets, with a one-to-one (low-node, high-node)
correspondence given by the user. High DOFs are eliminated via the
constraint above; the remaining ``[u_I; u_L]`` vector satisfies a
Hermitian generalised eigenproblem
K_red φ = ω² M_red φ
with ``K_red = P^H K P`` and ``M_red = P^H M P`` (K, M real SPD; P the
complex constraint matrix that embeds the reduced vector back into the
full one). For k=0 and k=N/2 (even N) the phase is ±1 and the reduced
problem can be real; for intermediate k it is complex Hermitian.
Per-pair rotation
-----------------
Stiffness / mass are usually assembled in a global XYZ frame, which
means the cyclic phase relation picks up the same rotation that maps
sector j to sector j+1. Pass ``pair_rotation`` as the ``(d, d)`` matrix
R(α) acting on each node pair's ``d`` translational DOFs (3 for a 3D
solid sweeping about z). Omit it (``None``) when face DOFs are already
in a cylindrical / per-sector local frame, in which case R(α) reduces to
the identity.
Harmonic-index counting
-----------------------
- k=0 — single real eigenmodes (in-phase family)
- k=N/2 (even N) — single real eigenmodes (anti-phase family)
- 0 < k < N/2 — each eigenvalue corresponds to a degenerate pair in
the full rotor (travelling-wave directions). The complex reduced
solve returns them once per k; the full-rotor multiset is recovered
by counting them twice.
References
----------
- Thomas, D. L. "Dynamics of rotationally periodic structures."
International Journal for Numerical Methods in Engineering 14 (1),
81-102 (1979). [the foundational complex-valued cyclic reduction
applied here — one base sector factored at each harmonic index
``k`` recovers the full-rotor spectrum]
- Wildheim, S. J. "Vibrations of rotationally periodic
structures." Ph.D. dissertation, Linköping University (1979).
[companion treatment of the constraint matrix ``P`` and its
Hermitian reduction ``P^H K P``]
- Bathe, K.-J. *Finite Element Procedures*, Prentice Hall (1996),
§10.3.4 (cyclic symmetry). [textbook derivation matching the
partitioned ``[u_I; u_L]`` variable convention used below]
"""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
import numpy as np
import scipy.linalg as la
import scipy.sparse as sp
import scipy.sparse.linalg as spla
from .._threads import limit as _thread_limit
from .linear import get_linear_solver, select_default_linear_solver
if TYPE_CHECKING:
from femorph_solver.model import Model
# Above this reduced-sector DOF count we switch from LAPACK dense eigh to
# sparse shift-invert eigsh. The dense complex-Hermitian eigh allocates
# two ``n_keep²`` matrices (~16 GB real / 32 GB complex at n_keep = 45k);
# 1500 DOFs fits comfortably in memory while still leaving ample headroom
# for LAPACK's internal work arrays.
_DENSE_CUTOFF = 1500
[docs]
@dataclass
class CyclicModalResult:
    """Modes for one harmonic index of a cyclic-symmetry modal solve.

    One instance is produced per requested harmonic index ``k`` by
    :func:`solve_cyclic_modal`; a sweep is a list of these that all
    share the same ``n_sectors``.
    """

    #: Harmonic index (nodal diameter) ``k`` this result belongs to.
    harmonic_index: int
    #: Number of sectors in the full rotor (same for every result in a sweep).
    n_sectors: int
    #: (n_modes,) — ω² = (2πf)²; numerically negative values clipped to 0.
    omega_sq: np.ndarray
    #: (n_modes,) — cyclic frequencies f [Hz].
    frequency: np.ndarray
    #: (N, n_modes) — complex mode shapes on the base sector, indexed by
    #: the original DOF layout (not the reduced layout). For k=0 and
    #: k=N/2 the imaginary part is identically zero.
    mode_shapes: np.ndarray
    #: (3,) — point on the rotor rotation axis. ``None`` means "not
    #: recorded by the caller"; writers default to the origin in that
    #: case. Populated by :func:`solve_cyclic_modal` when a
    #: ``pair_rotation`` is supplied.
    axis_point: np.ndarray | None = None
    #: (3,) — unit vector along the rotor axis. ``None`` means "not
    #: recorded"; writers default to ``+Z``. Populated by
    #: :func:`solve_cyclic_modal` from the supplied ``pair_rotation``.
    axis_dir: np.ndarray | None = None
def save_cyclic_modal_results(
    results: Sequence[CyclicModalResult],
    path: str | Path,
    model: Model,
    *,
    unit_system: str = "SI",
    extra_field_data: dict[str, np.ndarray] | None = None,
) -> Path:
    """Serialise a cyclic-modal sweep to a single disk-backed ``.pv`` file.

    Parameters
    ----------
    results : sequence of CyclicModalResult
        Per-harmonic-index results as returned by
        :func:`solve_cyclic_modal`. All entries must share the same
        ``n_sectors``.
    path : str or pathlib.Path
        Destination file.
    model : femorph_solver.model.Model
        Model whose K / M matrices this sweep was computed on;
        supplies the mesh and DOF layout.
    unit_system : str, default ``"SI"``
        Unit-system label recorded in the material table and passed
        through to the writer.
    extra_field_data : mapping, optional
        Extra ``field_data`` entries (e.g. solver statistics).

    Returns
    -------
    pathlib.Path
        Canonicalised absolute path to the written file.

    Raises
    ------
    ValueError
        If ``results`` is empty or its entries disagree on
        ``n_sectors``.

    Notes
    -----
    The DOF-indexed ``(N, n_modes)`` complex mode shapes on each
    per-``k`` result get re-projected onto per-``(k, mode)``,
    per-node ``(n_modes, n_points, 6)`` complex arrays — the layout
    :func:`~femorph_solver.result.cyclic_modal.write_cyclic_modal_result`
    expects.
    """
    if not results:
        raise ValueError("results must be non-empty")
    sector_counts = {int(r.n_sectors) for r in results}
    if len(sector_counts) != 1:
        raise ValueError(f"all results must share the same n_sectors; saw {sorted(sector_counts)}")
    (n_sectors_value,) = sector_counts
    from femorph_solver.result._material import MaterialTable
    from femorph_solver.result.cyclic_modal import write_cyclic_modal_result

    grid = model.grid
    dof_map = model.dof_map()
    node_nums = np.asarray(grid.point_data["ansys_node_num"], dtype=np.int64)
    node_to_row = {int(n): i for i, n in enumerate(node_nums)}
    node_row = np.fromiter(
        (node_to_row[int(n)] for n in dof_map[:, 0]),
        dtype=np.int64,
        count=dof_map.shape[0],
    )
    comp = dof_map[:, 1].astype(np.int64)
    modes_by_k: dict[int, tuple[np.ndarray, np.ndarray]] = {}
    for r in results:
        n_modes = int(r.mode_shapes.shape[1])
        shapes = np.zeros((n_modes, grid.n_points, 6), dtype=np.complex128)
        # Scatter each mode's DOF-indexed vector onto (node, component).
        for k_mode in range(n_modes):
            shapes[k_mode, node_row, comp] = r.mode_shapes[:, k_mode]
        modes_by_k[int(r.harmonic_index)] = (
            np.asarray(r.frequency, dtype=np.float64),
            shapes,
        )
    # Collect the rotor axis stamp. Every result in a sweep comes from
    # one cyclic solve so their axis info must agree; take the first
    # result that actually carries each piece, and stop scanning once
    # both have been found.
    axis_point: np.ndarray | None = None
    axis_dir: np.ndarray | None = None
    for r in results:
        if axis_point is None and r.axis_point is not None:
            axis_point = np.asarray(r.axis_point, dtype=np.float64)
        if axis_dir is None and r.axis_dir is not None:
            axis_dir = np.asarray(r.axis_dir, dtype=np.float64)
        if axis_point is not None and axis_dir is not None:
            break
    material_table = MaterialTable.from_model(model, unit_system=str(unit_system))
    return write_cyclic_modal_result(
        path,
        grid,
        n_sectors_value,
        modes_by_k,
        axis_point=axis_point,
        axis_dir=axis_dir,
        material_table=material_table,
        unit_system=unit_system,
        extra_field_data=extra_field_data,
    )
def _harmonic_indices(n_sectors: int) -> list[int]:
return list(range(n_sectors // 2 + 1))
def solve_cyclic_modal(
    K: sp.spmatrix | sp.sparray,
    M: sp.spmatrix | sp.sparray,
    low_node_dofs: np.ndarray,
    high_node_dofs: np.ndarray,
    n_sectors: int,
    n_modes: int = 10,
    *,
    pair_rotation: np.ndarray | None = None,
    harmonic_indices: Sequence[int] | None = None,
    prescribed: dict[int, float] | None = None,
    thread_limit: int | None = None,
    linear_solver: str = "auto",
) -> list[CyclicModalResult]:
    """Solve the cyclic-symmetry modal problem on a single base sector.

    Parameters
    ----------
    K, M : scipy.sparse
        Global stiffness / mass of the **base sector**, before cyclic or
        Dirichlet reduction. Assumed real SPD.
    low_node_dofs, high_node_dofs : (P, d) int arrays
        Global DOF indices at ``P`` paired boundary nodes, each with
        ``d`` DOFs (typically 3 for a 3D solid). ``low_node_dofs[i, :]``
        and ``high_node_dofs[i, :]`` must address the same physical DOFs
        (same labels, same order) at the node pair that's identified by
        the cyclic BC. A 1-D array of length ``P`` is accepted for
        scalar (``d=1``) problems and is treated as a ``(P, 1)`` column.
    n_sectors : int
        Number of repetitions that close the rotor (``N``).
    n_modes : int, default 10
        Number of lowest modes per harmonic index.
    pair_rotation : (d, d) array, optional
        Spatial rotation R(α) that maps sector j's local frame to
        sector j+1's, applied per node pair. Pass ``None`` when face
        DOFs are already in a per-sector local frame (then R=I implied).
    harmonic_indices : sequence of int, optional
        Harmonic indices to solve. Defaults to ``0, 1, …, N//2``.
    prescribed : mapping, optional
        Dirichlet BCs on the sector (global DOF index → 0). Currently
        only zero-value Dirichlet is supported; if a cyclic-face DOF is
        prescribed its partner must also be prescribed (so the cyclic
        constraint is trivially satisfied at that DOF).
    thread_limit : int or None, optional
        Cap on BLAS / OpenMP threads used inside the per-harmonic dense
        Hermitian eigensolve. ``None`` defers to the global limit (see
        :func:`femorph_solver.set_thread_limit`).
    linear_solver : str, default ``"auto"``
        Name of the registered linear backend used for the inner
        shift-invert factorisation in the sparse path. ``"auto"`` picks
        the fastest available SPD direct solver in the priority chain
        ``pardiso → cholmod → umfpack → superlu`` and emits a
        one-shot warning if pardiso is missing on a problem large
        enough to benefit from it. See
        :func:`femorph_solver.solvers.linear.list_linear_solvers`.

    Returns
    -------
    list[CyclicModalResult]
        One :class:`CyclicModalResult` per requested harmonic index, in
        the order requested.

    Raises
    ------
    ValueError
        On inconsistent shapes, ``n_sectors < 2``, mismatched cyclic-face
        Dirichlet pairs, or when no free DOFs remain after reduction.
    NotImplementedError
        If any prescribed Dirichlet value is non-zero.
    """
    if n_sectors < 2:
        raise ValueError(f"n_sectors must be >= 2, got {n_sectors}")
    low = np.asarray(low_node_dofs, dtype=np.int64)
    high = np.asarray(high_node_dofs, dtype=np.int64)
    # A 1-D (P,) input means P scalar DOF pairs (d = 1): promote it to a
    # (P, 1) column. ``np.atleast_2d`` would instead produce (1, P) —
    # one pair carrying P DOFs — silently transposing the pairing and
    # forcing a (P, P) ``pair_rotation``.
    if low.ndim == 1:
        low = low[:, None]
    if high.ndim == 1:
        high = high[:, None]
    if low.shape != high.shape:
        raise ValueError(
            f"low_node_dofs and high_node_dofs must have the same shape, "
            f"got {low.shape} vs {high.shape}"
        )
    if low.ndim != 2:
        raise ValueError(f"node-DOF arrays must be 2-D (P, d), got shape {low.shape}")
    P, d = low.shape
    if pair_rotation is None:
        R = np.eye(d)
    else:
        R = np.asarray(pair_rotation, dtype=np.float64)
        if R.shape != (d, d):
            raise ValueError(f"pair_rotation must be ({d}, {d}), got {R.shape}")
    if harmonic_indices is None:
        harmonic_indices = _harmonic_indices(n_sectors)
    N = K.shape[0]
    if M.shape != K.shape:
        raise ValueError(f"K and M must have the same shape, got {K.shape} vs {M.shape}")
    fixed_mask = np.zeros(N, dtype=bool)
    if prescribed:
        for idx, val in prescribed.items():
            if float(val) != 0.0:
                raise NotImplementedError(
                    "non-zero Dirichlet BCs are not supported by the cyclic "
                    "reduction yet — all prescribed values must be 0"
                )
            fixed_mask[int(idx)] = True
    low_flat = low.ravel()
    high_flat = high.ravel()
    lo_fixed = fixed_mask[low_flat]
    hi_fixed = fixed_mask[high_flat]
    if (lo_fixed != hi_fixed).any():
        raise ValueError(
            "cyclic face Dirichlet is only allowed when both low and high "
            "partners are fixed to zero"
        )
    face_mask = np.zeros(N, dtype=bool)
    face_mask[high_flat] = True  # eliminated DOFs
    # Reduced DOF ordering: every DOF that is neither fixed nor on the
    # high face keeps an index. Low-face DOFs are the "masters".
    keep = ~(fixed_mask | face_mask)
    n_keep = int(keep.sum())
    if n_keep == 0:
        raise ValueError("no free DOFs remain after cyclic + Dirichlet reduction")
    red_of_global = -np.ones(N, dtype=np.int64)
    red_of_global[keep] = np.arange(n_keep)
    # Plan the reduction topology once per input matrix. The per-harmonic
    # cost then collapses to three sparse scalings + two adds (see
    # :class:`_CyclicReductionPlan`) instead of a full Python-level COO walk
    # per k. On an N-sector sweep this replaces ``2 × (N/2 + 1)`` COO walks
    # with just two, which is an order of magnitude less Python overhead
    # for rotor-scale sectors.
    K_plan = _plan_cyclic_reduction(K.tocoo(), red_of_global, fixed_mask, face_mask, low, high, R)
    M_plan = _plan_cyclic_reduction(M.tocoo(), red_of_global, fixed_mask, face_mask, low, high, R)
    # Extract the rotor axis from the caller's ``pair_rotation`` so
    # the result can travel with its geometry stamp. For d != 3 this
    # is a no-op (the axis concept is 3D-specific).
    axis_point_stamp: np.ndarray | None = None
    axis_dir_stamp: np.ndarray | None = None
    if pair_rotation is not None and d == 3:
        from femorph_solver.result._cyclic_expand import axis_from_rotation

        axis_dir_stamp, _sector_angle = axis_from_rotation(R.real.astype(np.float64))
        # The axis passes through the origin in the global frame unless
        # the caller has an asymmetric mesh. We default to origin and
        # let the Model / writer override if needed.
        axis_point_stamp = np.zeros(3, dtype=np.float64)
    results: list[CyclicModalResult] = []
    alpha = 2.0 * np.pi / n_sectors
    with _thread_limit(thread_limit):
        for k in harmonic_indices:
            T = complex(np.exp(1j * k * alpha))
            TR = T * R.astype(np.complex128)
            K_red = K_plan.apply(T)
            M_red = M_plan.apply(T)
            n_req = min(n_modes, n_keep)
            w, v = _solve_reduced_eig(K_red, M_red, n_req, n_keep, linear_solver)
            # Clip numerically-negative eigenvalues (rigid-body / round-off).
            w_clean = np.where(w > 0, w, 0.0)
            phi = np.zeros((N, n_req), dtype=np.complex128)
            phi[keep] = v
            # Expand the eliminated high DOFs back: u_high[i,:] = T R · u_low[i,:].
            u_low_blocks = phi[low_flat].reshape(P, d, n_req)
            u_high_blocks = np.einsum("ab,pbm->pam", TR, u_low_blocks)
            phi[high_flat] = u_high_blocks.reshape(P * d, n_req)
            results.append(
                CyclicModalResult(
                    harmonic_index=int(k),
                    n_sectors=n_sectors,
                    omega_sq=w_clean,
                    frequency=np.sqrt(w_clean) / (2.0 * np.pi),
                    mode_shapes=phi,
                    axis_point=axis_point_stamp,
                    axis_dir=axis_dir_stamp,
                )
            )
    return results
def _is_real_harmonic(A: sp.csr_array, atol: float = 1e-14) -> bool:
"""Return True if A's sparse data is (numerically) real.
For k=0 the phase factor ``T = e^{ikα}`` is ``1`` and K_red / M_red
have identically zero imaginary parts. For k=N/2 on even N the
phase is ``-1`` — again real. Other harmonics produce complex
Hermitian reductions. We look at the actual data rather than the
harmonic index so the caller can pass any sparse matrix without
having to re-derive the index.
"""
data = A.data
return bool(np.all(np.abs(data.imag) <= atol))
def _hermitian_to_real_augment(A: sp.csr_array) -> sp.csr_array:
"""Embed a complex-Hermitian sparse ``A`` (n×n) as real-symmetric (2n×2n).
For Hermitian ``A = A_re + i A_im`` (A_re symmetric, A_im
antisymmetric) the block matrix
A_aug = [[A_re, -A_im],
[A_im, A_re]]
is real-symmetric of size 2n×2n, and its spectrum is ``{λ}`` from
A's spectrum but with each eigenvalue repeated twice (eigenvectors
``[Re(v); Im(v)]`` and ``[-Im(v); Re(v)]``). This lets us solve
Hermitian generalised eigenproblems with ``scipy.sparse.linalg.eigsh``
(real-symmetric only) without pulling in PRIMME.
"""
# Build the 2×2 block via ``sp.bmat`` — ~2× faster than the previous
# 4-way COO concatenation + bool-mask filter on typical
# cyclic-reduced nnz (the inner concatenation doubled every COO
# entry; bmat keeps one buffer per block and reuses the input
# indptr/indices verbatim). A_re and A_im inherit A's CSR pattern
# directly — no tocoo round-trip — so the only real work is the
# 2×2 block stitching inside scipy.
A_re = sp.csr_array((A.data.real, A.indices, A.indptr), shape=A.shape)
A_im = sp.csr_array((A.data.imag, A.indices, A.indptr), shape=A.shape)
aug = sp.bmat([[A_re, -A_im], [A_im, A_re]], format="csr")
# Drop explicit zeros so the factor doesn't walk dead slots. This is
# relevant at diagonal of Re(A) (imag part structurally zero at
# diagonals for Hermitian A), producing ~n explicit zeros in each
# cross-diagonal block.
aug.eliminate_zeros()
return aug
def _shift_invert_opinv(
    A: sp.csr_array | sp.csc_array, linear_solver: str, *, assume_spd: bool
) -> spla.LinearOperator:
    """Return the ``(K - σM)^-1`` LinearOperator backing ``eigsh`` shift-invert.

    The inner factorisation goes through the registered linear-solver
    chain (pardiso → cholmod → umfpack → superlu under ``"auto"``) —
    the same routing ``ArpackSolver`` uses for plain modal — and warns
    once per process when pardiso is absent on a problem large enough
    to benefit.

    Pass ``assume_spd=True`` only when the reduced operator is known to
    be strictly SPD (e.g. Dirichlet-constrained, sigma < 0). With
    ``False`` the auto-chain resolves to a general-purpose LU backend
    (pardiso/umfpack/superlu) that tolerates indefinite / semi-definite
    operators — required for free-free cyclic (K_red has a rigid-body
    null-space at k=0 and σ=0) and for the complex-Hermitian real
    augmentation.
    """
    csc = A.tocsc()
    n_dof = csc.shape[0]
    if linear_solver == "auto":
        chosen = select_default_linear_solver(spd=assume_spd, n_dof=n_dof)
        backend_cls = get_linear_solver(chosen)
    else:
        backend_cls = get_linear_solver(linear_solver, n_dof=n_dof)
    factor = backend_cls(csc, assume_spd=assume_spd)
    return spla.LinearOperator(csc.shape, matvec=factor.solve, dtype=csc.dtype)
def _complex_as_real_augmented_matvec(A: sp.csr_array) -> spla.LinearOperator:
"""Wrap a complex Hermitian ``A`` (n×n) as the 2n real augmentation matvec.
The real 2n augmentation ``[[Re(A), -Im(A)], [Im(A), Re(A)]]`` acts
on ``x = [u; v]`` (two real length-n halves) as
``[[Re(A), -Im(A)], [Im(A), Re(A)]] · [u; v]
= [Re(A · (u + iv)); Im(A · (u + iv))]``
So ``eigsh`` gets the same 2n operator action from one complex CSR
matvec on ``A`` — no need to materialise the 4-block real CSR. On
a 45k-DOF rotor sector this saves ~200 MB of peak RSS per k (the
augmentation has ~4× K_red's nnz) and the ``sp.bmat`` build time
per matrix per harmonic. The complex matvec itself is ~2× faster
than the real 2n matvec at equal triplet count, since it touches
n complex numbers (2n floats) instead of 2n real rows × 2 blocks.
"""
n = A.shape[0]
def _matvec(x: np.ndarray) -> np.ndarray:
u = x[:n]
v = x[n:]
w = A @ (u + 1j * v)
return np.concatenate((w.real, w.imag))
return spla.LinearOperator((2 * n, 2 * n), matvec=_matvec, dtype=np.float64)
def _complex_opinv_for_augmented(K_red: sp.csr_array) -> spla.LinearOperator:
"""Build an OPinv on the real 2n augmentation via a *complex* factor of K_red.
The ``eigsh`` shift-invert loop on the augmented pair
``([[Re,-Im],[Im,Re]], …)`` at ``σ=0`` needs ``K_aug^{-1} · x`` for
each iteration. Writing ``x = [u; v]`` (two real length-n halves)
and ``Z = K_red`` (complex Hermitian), a little algebra on the
block form gives
K_aug^{-1} · [u; v] = [Re(Z^{-1} (u + i v)); Im(Z^{-1} (u + i v))].
So one complex ``splu`` on K_red (size n) replaces a real ``splu`` on
the full augmentation (size 2n) — cheaper to compute and apply.
scipy's ``splu`` handles complex CSC directly. We stay on SuperLU
here (rather than routing through our pluggable linear-solver
chain) because:
* most registered backends — pardiso, cholmod, umfpack — are real-only;
* the eigenvalue pair-dedup loop downstream is calibrated against
SuperLU's convergence, and switching backends previously produced
cross-Python-version mismatches.
"""
n = K_red.shape[0]
lu = spla.splu(K_red.tocsc())
def _matvec(x: np.ndarray) -> np.ndarray:
# x is (2n,) real from eigsh's Lanczos loop.
u = x[:n]
v = x[n:]
w = lu.solve(u + 1j * v)
return np.concatenate((w.real, w.imag))
return spla.LinearOperator((2 * n, 2 * n), matvec=_matvec, dtype=np.float64)
def _solve_reduced_eig(
    K_red: sp.csr_array,
    M_red: sp.csr_array,
    n_req: int,
    n_keep: int,
    linear_solver: str,
) -> tuple[np.ndarray, np.ndarray]:
    """Lowest ``n_req`` modes of ``K_red φ = ω² M_red φ`` (complex Hermitian).

    Small problems (``n_keep <= _DENSE_CUTOFF``) go through LAPACK
    ``eigh`` directly for maximum accuracy. Larger sectors use a
    sparse shift-invert path whose inner ``K^-1`` solve is routed via
    :func:`get_linear_solver` so pardiso is the default when installed
    (same chain as plain modal; warns once if missing on large problems):

    * For real harmonics (k=0 / k=N/2) the reduced operators are
      strictly real-symmetric and ``scipy.sparse.linalg.eigsh`` with
      ``sigma=0`` returns the lowest modes via shift-invert.
    * For complex harmonics we embed the Hermitian pair
      ``(K_red, M_red)`` into a 2n-sized real-symmetric pair via
      :func:`_hermitian_to_real_augment`, run ``eigsh`` on that, and
      deduplicate each eigenvalue (the embedding doubles them). The
      complex eigenvector ``v`` is reconstructed from the augmented
      eigenvector ``[a; b]`` as ``v = a + i b``.

    Returns ``(w, v)``: eigenvalues ``ω²`` in ascending order and an
    ``(n_keep, n_req)`` complex eigenvector matrix.
    """
    if n_keep <= _DENSE_CUTOFF:
        K_dense = K_red.toarray()
        M_dense = M_red.toarray()
        # Symmetrise explicitly so LAPACK's Hermitian assumption holds
        # exactly despite round-off introduced by the reduction.
        K_dense = 0.5 * (K_dense + K_dense.conj().T)
        M_dense = 0.5 * (M_dense + M_dense.conj().T)
        w, v = la.eigh(K_dense, M_dense, subset_by_index=[0, n_req - 1])
        return w, v.astype(np.complex128, copy=False)
    # NOTE(review): the sparse paths below need n_req < n_keep (eigsh
    # requires k < n) — presumably unreachable because _DENSE_CUTOFF
    # routes small problems dense; confirm for n_modes > _DENSE_CUTOFF.
    if _is_real_harmonic(K_red) and _is_real_harmonic(M_red):
        # Pure real-symmetric sparse path. K_red can be semi-definite at
        # k=0 when the sector has no Dirichlet BCs (rigid-body nullspace),
        # so ``assume_spd=False`` keeps the factorisation on LU rather than
        # Cholesky.
        K_r = sp.csr_array((K_red.data.real, K_red.indices, K_red.indptr), shape=K_red.shape)
        M_r = sp.csr_array((M_red.data.real, M_red.indices, M_red.indptr), shape=M_red.shape)
        K_r = 0.5 * (K_r + K_r.T)
        M_r = 0.5 * (M_r + M_r.T)
        OPinv = _shift_invert_opinv(K_r, linear_solver, assume_spd=False)
        w, v = spla.eigsh(K_r, k=n_req, M=M_r, sigma=0.0, which="LM", OPinv=OPinv)
        order = np.argsort(w)
        return w[order], v[:, order].astype(np.complex128, copy=False)
    # Complex Hermitian → 2n real-symmetric augmentation. The augmented
    # matrix shares the spectrum of the Hermitian original (each eigenvalue
    # doubled).
    #
    # Shift-invert at σ=0 needs ``K_aug^-1 · x`` inside the Lanczos loop.
    # Instead of factoring the 2n real augmentation directly (which was
    # scipy's default when we passed ``OPinv=None``), exploit the block
    # identity
    #
    #   [[Re(Z), -Im(Z)], [Im(Z), Re(Z)]]^{-1} · [u; v]
    #       = [Re(Z^{-1}(u+iv)); Im(Z^{-1}(u+iv))]
    #
    # with ``Z = K_red`` (complex Hermitian, size n). A single complex
    # ``splu`` factor on ``K_red`` is ~3× cheaper than a real ``splu`` on
    # the 2n augmentation on rotor-scale sectors — the augmentation has
    # ~4× more nnz (four blocks of K_red's pattern) and factor cost grows
    # super-linearly with problem size.
    # Skip materialising the 2n real ``K_aug`` / ``M_aug`` CSRs entirely —
    # wrap the complex K_red / M_red as real 2n LinearOperators via the
    # block-matvec identity (see :func:`_complex_as_real_augmented_matvec`).
    # On a rotor-scale sector this drops ~200 MB of peak RSS per k (the
    # augmentation has ~4× K_red's nnz for each of K_aug / M_aug) and
    # skips a ``sp.bmat`` build per matrix per harmonic. eigsh in
    # shift-invert mode drives the Lanczos loop through OPinv and the M
    # matvec only — A itself is inspected for shape/dtype but never
    # matvec'd — so the K LinearOperator here is purely a shape handle.
    OPinv = _complex_opinv_for_augmented(K_red)
    K_op = _complex_as_real_augmented_matvec(K_red)
    M_op = _complex_as_real_augmented_matvec(M_red)
    # Each true eigenvalue appears twice in the augmented spectrum; ask
    # for 2·n_req and then dedupe. Slight padding (min(2·n_req+4, 2n_keep-1))
    # absorbs clustered pairs that land just outside the first 2·n_req
    # convergence block.
    n_aug = 2 * n_keep
    n_request = min(2 * n_req + 4, n_aug - 1)
    w_aug, v_aug = spla.eigsh(K_op, k=n_request, M=M_op, sigma=0.0, which="LM", OPinv=OPinv)
    order = np.argsort(w_aug)
    w_aug = w_aug[order]
    v_aug = v_aug[:, order]
    # Merge consecutive duplicates. Two eigenvectors of the augmented
    # matrix span the same complex eigenmode; Gram-Schmidt on the first
    # of each pair gives the complex ``v`` up to a global phase.
    w_out: list[float] = []
    v_out: list[np.ndarray] = []
    i = 0
    while i < len(w_aug) and len(w_out) < n_req:
        # Take the current augmented eigenvector as the complex mode:
        # top half is Re(v), bottom half is Im(v).
        a = v_aug[:n_keep, i]
        b = v_aug[n_keep:, i]
        v_complex = a + 1j * b
        # Normalise to unit Euclidean norm (guard the degenerate zero
        # vector, which eigsh should not produce but costs nothing to
        # protect against).
        nrm = np.linalg.norm(v_complex)
        if nrm > 0:
            v_complex = v_complex / nrm
        w_out.append(w_aug[i])
        v_out.append(v_complex)
        # Skip the paired partner (same eigenvalue, 90° phase-rotated).
        # The partner is expected within a tiny relative tolerance.
        j = i + 1
        if j < len(w_aug) and abs(w_aug[j] - w_aug[i]) <= 1e-8 * max(abs(w_aug[i]), 1.0):
            i = j + 1
        else:
            i = j
    w_final = np.array(w_out[:n_req], dtype=np.float64)
    v_final = np.stack(v_out[:n_req], axis=1).astype(np.complex128)
    return w_final, v_final
@dataclass
class _CyclicReductionPlan:
"""Cached ``P^H A P`` topology for an N-sector cyclic sweep.
The reduced triplets partition by phase exponent ``e ∈ {-1, 0, +1}``
— a function of where each input ``(row, col)`` lives relative to
the cyclic boundary, independent of the harmonic index ``k``. Once
per input matrix we bucket the reduced triplets into three real CSRs
(``A_zero``, ``A_pos``, ``A_neg``); per-harmonic evaluation is then
A_red(k) = A_zero + e^{ikα} · A_pos + e^{-ikα} · A_neg
with ``|T|² = 1`` cancellation making the high-×-high block
phase-independent (see :func:`_plan_cyclic_reduction` for the
derivation). For a PTR-scale rotor this replaces ~36 Python-heavy
COO walks (2 matrices × 18 harmonics) with just 2 walks plus 36
O(nnz) sparse scalings — roughly an order of magnitude less Python
overhead on the cyclic sweep hot path.
"""
A_zero: sp.csr_array
A_pos: sp.csr_array
A_neg: sp.csr_array
def apply(self, T: complex) -> sp.csr_array:
# Real-harmonic fast paths: k=0 and k=N/2 stay purely real so the
# downstream sparse eigsh path can take its real-symmetric branch
# without a ``.real`` copy.
if T == 1.0 + 0.0j:
return self.A_zero + self.A_pos + self.A_neg
if T == -1.0 + 0.0j:
return self.A_zero - self.A_pos - self.A_neg
return self.A_zero + T * self.A_pos + T.conjugate() * self.A_neg
def _plan_cyclic_reduction(
    A_coo: sp.coo_matrix | sp.coo_array,
    red_of_global: np.ndarray,
    fixed_mask: np.ndarray,
    face_mask: np.ndarray,
    low: np.ndarray,  # (P, d) low-face global DOF indices
    high: np.ndarray,  # (P, d) high-face global DOF indices
    R: np.ndarray,  # (d, d) real rotation — phase-independent
) -> _CyclicReductionPlan:
    """Plan ``P^H A P`` and return a per-harmonic evaluator.

    ``A`` (real SPD) is walked once. For each (r, c, v) triplet four
    cases arise by ``(row_is_hi, col_is_hi)``:

    * ``(False, False)`` — non-face × non-face, emits one triplet with
      coefficient ``v``. Phase exponent ``0``.
    * ``(True, False)`` — high row: expands into ``d`` triplets, one per
      low-partner DOF ``b``, each weighted by ``R[a_r, b]``. Phase
      factor in the full evaluation is ``conj(T·R[a_r, b]) = conj(T)·R``,
      so the base coefficient is the real part ``v·R[a_r, b]`` and the
      phase exponent is ``-1``.
    * ``(False, True)`` — high col: mirror of the above, phase exponent
      ``+1``, base coefficient ``v·R[a_c, b']``.
    * ``(True, True)`` — high × high: ``d²`` triplets. The two phase
      factors ``conj(T·R[a_r, b])`` and ``T·R[a_c, b']`` multiply to
      ``|T|² · R[a_r, b] · R[a_c, b'] = R[a_r, b] · R[a_c, b']`` — the
      unit-modulus phase cancels. Phase exponent ``0``.

    The three phase buckets become three real CSRs with duplicates
    summed within each bucket. :meth:`_CyclicReductionPlan.apply` then
    combines them per-k with the scalar phase factor.
    """
    rows = A_coo.row
    cols = A_coo.col
    data = np.asarray(A_coo.data, dtype=np.float64)  # A is real SPD
    P, d = low.shape
    n_total = A_coo.shape[0]
    # Count of reduced (interior + low-face) DOFs — the planned CSRs are
    # (n_keep, n_keep).
    n_keep = int((red_of_global >= 0).sum())
    # Lookup: map each high-face global DOF to its (pair-index p, a-index)
    # so we can fetch the rotation-row that rewrites that DOF. Dense
    # ``(N,)`` sentinels — only high DOFs matter, rest stay at -1.
    high_p = -np.ones(n_total, dtype=np.int64)
    high_a = -np.ones(n_total, dtype=np.int64)
    p_idx = np.broadcast_to(np.arange(P)[:, None], (P, d)).ravel()
    a_idx = np.broadcast_to(np.arange(d)[None, :], (P, d)).ravel()
    high_p[high.ravel()] = p_idx
    high_a[high.ravel()] = a_idx
    # Drop fixed rows/cols up front — prescribed DOFs contribute nothing
    # to the reduced operator.
    keep_entry = ~(fixed_mask[rows] | fixed_mask[cols])
    rows = rows[keep_entry]
    cols = cols[keep_entry]
    data = data[keep_entry]
    is_row_hi = face_mask[rows]
    is_col_hi = face_mask[cols]
    zero_rows: list[np.ndarray] = []
    zero_cols: list[np.ndarray] = []
    zero_vals: list[np.ndarray] = []
    # --- Case A: (non-face, non-face) — phase exponent 0 -----------------
    m = ~is_row_hi & ~is_col_hi
    # ``red_of_global`` is -1 for face AND fixed rows; fixed are already
    # dropped and ~is_row_hi / ~is_col_hi rules out face, so the mapped
    # indices here are guaranteed >= 0.
    zero_rows.append(red_of_global[rows[m]])
    zero_cols.append(red_of_global[cols[m]])
    zero_vals.append(data[m])
    # --- Case B: (high row, non-face col) — phase exponent -1 ------------
    m = is_row_hi & ~is_col_hi
    r_sub = rows[m]
    c_sub = cols[m]
    v_sub = data[m]
    p_r = high_p[r_sub]
    a_r = high_a[r_sub]
    red_c_b = red_of_global[c_sub]
    # Expand the ``d`` low-partner contributions per entry in one batch.
    b_arange = np.arange(d)
    low_lookup = low[p_r[:, None], b_arange[None, :]]  # (n, d)
    red_r_b = red_of_global[low_lookup]  # (n, d); -1 when partner is fixed
    R_row = R[a_r[:, None], b_arange[None, :]]  # (n, d) real
    coef_b = v_sub[:, None] * R_row  # (n, d) real
    valid_b = red_r_b >= 0  # red_c_b guaranteed >= 0
    cols_b_full = np.broadcast_to(red_c_b[:, None], red_r_b.shape)
    neg_rows = red_r_b[valid_b]
    neg_cols = cols_b_full[valid_b]
    neg_vals = coef_b[valid_b]
    # --- Case C: (non-face row, high col) — phase exponent +1 ------------
    m = ~is_row_hi & is_col_hi
    r_sub = rows[m]
    c_sub = cols[m]
    v_sub = data[m]
    red_r_c = red_of_global[r_sub]
    p_c = high_p[c_sub]
    a_c = high_a[c_sub]
    bp_arange = np.arange(d)
    low_lookup = low[p_c[:, None], bp_arange[None, :]]  # (n, d)
    red_c_c = red_of_global[low_lookup]
    R_col = R[a_c[:, None], bp_arange[None, :]]
    coef_c = v_sub[:, None] * R_col
    valid_c = red_c_c >= 0
    rows_c_full = np.broadcast_to(red_r_c[:, None], red_c_c.shape)
    pos_rows = rows_c_full[valid_c]
    pos_cols = red_c_c[valid_c]
    pos_vals = coef_c[valid_c]
    # --- Case D: (high row, high col) — phase exponent 0 -----------------
    m = is_row_hi & is_col_hi
    r_sub = rows[m]
    c_sub = cols[m]
    v_sub = data[m]
    p_r = high_p[r_sub]
    a_r = high_a[r_sub]
    p_c = high_p[c_sub]
    a_c = high_a[c_sub]
    # Full (n, d, d) expansion. R-weights combine as R[a_r, b] · R[a_c, b'].
    # (``b_arange`` / ``bp_arange`` from Cases B/C are plain np.arange(d)
    # vectors, independent of the masks — safe to reuse here.)
    low_r = low[p_r[:, None, None], b_arange[None, :, None]]  # (n, d, 1)
    low_c = low[p_c[:, None, None], bp_arange[None, None, :]]  # (n, 1, d)
    low_r = np.broadcast_to(low_r, (r_sub.size, d, d))
    low_c = np.broadcast_to(low_c, (r_sub.size, d, d))
    red_rr = red_of_global[low_r]
    red_cc = red_of_global[low_c]
    Rr = R[a_r[:, None, None], b_arange[None, :, None]]
    Rc = R[a_c[:, None, None], bp_arange[None, None, :]]
    coef_hh = v_sub[:, None, None] * Rr * Rc  # (n, d, d)
    valid_hh = (red_rr >= 0) & (red_cc >= 0)
    zero_rows.append(red_rr[valid_hh])
    zero_cols.append(red_cc[valid_hh])
    zero_vals.append(coef_hh[valid_hh])
    shape = (n_keep, n_keep)
    def _to_csr(rs: np.ndarray, cs: np.ndarray, vs: np.ndarray) -> sp.csr_array:
        # COO → CSR conversion sums duplicate (row, col) entries — exactly
        # the per-bucket accumulation the reduction requires.
        if rs.size == 0:
            return sp.csr_array(shape, dtype=np.float64)
        return sp.coo_array(
            (
                vs.astype(np.float64, copy=False),
                (rs.astype(np.int64, copy=False), cs.astype(np.int64, copy=False)),
            ),
            shape=shape,
        ).tocsr()
    A_zero = _to_csr(
        np.concatenate(zero_rows), np.concatenate(zero_cols), np.concatenate(zero_vals)
    )
    A_pos = _to_csr(pos_rows, pos_cols, pos_vals)
    A_neg = _to_csr(neg_rows, neg_cols, neg_vals)
    return _CyclicReductionPlan(A_zero=A_zero, A_pos=A_pos, A_neg=A_neg)