Source code for femorph_solver.mapdl_api.io.rst_schema

"""Typed record schemas for MAPDL RST / FULL / EMAT files.

The key lists mirror those in ``fdresu.inc`` (the Fortran include file
that defines the layout of result-file records). They are reproduced
verbatim from pymapdl-reader's ``_rst_keys.py``.

``parse_header`` maps a flat int32 table onto a named dict, handling two
conventions that MAPDL uses inside these tables:

1. **Array fields** — keys that appear multiple times (e.g. ``DOFS``,
   ``title``) accumulate into a list.
2. **Split 64-bit pointers** — pairs named ``ptrXXXl`` / ``ptrXXXh``
   are reassembled into a single 64-bit ``ptrXXX`` value.
"""

from __future__ import annotations

import struct
from collections import Counter
from pathlib import Path

import numpy as np

from femorph_solver.mapdl_api.io.binary import (
    _open_binary,
    decompress_bsparse,
    iter_records,
    read_record,
)

# ---------------------------------------------------------------------------
# Schemas — copied from pymapdl-reader/ansys/mapdl/reader/_rst_keys.py
# ---------------------------------------------------------------------------

result_header_keys: list[str] = [
    "fun12",
    "maxn",
    "nnod",
    "resmax",
    "numdof",
    "maxe",
    "nelm",
    "kan",
    "nsets",
    "ptrend",
    "ptrDSIl",
    "ptrTIMl",
    "ptrLSPl",
    "ptrELMl",
    "ptrNODl",
    "ptrGEOl",
    "ptrCYCl",
    "CMSflg",
    "csEls",
    "units",
    "nSector",
    "csCord",
    "ptrEnd8",
    "ptrEnd8",
    "fsiflag",
    "pmeth",
    "noffst",
    "eoffst",
    "nTrans",
    "ptrTRANl",
    "PrecKey",
    "csNds",
    "cpxrst",
    "extopt",
    "nlgeom",
    "AvailData",
    "mmass",
    "kPerturb",
    "XfemKey",
    "rstsprs",
    "ptrDSIh",
    "ptrTIMh",
    "ptrLSPh",
    "ptrCYCh",
    "ptrELMh",
    "ptrNODh",
    "ptrGEOh",
    "ptrTRANh",
    "Glbnnod",
    "ptrGNODl",
    "ptrGNODh",
    "qrDmpKy",
    "MSUPkey",
    "PSDkey",
    "cycMSUPkey",
    "XfemCrkPropTech",
]

solution_data_header_keys: list[str] = (
    [
        "pv3num",
        "nelm",
        "nnod",
        "mask",
        "itime",
        "iter",
        "ncumit",
        "nrf",
        "cs_LSC",
        "nmast",
        "ptrNSL",
        "ptrESL",
        "ptrRF",
        "ptrMST",
        "ptrBC",
        "rxtrap",
        "mode",
        "isym",
        "kcmplx",
        "numdof",
    ]
    + ["DOFS"] * 30
    + ["title"] * 20
    + ["stitle"] * 20
    + [
        "dbmtim",
        "dbmdat",
        "dbfncl",
        "soltim",
        "soldat",
        "ptrOND",
        "ptrOEL",
        "nfldof",
        "ptrEXA",
        "ptrEXT",
        "ptrEXAl",
        "ptrEXAh",
        "ptrEXTl",
        "ptrEXTh",
        "ptrNSLl",
        "ptrNSLh",
        "ptrRFl",
        "ptrRFh",
        "ptrMSTl",
        "ptrMSTh",
        "ptrBCl",
        "ptrBCh",
        "ptrTRFl",
        "ptrTRFh",
        "ptrONDl",
        "ptrONDh",
        "ptrOELl",
        "ptrOELh",
        "ptrESLl",
        "ptrESLh",
        "ptrOSLl",
        "ptrOSLh",
        "sizeDEAD",
        "ptrDEADl",
        "ptrDEADh",
        "PrinKey",
        "numvdof",
        "numadof",
        "0",
        "0",
        "ptrVSLl",
        "ptrVSLh",
        "ptrASLl",
        "ptrASLh",
        "0",
        "0",
        "0",
        "0",
        "numRotCmp",
        "0",
        "ptrRCMl",
        "ptrRCMh",
        "nNodStr",
        "0",
        "ptrNDSTRl",
        "ptrNDSTRh",
        "AvailData",
        "geomID",
        "ptrGEOl",
        "ptrGEOh",
    ]
)

geometry_header_keys: list[str] = [
    "__unused",
    "maxety",
    "maxrl",
    "nnod",
    "nelm",
    "maxcsy",
    "ptrETY",
    "ptrREL",
    "ptrLOC",
    "ptrCSY",
    "ptrEID",
    "maxsec",
    "secsiz",
    "nummat",
    "matsiz",
    "ptrMAS",
    "csysiz",
    "elmsiz",
    "etysiz",
    "rlsiz",
    "ptrETYl",
    "ptrETYh",
    "ptrRELl",
    "ptrRELh",
    "ptrCSYl",
    "ptrCSYh",
    "ptrLOCl",
    "ptrLOCh",
    "ptrEIDl",
    "ptrEIDh",
    "ptrMASl",
    "ptrMASh",
    "ptrSECl",
    "ptrSECh",
    "ptrMATl",
    "ptrMATh",
    "ptrCNTl",
    "ptrCNTh",
    "ptrNODl",
    "ptrNODh",
    "ptrELMl",
    "ptrELMh",
    "Glblenb",
    "ptrGNODl",
    "ptrGNODh",
    "maxn",
    "NodesUpd",
    "lenbac",
    "maxcomp",
    "compsiz",
    "ptrCOMPl",
    "ptrCOMPh",
    "nMatProp",
    "nStage",
    "maxMSsz",
    "ptrMSl",
    "ptrMSh",
    "nCycP",
    "ptrCycPl",
    "ptrCycPh",
    "numety",
    "numrl",
    "numcsy",
    "numsec",
    "mapFlag",
    "cysCSID",
]

# DOF reference numbers used in boundary-condition index tables.
DOF_REF: dict[int, str] = {
    1: "UX",
    2: "UY",
    3: "UZ",
    4: "ROTX",
    5: "ROTY",
    6: "ROTZ",
    7: "AX",
    8: "AY",
    9: "AZ",
    10: "VX",
    11: "VY",
    12: "VZ",
    16: "WARP",
    17: "CONC",
    18: "HDSP",
    19: "PRES",
    20: "TEMP",
    21: "VOLT",
    22: "MAG",
    23: "ENKE",
    24: "ENDS",
    25: "EMF",
    26: "CURR",
}


def _two_ints_to_long(lo: int, hi: int) -> int:
    """Combine low/high int32 halves into a signed int64 MAPDL pointer."""
    packed = struct.pack(">I", hi & 0xFFFFFFFF) + struct.pack(">I", lo & 0xFFFFFFFF)
    return struct.unpack(">q", packed)[0]


[docs] def parse_header(table: np.ndarray, keys: list[str]) -> dict: """Map a flat int32 table onto a named header dict. Array-valued keys (those that appear more than once in ``keys``) are gathered into a list. Pairs ``ptrXXXl`` / ``ptrXXXh`` are reassembled into a single 64-bit ``ptrXXX`` entry. """ header: dict = {} counts = Counter(keys) counts.pop("0", None) counts.pop("__unused", None) for key, count in counts.items(): if count > 1: header[key] = [] max_i = len(table) - 1 for i, key in enumerate(keys): if key in ("0", "__unused"): continue if i > max_i: header[key] = 0 continue value = int(table[i]) if counts[key] > 1: # multi-valued field: append non-zero entries if value: header[key].append(value) else: header[key] = value # Reassemble split 64-bit pointers. for key in list(header): if key.endswith("h") and f"{key[:-1]}l" in header: base = key[:-1] header[base] = _two_ints_to_long(header[f"{base}l"], header[f"{base}h"]) return header
[docs] def read_rst_result_header(path: str | Path) -> dict: """Read record #1 of an RST file and parse it as a result header.""" fp = _open_binary(path) try: # Record 0 is the standard header; record 1 is the result header. read_record(fp, dtype="<i4") arr, _ = read_record(fp, dtype="<i4") finally: fp.close() header = parse_header(arr, result_header_keys) header["_file_type"] = "RST" return header
[docs] def read_rst_geometry_header(path: str | Path) -> dict: """Read the geometry header from an RST file using ``ptrGEO``. The result header's ``ptrGEO`` is a byte-offset expressed in 4-byte words from the start of the file. Seek there and read one record. """ result = read_rst_result_header(path) word_off = result["ptrGEO"] if word_off <= 0: raise ValueError(f"{path} has no geometry header (ptrGEO = {word_off})") fp = _open_binary(path) try: arr, _ = read_record(fp, offset=int(word_off) * 4, dtype="<i4") finally: fp.close() return parse_header(arr, geometry_header_keys)
[docs] def read_solution_data_header(path: str | Path, set_index: int = 0) -> tuple[dict, int]: """Parse the solution-data header for result set ``set_index``. Returns ``(header_dict, solution_word_offset)``. The word offset is the absolute file position of the solution header's 8-byte record frame — useful because ``ptrNSL`` / ``ptrESL`` / ``ptrBC`` inside the header are **relative** to that position. """ result = read_rst_result_header(path) n_sets = result["nsets"] if not (0 <= set_index < n_sets): raise IndexError(f"set_index {set_index} out of range (nsets={n_sets})") fp = _open_binary(path) try: dsi_arr, _ = read_record(fp, offset=result["ptrDSI"] * 4, dtype="<i4") # Each set occupies 2 ints in the DSI table: (lo, hi). sol_word = _two_ints_to_long( int(dsi_arr[2 * set_index]), int(dsi_arr[2 * set_index + 1]), ) sol_arr, _ = read_record(fp, offset=sol_word * 4, dtype="<i4") finally: fp.close() header = parse_header(sol_arr, solution_data_header_keys) return header, sol_word
[docs] def read_nodal_displacement(path: str | Path, set_index: int = 0) -> np.ndarray: """Return the decompressed nodal displacement vector for result set 0. Output shape is ``(nnod, numdof)``. Entries for DOFs with no stored value (bits unset in the bsparse bitmask) come back as zero. """ result = read_rst_result_header(path) sdh, sol_word = read_solution_data_header(path, set_index=set_index) nnod = result["nnod"] numdof = result["numdof"] nsl_word = sol_word + int(sdh["ptrNSL"]) fp = _open_binary(path) try: nsl_arr, info = read_record(fp, offset=nsl_word * 4, dtype="<i4") finally: fp.close() if info.flags & 0x08: # bsparse (bit-sparse) — stored-DOF bitmask + packed doubles. flat = decompress_bsparse(nsl_arr, dtype="<f8") else: # dense float64 payload flat = nsl_arr.view("<f8").copy() expected = nnod * numdof if flat.size < expected: raise ValueError( f"NSL record held {flat.size} DOFs, expected {expected} (nnod={nnod}, numdof={numdof})" ) return flat[:expected].reshape(nnod, numdof)
[docs] def find_record(path: str | Path, index: int, dtype: np.dtype | str = "<i4") -> np.ndarray: """Return the payload of record ``index`` (0-based) as an ndarray.""" fp = _open_binary(path) try: for i, (_, _, arr) in enumerate(iter_records(fp, dtype=dtype)): if i == index: return arr finally: fp.close() raise IndexError(f"{path} has fewer than {index + 1} records")