# Source code for femorph_solver.mapdl_api.io.binary

"""Low-level reader for MAPDL binary files (RST / FULL / EMAT / …).

MAPDL binary files use Fortran-style sequential records. Each record is
laid out as::

    [ size:int32 ] [ flags:int32 ] [ payload : size * 4 bytes ] [ pad:int32 ]

* ``size`` counts 4-byte *words* of payload (so a double array of length
  *n* gives ``size = 2 * n``).
* ``flags`` is carried in byte 7 of the 8-byte record header — the
  high byte of the little-endian flags word. femorph-solver exposes the raw byte but does **not** use
  it to infer payload dtype or compression state. pymapdl-reader's
  Python and C++ backends disagree on the bit positions and neither
  matches files produced by recent MAPDL — e.g. bit 3 (declared
  "sparse" by both) is routinely set on perfectly dense records emitted
  under ``/FCOMP,RST,0``. Callers must pass the dtype appropriate to
  the record they expect (int32 for index tables, float64 for
  coordinates and results, etc.).
* the trailing ``pad`` int32 is a Fortran record-length repeater.

femorph-solver's decoder reads uncompressed, dense records in the four standard
dtypes (int32, int16, float64, float32). The record reader itself never
decompresses; callers that know a record is bsparse- or wsparse-encoded
can expand its payload with :func:`decompress_bsparse` /
:func:`decompress_wsparse`. zlib-compressed files are unsupported —
run MAPDL with ``/FCOMP,<file>,0`` to disable.
"""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO

import numpy as np

# Canonical MAPDL file-format codes (word 2 of the standard header).
# read_standard_header exposes the raw integer as ``file_format``; this
# table maps the known codes to their conventional file-type names.
FILE_FORMATS: dict[int, str] = {
    2: "EMAT",
    3: "ESAV",
    4: "FULL",
    8: "SUB",
    9: "MODE",
    10: "RDSP",
    12: "RST",
    16: "DB",
    45: "CMS",
}


@dataclass
class RecordInfo:
    """Header metadata captured while decoding a single MAPDL record."""

    # payload length, counted in 4-byte words
    size: int
    # numpy dtype the payload was cast to
    dtype: np.dtype
    # raw flag byte (byte 7 of the 8-byte record header)
    flags: int
    # byte offset within the file where the record's header starts
    offset: int
def _open_binary(path: str | Path, endian: str = "<") -> BinaryIO:
    """Open a MAPDL binary file, validating its magic identifier.

    Every MAPDL binary file starts with the int32 value ``100`` as its
    first data word (part of the standard-header record).

    Parameters
    ----------
    path : str | Path
        File to open.
    endian : str
        Expected byte order: ``"<"`` (little, default) or ``">"`` (big).

    Returns
    -------
    BinaryIO
        Handle rewound to offset 0; the caller owns and closes it.

    Raises
    ------
    ValueError
        If the magic word is wrong. When the file validates under the
        *opposite* byte order the message names the ``endian`` to re-open
        with. (The retry previously always probed big-endian, so a
        little-endian file opened with ``endian='>'`` was mis-reported
        as "not a MAPDL binary file".)
    """
    fp = open(path, "rb")  # noqa: SIM115 — returned to caller; closed there
    try:
        magic = np.fromfile(fp, dtype=endian + "i4", count=1)
        if magic.size == 0 or magic[0] != 100:
            # Probe the OPPOSITE byte order so the error is actionable.
            other = ">" if endian == "<" else "<"
            fp.seek(0)
            magic = np.fromfile(fp, dtype=other + "i4", count=1)
            if magic.size and magic[0] == 100:
                raise ValueError(
                    f"{path} is {'big' if other == '>' else 'little'}-endian; "
                    f"re-open with endian='{other}'"
                )
            raise ValueError(f"{path} is not a MAPDL binary file (bad magic)")
        fp.seek(0)
        return fp
    except Exception:
        # Don't leak the handle on any validation/IO failure.
        fp.close()
        raise
def read_record(
    fp: BinaryIO,
    offset: int | None = None,
    dtype: np.dtype | str = "<i4",
) -> tuple[np.ndarray, RecordInfo]:
    """Read one MAPDL record starting at ``offset`` (bytes) or current position.

    ``dtype`` selects how to cast the payload. MAPDL doesn't encode dtype
    reliably in the flag byte, so callers must pass the dtype appropriate
    to the record they expect (int32 for index tables, float64 for
    coordinates and results, etc.). The default is little-endian int32.

    The header ``size`` word is decoded with the same byte order as
    ``dtype``, so explicitly big-endian dtypes now work on big-endian
    files (it was previously always parsed little-endian).

    Returns ``(array, info)`` where ``array`` is the dense payload and
    ``info`` carries the header metadata. Advances the file pointer past
    the trailing Fortran pad word.

    Raises
    ------
    EOFError
        If the header or payload is truncated.
    """
    if offset is not None:
        fp.seek(offset)
    rec_offset = fp.tell()
    header = fp.read(8)
    if len(header) < 8:
        raise EOFError("truncated MAPDL record header")
    dt = np.dtype(dtype)
    # Header words share the payload's byte order. np.dtype normalises
    # native-order specs to '='; treat anything that isn't explicitly
    # big-endian as little-endian, matching the module default.
    hdr_order = ">" if dt.byteorder == ">" else "<"
    size = int(np.frombuffer(header[:4], dtype=hdr_order + "i4")[0])
    flags = header[7]
    n_bytes = size * 4
    payload = fp.read(n_bytes)
    if len(payload) < n_bytes:
        raise EOFError("truncated MAPDL record payload")
    arr = np.frombuffer(payload, dtype=dt).copy()
    # Skip trailing Fortran pad word.
    fp.seek(4, 1)
    return arr, RecordInfo(size=size, dtype=dt, flags=flags, offset=rec_offset)
def read_record_at(
    path: str | Path,
    word_offset: int,
    dtype: np.dtype | str = "<i4",
) -> tuple[np.ndarray, RecordInfo]:
    """One-shot helper: validate ``path``, seek to ``word_offset`` (in
    4-byte words), read a single record, and close the file again."""
    handle = _open_binary(path)
    try:
        byte_offset = int(word_offset) * 4
        return read_record(handle, offset=byte_offset, dtype=dtype)
    finally:
        handle.close()
def decompress_bsparse(payload: np.ndarray, dtype: np.dtype | str = "<f8") -> np.ndarray:
    """Expand a bit-sparse (bsparse) record payload to a dense vector.

    Layout for a vector of length ``N``::

        word[0] = N        # decompressed length
        word[1] = bitcode  # 32-bit mask; bit i set -> position i nonzero
        payload = K packed values  # K = popcount of bitcode

    Zeros fill every unset slot. Only ``N <= 32`` is handled; longer
    vectors use a different MAPDL encoding.
    """
    if payload.size < 2:
        raise ValueError("bsparse payload too short to hold size+bitcode")
    length = int(payload[0])
    if length > 32:
        raise NotImplementedError(
            f"bsparse with N={length} > 32 needs multi-word bitcode — not yet supported"
        )
    mask = int(payload[1]) & 0xFFFFFFFF
    dt = np.dtype(dtype)
    raw = payload[2:].view(np.uint8).tobytes()
    packed = np.frombuffer(
        raw, dtype=dt, count=min(bin(mask).count("1"), len(raw) // dt.itemsize)
    )
    dense = np.zeros(length, dtype=dt)
    # Scatter the packed values into the positions flagged by the mask,
    # in ascending bit order.
    set_positions = (i for i in range(length) if (mask >> i) & 1)
    for k, slot in enumerate(set_positions):
        dense[slot] = packed[k]
    return dense
def decompress_wsparse(payload: np.ndarray, dtype: np.dtype | str = "<f8") -> np.ndarray:
    """Expand a windowed-sparse (wsparse) record payload to a dense vector.

    Layout for a vector of length ``N``::

        word[0] = N     # decompressed length
        word[1] = NWin  # number of windows
        for each window:
            iLoc = int32
            if iLoc > 0:          # isolated single value at position iLoc
                one value (dtype-sized)
            else:                 # window starts at position -iLoc
                iLen = int32
                if iLen > 0:      # iLen explicit values
                    iLen values
                else:             # constant run of (-iLen) copies
                    one value

    Positions not covered by any window stay zero.
    """
    if payload.size < 2:
        raise ValueError("wsparse payload too short to hold size+NWin")
    length = int(payload[0])
    windows = int(payload[1])
    dt = np.dtype(dtype)
    words_per_value = dt.itemsize // 4  # int32 words consumed per value
    dense = np.zeros(length, dtype=dt)
    body = payload[2:]  # remaining int32 words
    cursor = 0

    def _take(count: int) -> np.ndarray:
        # Consume ``count`` dtype-sized values from the word stream.
        nonlocal cursor
        span = words_per_value * count
        chunk = body[cursor : cursor + span].view(dt)
        cursor += span
        return chunk

    for _ in range(windows):
        loc = int(body[cursor])
        cursor += 1
        if loc > 0:
            dense[loc] = _take(1)[0]
            continue
        start = -loc
        count = int(body[cursor])
        cursor += 1
        if count > 0:
            dense[start : start + count] = _take(count)[:count]
        else:
            dense[start : start - count] = _take(1)[0]
    return dense
def iter_records(fp: BinaryIO, start: int = 0, dtype: np.dtype | str = "<i4"):
    """Walk the sequential record stream, yielding every record in turn.

    Yields ``(offset_bytes, RecordInfo, ndarray)`` triples. Iteration
    stops at EOF or at the first non-positive size word (MAPDL ends the
    sequential stream with a -1 sentinel or zero padding).
    """
    fp.seek(start)
    while True:
        anchor = fp.tell()
        head = fp.read(8)
        if len(head) < 8:
            return
        declared = int(np.frombuffer(head[:4], dtype="<i4")[0])
        if declared <= 0:
            return
        # Rewind so read_record re-parses the header itself.
        fp.seek(anchor)
        payload, meta = read_record(fp, dtype=dtype)
        yield anchor, meta, payload
def read_standard_header(path: str | Path) -> dict:
    """Parse the MAPDL standard file header.

    Every MAPDL binary file begins with a 100-word standard header whose
    fields live at fixed int32 word offsets. String fields are packed
    4 chars per word with the bytes of each word reversed (MAPDL stores
    them with the Fortran byte order flipped).

    The returned mapping covers the canonical fields: ``file_number``,
    ``file_format``, ``time``, ``date``, ``units``, ``verstring``,
    ``mainver``, ``subver``, ``machine``, ``jobname``, ``product``,
    ``special``, ``username``, ``machine_identifier``,
    ``system_record_size``, ``jobname2``, ``title``, ``subtitle``,
    ``split_point``.

    Raises
    ------
    EOFError
        If the file ends before the 100-word header.
    ValueError
        If ``path`` is not a MAPDL binary file (via :func:`_open_binary`).
    """
    unit_types = {
        0: "User Defined",
        1: "SI",
        2: "CSG",
        3: "U.S. Customary units (feet)",
        4: "U.S. Customary units (inches)",
        5: "MKS",
        6: "MPA",
        7: "uMKS",
    }
    fp = _open_binary(path)
    try:
        words = np.fromfile(fp, dtype="<i4", count=100)
    finally:
        fp.close()
    if words.size < 100:
        raise EOFError(f"{path} truncated before 100-word standard header")

    # Time is stored as the integer hhmmss, so leading zeros are lost
    # (00:05:30 -> 530). Re-pad to six digits before slicing; without
    # this, early-morning stamps decoded as e.g. "53:0:".
    int_time = str(int(words[4])).zfill(6)
    time = ":".join([int_time[0:2], int_time[2:4], int_time[4:]])
    int_date = str(int(words[6]))
    if int_date == "-1":
        date = ""
    else:
        date = "/".join([int_date[0:4], int_date[4:6], int_date[6:]])

    def _string(word_offset: int, n_words: int) -> str:
        # 4 chars per int32 word, each word byte-reversed on disk.
        raw = b"".join(words[word_offset + i].tobytes()[::-1] for i in range(n_words))
        try:
            return raw.decode("utf-8").strip()
        except UnicodeDecodeError:
            return raw.decode("latin-1", errors="replace").strip()

    verstring = _string(11, 1)
    header: dict = {
        "file_number": int(words[0]),
        "file_format": int(words[2]),
        "time": time,
        "date": date,
        "units": unit_types.get(int(words[8]), "Unknown"),
        "verstring": verstring,
        # Major/minor version are sliced out of the free-text verstring;
        # fall back to 0 when the slice isn't numeric.
        "mainver": int(verstring[:2]) if len(verstring) >= 2 and verstring[:2].isdigit() else 0,
        "subver": int(verstring[-1]) if verstring and verstring[-1].isdigit() else 0,
        "machine": _string(13, 3),
        "jobname": _string(16, 2),
        "product": _string(18, 2),
        "special": _string(20, 1),
        "username": _string(21, 3),
        "machine_identifier": _string(24, 3),
        "system_record_size": int(words[27]),
        "jobname2": _string(32, 8),
        "title": _string(42, 20),
        "subtitle": _string(62, 20),
        "split_point": int(words[96]),
    }
    return header
def _read_string(fp: BinaryIO, n: int) -> str:
    """Read ``n`` 4-char slots from ``fp``, un-reversing each word
    (MAPDL stores CHARACTER*4 data byte-swapped on disk)."""
    raw = b"".join(fp.read(4)[::-1] for _ in range(n))
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        return raw.decode("latin-1", errors="replace")


# ---------------------------------------------------------------------------
# Writers
# ---------------------------------------------------------------------------

# Default flag bytes stamped into records on write. Empirically MAPDL
# writes int32 records with 0x80 (bit 7 set) and float64 records with
# 0x00. Decoding never consults these bits, but matching them keeps
# round-trip comparisons faithful.
_WRITE_FLAGS_INT32 = 0x80
_WRITE_FLAGS_FLOAT64 = 0x00


def _encode_string(s: str, n_words: int) -> bytes:
    """Pack ``s`` into ``n_words`` CHARACTER*4 slots, byte-reversing each.

    The string is space-padded (and truncated) to exactly ``4 * n_words``
    bytes, then every 4-byte chunk is reversed to match MAPDL's on-disk
    Fortran byte order.
    """
    total = 4 * n_words
    data = s.encode("utf-8").ljust(total, b" ")[:total]
    chunks = (data[i : i + 4] for i in range(0, total, 4))
    return b"".join(chunk[::-1] for chunk in chunks)
def write_record(
    fp: BinaryIO,
    payload: np.ndarray,
    flags: int | None = None,
) -> int:
    """Append one MAPDL record to ``fp`` and return its starting byte offset.

    ``payload`` is written verbatim as bytes; the header ``size`` word is
    derived from the byte length divided by 4, so any numpy dtype whose
    itemsize is a multiple of 4 works. When ``flags`` is omitted it
    defaults to 0x80 for int32 payloads and 0x00 otherwise.

    Raises
    ------
    ValueError
        If the payload's byte length is not a multiple of 4.
    """
    data = np.ascontiguousarray(payload).tobytes()
    if len(data) % 4 != 0:
        raise ValueError(f"record payload must be a multiple of 4 bytes, got {len(data)}")
    n_words = len(data) // 4
    if flags is None:
        # Same values as the module-level _WRITE_FLAGS_INT32 /
        # _WRITE_FLAGS_FLOAT64 defaults.
        flags = 0x80 if payload.dtype == np.dtype("<i4") else 0x00
    start = fp.tell()
    # Header: size word, then a flags word whose high byte carries flags.
    fp.write(np.array([n_words], dtype="<i4").tobytes())
    fp.write(bytes([0, 0, 0, flags & 0xFF]))
    fp.write(data)
    # Trailing Fortran pad repeater — stored as zero; the reader skips it.
    fp.write(b"\x00\x00\x00\x00")
    return start
def write_standard_header(
    fp: BinaryIO,
    *,
    file_format: int,
    jobname: str = "file",
    time: str = "00:00:00",
    date: str = "",
    units: int = 0,
    verstring: str = "22.2",
    machine: str = "LINUX x64",
    product: str = "FULL",
    username: str = "",
    title: str = "",
) -> None:
    """Write the fixed-layout standard-header record to ``fp``.

    Mirrors :func:`read_standard_header`. The record holds a 100-word
    payload; the payload indices below are *payload-relative*. Read side
    sees these words at file-absolute positions ``[i+2]`` because the
    record framer prepends the 2-word ``(size, flags)`` header.

    Parameters
    ----------
    fp : BinaryIO
        Writable binary stream positioned where the header should go.
    file_format : int
        File-type code (see ``FILE_FORMATS``); lands in words[2].
    jobname : str
        Stamped into both the short jobname slot (words[16..17]) and the
        long ``jobname2`` slot (words[32..39]).
    time, date : str
        ``hh:mm:ss`` / ``yyyy/mm/dd``, stored as packed integers. A
        non-numeric ``time`` falls back to 0; a non-numeric ``date``
        to -1 (which the reader renders as an empty string).
    units : int
        Unit-system code (0 = user defined; see the reader's table).
    verstring, machine, product, username, title : str
        Free-text fields packed 4 chars per word, byte-reversed.

    Notes
    -----
    Fields with no parameter here (subtitle, system_record_size,
    machine_identifier, split_point) are left as zero words.
    """
    # Build the 100-word payload; payload[i] = what read_standard_header
    # sees at words[i + 2].
    payload = np.zeros(100, dtype="<i4")
    payload[0] = int(file_format)  # words[2]
    hhmmss = time.replace(":", "")
    payload[2] = int(hhmmss) if hhmmss.isdigit() else 0  # words[4]
    yyyymmdd = date.replace("/", "")
    payload[4] = int(yyyymmdd) if yyyymmdd.isdigit() else -1  # words[6]
    payload[6] = int(units)  # words[8]
    buf = bytearray(payload.tobytes())

    def _stamp(s: str, payload_word: int, n_words: int) -> None:
        # Overwrite the string's slot in the raw byte buffer in place.
        start = payload_word * 4
        buf[start : start + 4 * n_words] = _encode_string(s, n_words)

    _stamp(verstring, 9, 1)  # words[11]
    _stamp(machine, 11, 3)  # words[13..15]
    _stamp(jobname, 14, 2)  # words[16..17]
    _stamp(product, 16, 2)  # words[18..19]
    _stamp("", 18, 1)  # words[20] (special)
    _stamp(username, 19, 3)  # words[21..23]
    _stamp(jobname, 30, 8)  # words[32..39] jobname2
    _stamp(title, 40, 20)  # words[42..61]
    arr = np.frombuffer(bytes(buf), dtype="<i4").copy()
    write_record(fp, arr, flags=_WRITE_FLAGS_INT32)
def open_for_write(path: str | Path) -> BinaryIO:
    """Create (or truncate) ``path`` and return a writable binary handle
    for building a new MAPDL binary file."""
    return Path(path).open("wb")