"""Low-level reader for MAPDL binary files (RST / FULL / EMAT / …).
MAPDL binary files use Fortran-style sequential records. Each record is
laid out as::
[ size:int32 ] [ flags:int32 ] [ payload : size * 4 bytes ] [ pad:int32 ]
* ``size`` counts 4-byte *words* of payload (so a double array of length
*n* gives ``size = 2 * n``).
* ``flags`` carries information only in its highest-addressed byte (byte 7
from the start of the record header, i.e. the most-significant byte of the
little-endian flags word). femorph-solver exposes the raw byte but does **not** use
it to infer payload dtype or compression state. pymapdl-reader's
Python and C++ backends disagree on the bit positions and neither
matches files produced by recent MAPDL — e.g. bit 3 (declared
"sparse" by both) is routinely set on perfectly dense records emitted
under ``/FCOMP,RST,0``. Callers must pass the dtype appropriate to
the record they expect (int32 for index tables, float64 for
coordinates and results, etc.).
* the trailing ``pad`` int32 is a Fortran record-length repeater.
femorph-solver's decoder supports uncompressed, dense records in the four standard
dtypes (int32, int16, float64, float32). Compression (bsparse, wsparse,
zlib) is not attempted — run MAPDL with ``/FCOMP,<file>,0`` to disable.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO
import numpy as np
# Canonical MAPDL file-format codes (word 2 of the standard header).
# Keys correspond to the ``file_format`` integer returned by
# read_standard_header; the mapping is not consulted elsewhere in this
# module — it is exported for callers to translate the code to a name.
FILE_FORMATS: dict[int, str] = {
    2: "EMAT",
    3: "ESAV",
    4: "FULL",
    8: "SUB",
    9: "MODE",
    10: "RDSP",
    12: "RST",
    16: "DB",
    45: "CMS",
}
@dataclass
class RecordInfo:
    """Metadata for a single MAPDL record.

    Captures the record header as read from disk; the payload itself is
    returned separately by :func:`read_record`.
    """

    size: int  # payload size in 4-byte words (a float64 element spans 2 words)
    dtype: np.dtype  # dtype the payload was cast to (chosen by the caller, not read from the file)
    flags: int  # raw header flag byte (byte 7 of the 8-byte record header); never decoded here
    offset: int  # byte offset in the file where the record (header included) starts
def _open_binary(path: str | Path, endian: str = "<") -> BinaryIO:
"""Open a MAPDL binary file, validating its magic identifier.
Every MAPDL binary file starts with the int32 value ``100`` as its
first data word (part of the standard-header record). Endianness is
auto-detected; callers may pass ``"<"`` (little) or ``">"`` (big).
"""
fp = open(path, "rb") # noqa: SIM115 — returned to caller; closed there
magic = np.fromfile(fp, dtype=endian + "i4", count=1)
if magic.size == 0 or magic[0] != 100:
fp.seek(0)
magic = np.fromfile(fp, dtype=">i4", count=1)
if magic.size and magic[0] == 100:
# caller asked for <, file is >; they can reopen with correct endian
fp.close()
raise ValueError(f"{path} is big-endian; re-open with endian='>'")
fp.close()
raise ValueError(f"{path} is not a MAPDL binary file (bad magic)")
fp.seek(0)
return fp
def read_record(
    fp: BinaryIO,
    offset: int | None = None,
    dtype: np.dtype | str = "<i4",
) -> tuple[np.ndarray, RecordInfo]:
    """Read a single MAPDL record from ``fp`` and return ``(array, info)``.

    When ``offset`` is given, the file is positioned there (bytes) first;
    otherwise reading starts at the current position.  MAPDL does not
    encode the payload dtype reliably in the flag byte, so ``dtype`` must
    be supplied by the caller to match the record being read (int32 for
    index tables, float64 for coordinates and results, etc.); the default
    is little-endian int32.  On return the file pointer sits just past
    the trailing Fortran pad word.

    Raises ``EOFError`` when the header or payload is cut short.
    """
    if offset is not None:
        fp.seek(offset)
    record_start = fp.tell()
    head = fp.read(8)
    if len(head) < 8:
        raise EOFError("truncated MAPDL record header")
    word_count = int(np.frombuffer(head, dtype="<i4", count=1)[0])
    flag_byte = head[7]  # byte 7 of the header carries the raw flag bits
    out_dtype = np.dtype(dtype)
    expected = 4 * word_count
    body = fp.read(expected)
    if len(body) < expected:
        raise EOFError("truncated MAPDL record payload")
    fp.seek(4, 1)  # step over the trailing Fortran length repeater
    values = np.frombuffer(body, dtype=out_dtype).copy()
    info = RecordInfo(size=word_count, dtype=out_dtype, flags=flag_byte, offset=record_start)
    return values, info
def read_record_at(
    path: str | Path,
    word_offset: int,
    dtype: np.dtype | str = "<i4",
) -> tuple[np.ndarray, RecordInfo]:
    """Open ``path`` and read the single record at ``word_offset``.

    ``word_offset`` is measured in 4-byte words; the file is validated by
    :func:`_open_binary` and is always closed before returning.
    """
    handle = _open_binary(path)
    try:
        byte_offset = 4 * int(word_offset)
        return read_record(handle, offset=byte_offset, dtype=dtype)
    finally:
        handle.close()
def decompress_bsparse(payload: np.ndarray, dtype: np.dtype | str = "<f8") -> np.ndarray:
    """Decompress a bit-sparse (bsparse) record payload.

    MAPDL's bsparse encoding for a vector of length ``N``::

        word[0] = N          # decompressed length
        word[1] = bitcode    # 32-bit mask; bit i set -> position i nonzero
        payload = K packed values   # K = number of set bits in bitcode

    Returns a dense ndarray of length ``N`` with zeros in unset slots.

    Raises
    ------
    ValueError
        If the payload is too short for the two header words, or holds
        fewer packed values than the bitcode promises (previously this
        surfaced as an opaque ``IndexError`` from the scatter loop).
    NotImplementedError
        For ``N > 32`` — MAPDL uses a different encoding for longer
        vectors.
    """
    if payload.size < 2:
        raise ValueError("bsparse payload too short to hold size+bitcode")
    n = int(payload[0])
    if n > 32:
        raise NotImplementedError(
            f"bsparse with N={n} > 32 needs multi-word bitcode — not yet supported"
        )
    bitcode = int(payload[1]) & 0xFFFFFFFF
    dt = np.dtype(dtype)
    # Boolean mask over the n output slots: bit i of the bitcode -> slot i.
    mask = ((bitcode >> np.arange(n)) & 1).astype(bool)
    n_set = int(mask.sum())
    data_bytes = payload[2:].view(np.uint8).tobytes()
    if len(data_bytes) < n_set * dt.itemsize:
        raise ValueError(
            f"bsparse payload truncated: bitcode promises {n_set} packed values, "
            f"payload holds bytes for only {len(data_bytes) // dt.itemsize}"
        )
    out = np.zeros(n, dtype=dt)
    # Vectorized scatter of the packed values into the flagged positions
    # (replaces the old per-bit Python loop).
    out[mask] = np.frombuffer(data_bytes, dtype=dt, count=n_set)
    return out
def decompress_wsparse(payload: np.ndarray, dtype: np.dtype | str = "<f8") -> np.ndarray:
    """Expand a windowed-sparse (wsparse) payload into a dense vector.

    Layout of the int32 word stream for a vector of length ``N``::

        word[0] = N        # decompressed length
        word[1] = NWin     # number of windows
        per window:
            iLoc = int32
            iLoc > 0  -> one dtype-sized value, stored at position iLoc
            iLoc <= 0 -> window starting at position -iLoc, then iLen = int32:
                iLen > 0  -> iLen explicit dtype-sized values
                iLen <= 0 -> one value repeated (-iLen) times

    Positions not covered by any window stay zero.
    """
    if payload.size < 2:
        raise ValueError("wsparse payload too short to hold size+NWin")
    dt = np.dtype(dtype)
    words_per_value = dt.itemsize // 4  # dtype values measured in int32 words
    dense = np.zeros(int(payload[0]), dtype=dt)
    cursor = 2  # next unread int32 word (words 0 and 1 hold N and NWin)

    def take_values(count: int) -> np.ndarray:
        # Reinterpret the next ``count`` dtype-sized values and advance.
        nonlocal cursor
        span = words_per_value * count
        vals = payload[cursor : cursor + span].view(dt)[:count]
        cursor += span
        return vals

    for _ in range(int(payload[1])):
        iloc = int(payload[cursor])
        cursor += 1
        if iloc > 0:
            # Isolated single value.
            dense[iloc] = take_values(1)[0]
        else:
            start = -iloc
            ilen = int(payload[cursor])
            cursor += 1
            if ilen > 0:
                dense[start : start + ilen] = take_values(ilen)
            else:
                # Constant run of (-ilen) copies of one value.
                dense[start : start - ilen] = take_values(1)[0]
    return dense
def iter_records(fp: BinaryIO, start: int = 0, dtype: np.dtype | str = "<i4"):
    """Generate ``(offset_bytes, RecordInfo, ndarray)`` for each sequential record.

    Iteration ends at EOF or at the first non-positive size word (MAPDL
    terminates the sequential stream with a -1 sentinel or zero padding).
    """
    fp.seek(start)
    while True:
        anchor = fp.tell()
        head = fp.read(8)
        if len(head) < 8:
            break
        declared = int(np.frombuffer(head[:4], dtype="<i4")[0])
        if declared <= 0:
            break
        # Rewind so read_record sees the full record from its header.
        fp.seek(anchor)
        payload, meta = read_record(fp, dtype=dtype)
        yield anchor, meta, payload
def read_standard_header(path: str | Path) -> dict:
    """Parse the MAPDL standard file header.

    Every MAPDL binary file begins with a 100-word standard header whose
    fields live at fixed int32 word offsets. Numeric fields occupy one
    int32 word followed by a single padding word; string fields are
    packed 4 chars per word with the bytes of each word reversed (MAPDL
    stores them with the Fortran byte order flipped).

    The returned mapping covers the canonical fields: ``file_number``,
    ``file_format``, ``time``, ``date``, ``units``, ``verstring``,
    ``mainver``, ``subver``, ``machine``, ``jobname``, ``product``,
    ``special``, ``username``, ``machine_identifier``,
    ``system_record_size``, ``jobname2``, ``title``, ``subtitle``,
    ``split_point``.

    Raises ``EOFError`` if the file ends before the 100-word header, and
    ``ValueError`` (from :func:`_open_binary`) on a bad magic word.
    """
    unit_types = {
        0: "User Defined",
        1: "SI",
        2: "CSG",
        3: "U.S. Customary units (feet)",
        4: "U.S. Customary units (inches)",
        5: "MKS",
        6: "MPA",
        7: "uMKS",
    }
    fp = _open_binary(path)
    try:
        words = np.fromfile(fp, dtype="<i4", count=100)
    finally:
        fp.close()
    if words.size < 100:
        raise EOFError(f"{path} truncated before 100-word standard header")
    # The wall time is stored as the integer HHMMSS, so leading zeros
    # (e.g. 00:09:05) are lost on the int round-trip — zero-pad to six
    # digits before slicing (fixes e.g. 91500 -> "91:50:0").
    int_time = str(int(words[4])).zfill(6)
    time = ":".join([int_time[0:2], int_time[2:4], int_time[4:]])
    raw_date = int(words[6])
    if raw_date == -1:  # MAPDL's "no date recorded" sentinel
        date = ""
    else:
        # Same zero-padding fix for the YYYYMMDD integer date.
        int_date = str(raw_date).zfill(8)
        date = "/".join([int_date[0:4], int_date[4:6], int_date[6:]])

    def _string(word_offset: int, n_words: int) -> str:
        # Strings are packed 4 chars per word with each word's bytes reversed.
        raw = b"".join(words[word_offset + i].tobytes()[::-1] for i in range(n_words))
        try:
            return raw.decode("utf-8").strip()
        except UnicodeDecodeError:
            return raw.decode("latin-1", errors="replace").strip()

    verstring = _string(11, 1)
    header: dict = {
        "file_number": int(words[0]),
        "file_format": int(words[2]),
        "time": time,
        "date": date,
        "units": unit_types.get(int(words[8]), "Unknown"),
        "verstring": verstring,
        # e.g. verstring "22.2" -> mainver 22, subver 2; 0 when unparseable.
        "mainver": int(verstring[:2]) if len(verstring) >= 2 and verstring[:2].isdigit() else 0,
        "subver": int(verstring[-1]) if verstring and verstring[-1].isdigit() else 0,
        "machine": _string(13, 3),
        "jobname": _string(16, 2),
        "product": _string(18, 2),
        "special": _string(20, 1),
        "username": _string(21, 3),
        "machine_identifier": _string(24, 3),
        "system_record_size": int(words[27]),
        "jobname2": _string(32, 8),
        "title": _string(42, 20),
        "subtitle": _string(62, 20),
        "split_point": int(words[96]),
    }
    return header
def _read_string(fp: BinaryIO, n: int) -> str:
"""Read ``n`` 4-char strings, reversing each (MAPDL stores byte-swapped)."""
raw = b""
for _ in range(n):
raw += fp.read(4)[::-1]
try:
return raw.decode("utf-8")
except UnicodeDecodeError:
return raw.decode("latin-1", errors="replace")
# ---------------------------------------------------------------------------
# Writers
# ---------------------------------------------------------------------------
# Default flag bytes to stamp into records when writing. Empirically, MAPDL
# writes int32 records with 0x80 (bit 7 set) and float64 records with 0x00.
# These bits are not used for decoding but matching them keeps fidelity
# high for round-trip comparisons.
_WRITE_FLAGS_INT32 = 0x80  # stamped into header byte 7 for '<i4' payloads
_WRITE_FLAGS_FLOAT64 = 0x00  # stamped for float64 and all other dtypes
def _encode_string(s: str, n_words: int) -> bytes:
"""Encode ``s`` into ``n_words`` 4-char slots with each word byte-reversed.
MAPDL stores character data in Fortran ``CHARACTER*4`` slots that end
up byte-swapped on disk relative to normal C string ordering. This
helper pads ``s`` with spaces to exactly ``4 * n_words`` chars and
reverses every 4-byte chunk to match.
"""
total_bytes = 4 * n_words
padded = s.encode("utf-8").ljust(total_bytes, b" ")[:total_bytes]
return b"".join(padded[i : i + 4][::-1] for i in range(0, total_bytes, 4))
def write_record(
    fp: BinaryIO,
    payload: np.ndarray,
    flags: int | None = None,
) -> int:
    """Append one MAPDL record to ``fp`` and return the byte offset it starts at.

    The payload bytes are written verbatim, so any numpy dtype whose
    total byte length is a multiple of 4 works; the header ``size``
    field is that length divided by 4.  When ``flags`` is ``None`` the
    module's empirical defaults are stamped: 0x80 for little-endian
    int32 payloads, 0x00 for everything else.

    Raises ``ValueError`` when the payload is not 4-byte aligned.
    """
    raw = np.ascontiguousarray(payload).tobytes()
    n_bytes = len(raw)
    if n_bytes % 4:
        raise ValueError(f"record payload must be a multiple of 4 bytes, got {n_bytes}")
    if flags is None:
        flags = _WRITE_FLAGS_INT32 if payload.dtype == np.dtype("<i4") else _WRITE_FLAGS_FLOAT64
    flags &= 0xFF
    record_start = fp.tell()
    # Two-word header: size in 4-byte words, then a word whose top byte
    # carries the flag bits (the reader looks at header byte 7).
    fp.write((n_bytes // 4).to_bytes(4, "little"))
    fp.write(bytes([0, 0, 0, flags]))
    fp.write(raw)
    # Trailing Fortran length repeater — written as zero; the reader
    # skips it without checking the value.
    fp.write(b"\x00" * 4)
    return record_start
def write_standard_header(
    fp: BinaryIO,
    *,
    file_format: int,
    jobname: str = "file",
    time: str = "00:00:00",
    date: str = "",
    units: int = 0,
    verstring: str = "22.2",
    machine: str = "LINUX x64",
    product: str = "FULL",
    username: str = "",
    title: str = "",
) -> None:
    """Write the fixed-layout standard-header record to ``fp``.

    Mirrors :func:`read_standard_header`.  The record carries a 100-word
    payload; every index below is *payload-relative* — the read side sees
    word ``i`` at file-absolute ``words[i + 2]`` because the record
    framer prepends the 2-word ``(size, flags)`` header.
    """
    words = np.zeros(100, dtype="<i4")
    words[0] = int(file_format)  # read side: words[2]
    digits_time = time.replace(":", "")
    words[2] = int(digits_time) if digits_time.isdigit() else 0  # words[4]
    digits_date = date.replace("/", "")
    words[4] = int(digits_date) if digits_date.isdigit() else -1  # words[6]
    words[6] = int(units)  # words[8]
    blob = bytearray(words.tobytes())
    # (payload_word, n_words, text) for every string field; the read side
    # sees each one at file word payload_word + 2.
    string_fields = (
        (9, 1, verstring),   # words[11]
        (11, 3, machine),    # words[13..15]
        (14, 2, jobname),    # words[16..17]
        (16, 2, product),    # words[18..19]
        (18, 1, ""),         # words[20] (special)
        (19, 3, username),   # words[21..23]
        (30, 8, jobname),    # words[32..39] jobname2
        (40, 20, title),     # words[42..61]
    )
    for word, count, text in string_fields:
        blob[4 * word : 4 * (word + count)] = _encode_string(text, count)
    payload = np.frombuffer(bytes(blob), dtype="<i4").copy()
    write_record(fp, payload, flags=_WRITE_FLAGS_INT32)
def open_for_write(path: str | Path) -> BinaryIO:
    """Create/truncate ``path`` and return a binary handle for writing MAPDL data."""
    return Path(path).open("wb")