Create training data for a 3D static reduced order model (ROM)#
This example shows how to run a parametric sweep on an MAPDL model and export the output displacement and stress data into the format required to build a static ROM with Ansys Twin Builder.
The general data structure for the ROM building is shown in the following figure.

Organization of files and directories for static ROM creation.#
Here are the specific files outputted by the example:
ansys_pymadl_Static_ROM
│
├───displacement
│ │ doe.csv
│ │ points.bin
│ │ settings.json
│ │
│ └───snapshots
│ file0.bin
│ file1.bin
│ file2.bin
│ file3.bin
│
└───stress
│ doe.csv
│ points.bin
│ settings.json
│
└───snapshots
file0.bin
file1.bin
file2.bin
file3.bin
MAPDL notch model#
This example uses the model created in 3D Stress Concentration Analysis for a Notched Plate as a base. This
example was modified to add a nodal component, load_node
, to which the force is applied. This
makes is simple to retrieve and modify the force scoping for parametric runs.
The model, which uses load force in Newtons as the input, is parametrically varied.
Additional packages used#
In addition to PyMAPDL, the example requires that the following packages are installed and running:
Core functions#
Solve the MAPDL parametric variations#
The run_mapdl_variations()
function solves the MAPDL model for a set of parameter variations.
It loads a saved MAPDL database and loops through a list of force values. For each value, it applies
that force to a nodal component, solves the model, and records the location of the result file, along
with the parameter names and values.
def run_mapdl_variations():
"""
Run the MAPDL model parametric variations.
Returns
-------
list[tuple[str, dict]]
List of tuples of the MAPDL result file path (on the platform where MAPDL was executed) and
the parameter values for each variation solved.
"""
# Specify the force load variations
forces = [250, 500, 750, 1000]
# Start MAPDL and disable all but error messages
mapdl = launch_mapdl(loglevel="ERROR")
# Download the example database
notch_file = download_example_data(
filename="3d_notch.db", directory="pymapdl/static_ROM_data_generation"
)
mapdl.resume(notch_file, mute=True)
# Initialize the outputs
outputs = []
# Solve the parameter variations
for idx, force_load in enumerate(forces):
# Rename the job, change log, and error log files
mapdl.filname(f"variation_{idx}")
mapdl.run("/SOLU")
mapdl.cmsel("S", "load_node", "NODE")
mapdl.fdele("ALL", "FX")
mapdl.f("ALL", "FX", force_load)
mapdl.allsel()
mapdl.antype("STATIC")
mapdl.solve()
mapdl.finish(mute=True)
rst_path = mapdl.result_file
outputs.append((rst_path, {"force[N]": force_load}))
print(f"MAPDL run in: {mapdl.directory}")
mapdl.exit()
return outputs
Export ROM data for the solved variations#
The export_static_ROM_data()
function exports the ROM data from a set of solved
parametric variations. The function loads each results file into DPF and gets the scoping. It then
calls the export_static_ROM_variation()
function to export the ROM data for that result file.
The new_metadata
Boolean is set to True
on the first loop to trigger the creation of the
points.bin
and settings.json
files and a new doe.csv
file.
The pytwin.write_binary()
function writes the result field data to a ROM binary
file.
def export_static_ROM_data(
mapdl_results: list[tuple[str, dict]], output_folder: str | Path
):
"""
Export static ROM data to output folder.
Parameters
----------
mapdl_results: list[tuple[str, dict]]
List of tuples of the MAPDL result file path and the parameter values for each variation
solved.
output_folder: str|Path
Path to the folder to store ROM output data in.
"""
for idx, (rst_path, parameters) in enumerate(mapdl_results):
# Load the results to DPF and create scoping.
model = dpf.Model(rst_path)
scoping = get_scoping(model)
# Only create 'points.bin' and 'settings.json' files on first design point
new_metadata = idx == 0
# Export displacement and stress data
for name in ["displacement", "stress"]:
data_folder = Path(output_folder).joinpath(name)
export_static_ROM_variation(
model,
scoping,
name,
data_folder,
parameters=parameters,
snap_idx=idx,
new_metadata=new_metadata,
)
Export ROM data for a specific variation#
The export_static_ROM_variation()
function exports ROM snapshot data for a specific
parametric variation. The function exports a snapshot for displacement
and for stress
to
correspondingly named folders and records the snapshot name and parameter data to the doe.csv
file in that folder.
The results are sorted by the scoping IDs to ensure consistent ordering of results and points.
The new_metadata
Boolean controls the creation of the points.bin
and settings.json
files
and a new doe.csv
file. If it is False
, the first two files are not written, and parameter
data is appended to the existing doe.csv
file.
def export_static_ROM_variation(
model: dpf.Model,
scoping: dpf.Scoping,
name: str,
output_folder: str | Path,
parameters: dict,
snap_idx: int = 0,
new_metadata: bool = False,
):
"""
Export static ROM data for one parameter variation.
Parameters
----------
model : dpf.Model
DPF model with results data loaded.
scoping : dpf.Scoping
DPF nodal scoping for result export.
name : str
Result quantity to export. Options are ``displacement`` and ``stress``.
output_folder : str|Path
Folder to store exported data in. Use separate folders for each physics type.
parameters : dict
Dictionary of name-value pairs for the input parameters used to generate the current
results.
snap_idx : int, default = 0
Unique ID for the current results.
new_metadata : bool, default = False
Whether to trigger the creation of the following files for a given
data generation run, overwriting any existing ones: ``points.bin``,
``settings.json``, and ``doe.csv``.
"""
# Create the output folder
output_folder = Path(output_folder)
output_folder.mkdir(parents=True, exist_ok=True)
# Modify this section to export additional result types
is_deformation = False
if name == "displacement":
result = model.results.displacement
is_deformation = True
elif name == "stress":
result = model.results.stress
else:
raise ValueError(f"Unsupported result type: {name}")
# Retrieve displacement and stress at last result set
scoped_result = result.on_last_time_freq.on_mesh_scoping(scoping)
# Result must be sorted by scoping to ensure consistency across outputs
sorted_result = dpf.operators.logic.ascending_sort_fc(
scoped_result, sort_by_scoping=True
)
result_field = sorted_result.outputs.fields_container()[0]
if new_metadata:
write_points(model, scoping, output_folder)
write_doe_headers(output_folder, name, parameters)
write_settings(output_folder, result_field, name, is_deformation=is_deformation)
# Write snapshots
snapshot_folder = output_folder.joinpath("snapshots")
snapshot_folder.mkdir(parents=True, exist_ok=True)
snap_name = f"file{snap_idx}.bin"
write_doe_entry(output_folder, snap_name, parameters)
write_binary(snapshot_folder.joinpath(snap_name), result_field.data)
Additional functions#
Set the export scope#
The ROM data is exported on all nodes connected to elements. The get_scoping()
function gets
the nodes that are connected to all the elements in the mesh. This avoids including any unconnected
nodes, which have null values, in the scoping.
The returned scoping initially contains duplicate node IDs because a node can be connected to
multiple elements and is included once for each one. Creating a Python set
of node IDs removes the
duplicates.
def get_scoping(model: dpf.Model):
"""Return scoping of unique node IDs connected to elements in model."""
op = dpf.operators.scoping.connectivity_ids(
mesh_scoping=model.metadata.meshed_region.elements.scoping,
mesh=model.metadata.meshed_region,
take_mid_nodes=True,
)
# Get output data
connected_nodes_scoping = op.outputs.mesh_scoping()
# Compress the list to only keep unique IDs
connected_nodes_scoping.ids = sorted(list(set(connected_nodes_scoping.ids)))
return connected_nodes_scoping
Write points coordinates file#
The write_points()
function writes the x, y, z coordinates of the scoped nodes to the ROM points.bin
file
using the pytwin.write_binary()
function.
def write_points(model: dpf.Model, scoping: dpf.Scoping, output_folder: str | Path):
"""Write the ``points.bin`` file."""
nodes = model.metadata.meshed_region.nodes
scoped_node_indices, _ = nodes.map_scoping(scoping)
points_coordinates = nodes.coordinates_field.data[scoped_node_indices]
write_binary(Path(output_folder).joinpath("points.bin"), points_coordinates)
Write ROM settings#
The write_settings()
function writes the ROM settings.json
file. This records
information such as the field dimensionality (scalar, vector, tensor), result name,
unit, and whether it represent a deformation. The full file specification is available
in the Twin Builder Static ROM Builder documentation.
The original node numbers from export scoping are compressed and stored in the ids
field.
def write_settings(
path: str | Path, field: dpf.Field, name: str, is_deformation: bool = False
):
"""Write the ``settings.json`` file."""
if field.component_count in [1, 3]:
dimensionality = [field.component_count]
symmetricalDim = False
elif field.component_count == 6:
dimensionality = [3, 3]
symmetricalDim = True
else:
raise ValueError(f"Unsupported field dimensionality {field.component_count}")
settings = {
"pointsCoordinates": False,
"ids": compress_id_list(field.scoping.ids),
"location": "Nodal",
"unit": field.unit,
"unitDimension": {},
"unitFactor": 1.0,
"name": name,
"deformation": is_deformation,
"dimensionality": dimensionality,
"symmetricalDim": symmetricalDim,
"namedSelections": {},
}
with open(Path(path).joinpath("settings.json"), "w") as fw:
# Set default to convert Numpy int to int
json.dump(settings, fw, default=int, indent=4)
Compress ID list#
The settings.json
specification supports storing lists of consecutive integers in a
compressed fashion. The compress_id_list()
function implements this compression.
Runs of three or more incrementally increasing values are replaced by a sequence of
start, -1, end
.
def compress_id_list(id_list: np.ndarray):
"""
Compress array of consecutive IDs.
Compress array by replacing runs of three or more consecutive integers with ``start, -1, end``.
Example
-------
>>> input = np.array([0, 1, 2, 3, 4, 5, 6, 28, 29, 30, 31, 13, 15, 17, 18, 19, 20])
>>> compress_id_list(input)
[0, -1, 6, 28, -1, 31, 13, 15, 17, -1, 20]
"""
if id_list.size == 0:
return []
# Find breaks in consecutive sequences.
breaks = np.where(np.diff(id_list) != 1)[0]
# Add endpoints to form run boundaries
run_starts = np.insert(breaks + 1, 0, 0)
run_ends = np.append(breaks, len(id_list) - 1)
result = []
for start, end in zip(run_starts, run_ends):
length = end - start + 1
if length >= 3:
result.extend([int(id_list[start]), -1, int(id_list[end])])
else:
result.extend(id_list[start : end + 1].tolist())
return result
Run the script#
To run the script, install PyMAPDL and the additional dependencies listed in
Additional packages used. Then download and run
static_rom_data_generation.py
. The ROM data is stored in
a folder named ansys_pymadl_Static_ROM
in the system temporary folder.
Script assumptions and modification ideas#
Local execution#
The scripts assume that both the MAPDL solving and DPF export are performed locally on the same machine as the Python script execution.
For potential modification options, see Basic DPF-Core Usage with PyMAPDL.
Additional result types#
The script exports stress
and displacement
results. You can include or
exclude additional result types by modifying the export_static_ROM_variation()
function.
For example, you can add elastic strain by making these changes to
the export_static_ROM_variation()
function from line 181:
# Modify this section to export additional result types
is_deformation = False
if name == "displacement":
result = model.results.displacement
is_deformation = True
elif name == "stress":
result = model.results.stress
# Add additional quantities here
elif name == "elastic_strain":
result = model.results.elastic_strain
else:
raise ValueError(f"Unsupported result type: {name}")
Modify result scoping#
The script exports results on all nodes that are connected to elements. This does not account for nodes that are connected to elements but do not have results associated with them. For example, MPC184 pilot nodes would not usually have a stress result.
The script also does not allow scoping to components.
You could modify the get_scoping()
function to allow broader scoping options.
Modify settings#
The settings.json
files generated by the write_settings()
function implicitly
assume SI units. (unitFactor
is a scaling factor from SI units.) These files do not
include any information about unit dimensions.
Here is an example for displacement:
"unitDimension": {"length": 1.0}
Here is an example for stress:
"unitDimension": {"mass": 1.0, "length": -1.0, "time": -2.0}
You can add logic to check the model units and change the unitDimension
and unitFactor
fields accordingly. While this information is not used to build the ROM, it can be used when
consuming the ROM in downstream applications.
If results are included on nodal components, you can reference them in the
namedSelections
field. Each entry consists of a name and the indices of the
scoped nodes in the overall scoping (not node IDs). You can use the
compress_id_list()
function to compress long lists of nodes.
For example, to add a named selection called first_nodes
on the first one hundred scoped nodes
and another called second_nodes
to the fiftieth to one hundred and fiftieth nodes, you would add
this:
namedSelections: {"first_nodes": [0, -1, 99], "second_nodes": [49, -1, 149]}