Source code for cardiac_geometries.utils

import tempfile
import typing
import xml.etree.ElementTree as ET
from enum import Enum
from pathlib import Path
from typing import Iterable, NamedTuple

from mpi4py import MPI

import basix
import dolfinx
import numpy as np
from dolfinx.graph import adjacencylist
from packaging.version import Version
from structlog import get_logger

try:
    import dolfinx.io.gmsh as gmshio
except ImportError:
    import dolfinx.io.gmshio as gmshio  # type: ignore[import]

logger = get_logger()

quads = ("Quadrature", "Q", "Quad", "quadrature", "q", "quad")
QUADNR = 42


[docs] class GMshModel(NamedTuple): mesh: dolfinx.mesh.Mesh cell_tags: dolfinx.mesh.MeshTags facet_tags: dolfinx.mesh.MeshTags edge_tags: dolfinx.mesh.MeshTags vertex_tags: dolfinx.mesh.MeshTags
def distribute_entity_data( mesh: dolfinx.mesh.Mesh, tdim: int, marked_entities: np.ndarray, entity_values: np.ndarray, ) -> tuple[np.ndarray, np.ndarray]: if Version(dolfinx.__version__) >= Version("0.9.0"): local_entities, local_values = dolfinx.io.utils.distribute_entity_data( mesh, tdim, marked_entities, entity_values ) else: local_entities, local_values = dolfinx.io.utils.distribute_entity_data( mesh._cpp_object, tdim, marked_entities, entity_values ) return local_entities, local_values # copied from https://github.com/FEniCS/dolfinx/blob/main/python/dolfinx/io/gmshio.py
[docs] def model_to_mesh( model, comm: MPI.Comm, rank: int, gdim: int = 3, partitioner: typing.Optional[ typing.Callable[ [MPI.Comm, int, int, dolfinx.cpp.graph.AdjacencyList_int32], dolfinx.cpp.graph.AdjacencyList_int32, ] ] = None, dtype=dolfinx.default_real_type, ) -> GMshModel: """Create a Mesh from a Gmsh model. Creates a :class:`dolfinx.mesh.Mesh` from the physical entities of the highest topological dimension in the Gmsh model. In parallel, the gmsh model is processed on one MPI rank, and the :class:`dolfinx.mesh.Mesh` is distributed across ranks. Args: model: Gmsh model. comm: MPI communicator to use for mesh creation. rank: MPI rank that the Gmsh model is initialized on. gdim: Geometrical dimension of the mesh. partitioner: Function that computes the parallel distribution of cells across MPI ranks. Returns: A tuple containing the :class:`dolfinx.mesh.Mesh` and :class:`dolfinx.mesh.MeshTags` for cells, facets, edges and vertices. Note: For performance, this function should only be called once for large problems. For re-use, it is recommended to save the mesh and corresponding tags using :class:`dolfinxio.XDMFFile` after creation for efficient access. """ if comm.rank == rank: assert model is not None, "Gmsh model is None on rank responsible for mesh creation." # Get mesh geometry and mesh topology for each element x = gmshio.extract_geometry(model) topologies = gmshio.extract_topology_and_markers(model) # Extract Gmsh cell id, dimension of cell and number of nodes to # cell for each num_cell_types = len(topologies.keys()) cell_information = dict() cell_dimensions = np.zeros(num_cell_types, dtype=np.int32) for i, element in enumerate(topologies.keys()): _, dim, _, num_nodes, _, _ = model.mesh.getElementProperties(element) cell_information[i] = {"id": element, "dim": dim, "num_nodes": num_nodes} cell_dimensions[i] = dim # Sort elements by ascending dimension perm_sort = np.argsort(cell_dimensions) # Broadcast cell type data and geometric dimension cell_id = cell_information[perm_sort[-1]]["id"] tdim = cell_information[perm_sort[-1]]["dim"] num_nodes = cell_information[perm_sort[-1]]["num_nodes"] cell_id, num_nodes = comm.bcast([cell_id, num_nodes], root=rank) # Check for facet data and broadcast relevant info if True has_facet_data = False if tdim - 1 in cell_dimensions: has_facet_data = True has_edge_data = False if tdim - 2 in cell_dimensions: has_edge_data = True has_vertex_data = False if tdim - 3 in cell_dimensions: has_vertex_data = True has_facet_data = comm.bcast(has_facet_data, root=rank) if has_facet_data: num_facet_nodes = comm.bcast(cell_information[perm_sort[-2]]["num_nodes"], root=rank) gmsh_facet_id = cell_information[perm_sort[-2]]["id"] marked_facets = np.asarray(topologies[gmsh_facet_id]["topology"], dtype=np.int64) facet_values = np.asarray(topologies[gmsh_facet_id]["cell_data"], dtype=np.int32) has_edge_data = comm.bcast(has_edge_data, root=rank) if has_edge_data: num_edge_nodes = comm.bcast(cell_information[perm_sort[-3]]["num_nodes"], root=rank) gmsh_edge_id = cell_information[perm_sort[-3]]["id"] marked_edges = np.asarray(topologies[gmsh_edge_id]["topology"], dtype=np.int64) edge_values = np.asarray(topologies[gmsh_edge_id]["cell_data"], dtype=np.int32) has_vertex_data = comm.bcast(has_vertex_data, root=rank) if has_vertex_data: num_vertex_nodes = comm.bcast(cell_information[perm_sort[-4]]["num_nodes"], root=rank) gmsh_vertex_id = cell_information[perm_sort[-4]]["id"] marked_vertices = np.asarray(topologies[gmsh_vertex_id]["topology"], dtype=np.int64) vertex_values = np.asarray(topologies[gmsh_vertex_id]["cell_data"], dtype=np.int32) cells = np.asarray(topologies[cell_id]["topology"], dtype=np.int64) cell_values = np.asarray(topologies[cell_id]["cell_data"], dtype=np.int32) else: cell_id, num_nodes = comm.bcast([None, None], root=rank) cells, x = np.empty([0, num_nodes], dtype=np.int32), np.empty([0, gdim], dtype=dtype) cell_values = np.empty((0,), dtype=np.int32) has_facet_data = comm.bcast(None, root=rank) if has_facet_data: num_facet_nodes = comm.bcast(None, root=rank) marked_facets = np.empty((0, num_facet_nodes), dtype=np.int32) facet_values = np.empty((0,), dtype=np.int32) has_edge_data = comm.bcast(None, root=rank) if has_edge_data: num_edge_nodes = comm.bcast(None, root=rank) marked_edges = np.empty((0, num_edge_nodes), dtype=np.int32) edge_values = np.empty((0,), dtype=np.int32) has_vertex_data = comm.bcast(None, root=rank) if has_vertex_data: num_vertex_nodes = comm.bcast(None, root=rank) marked_vertices = np.empty((0, num_vertex_nodes), dtype=np.int32) vertex_values = np.empty((0,), dtype=np.int32) # Create distributed mesh ufl_domain = gmshio.ufl_mesh(cell_id, gdim, dtype=dtype) gmsh_cell_perm = gmshio.cell_perm_array( dolfinx.cpp.mesh.to_type(str(ufl_domain.ufl_cell())), num_nodes ) cells = cells[:, gmsh_cell_perm].copy() mesh = dolfinx.mesh.create_mesh( comm, cells, x[:, :gdim].astype(dtype, copy=False), ufl_domain, partitioner ) # Create MeshTags for cells local_entities, local_values = distribute_entity_data( mesh, mesh.topology.dim, cells, cell_values ) mesh.topology.create_connectivity(mesh.topology.dim, 0) # adj = dolfinx.cpp.graph.AdjacencyList_int32(local_entities) adj = adjacencylist(local_entities) ct = dolfinx.mesh.meshtags_from_entities( mesh, mesh.topology.dim, adj, local_values.astype(np.int32, copy=False) ) ct.name = "Cell tags" # Create MeshTags for facets topology = mesh.topology tdim = topology.dim if has_facet_data: # Permute facets from MSH to DOLFINx ordering # FIXME: This does not work for prism meshes if ( topology.cell_type == dolfinx.mesh.CellType.prism or topology.cell_type == dolfinx.mesh.CellType.pyramid ): raise RuntimeError(f"Unsupported cell type {topology.cell_type}") facet_type = dolfinx.cpp.mesh.cell_entity_type( dolfinx.cpp.mesh.to_type(str(ufl_domain.ufl_cell())), tdim - 1, 0 ) gmsh_facet_perm = gmshio.cell_perm_array(facet_type, num_facet_nodes) marked_facets = marked_facets[:, gmsh_facet_perm] local_entities, local_values = distribute_entity_data( mesh, mesh.topology.dim - 1, marked_facets, facet_values ) mesh.topology.create_connectivity(topology.dim - 1, tdim) adj = adjacencylist(local_entities) ft = gmshio.meshtags_from_entities( mesh, tdim - 1, adj, local_values.astype(np.int32, copy=False) ) ft.name = "Facet tags" else: ft = dolfinx.mesh.meshtags( mesh, tdim - 1, np.empty(0, dtype=np.int32), np.empty(0, dtype=np.int32) ) if has_edge_data: # Permute edges from MSH to DOLFINx ordering edge_type = dolfinx.cpp.mesh.cell_entity_type( dolfinx.cpp.mesh.to_type(str(ufl_domain.ufl_cell())), tdim - 2, 0 ) gmsh_edge_perm = gmshio.cell_perm_array(edge_type, num_edge_nodes) marked_edges = marked_edges[:, gmsh_edge_perm] local_entities, local_values = distribute_entity_data( mesh, tdim - 2, marked_edges, edge_values ) mesh.topology.create_connectivity(topology.dim - 2, tdim) adj = adjacencylist(local_entities) et = gmshio.meshtags_from_entities( mesh, tdim - 2, adj, local_values.astype(np.int32, copy=False) ) et.name = "Edge tags" else: et = dolfinx.mesh.meshtags( mesh, tdim - 2, np.empty(0, dtype=np.int32), np.empty(0, dtype=np.int32) ) if has_vertex_data: # Permute vertices from MSH to DOLFINx ordering vertex_type = dolfinx.cpp.mesh.cell_entity_type( dolfinx.cpp.mesh.to_type(str(ufl_domain.ufl_cell())), tdim - 3, 0 ) gmsh_vertex_perm = gmshio.cell_perm_array(vertex_type, num_vertex_nodes) marked_vertices = marked_vertices[:, gmsh_vertex_perm] local_entities, local_values = distribute_entity_data( mesh, tdim - 3, marked_vertices, vertex_values ) mesh.topology.create_connectivity(topology.dim - 3, tdim) adj = adjacencylist(local_entities) vt = gmshio.meshtags_from_entities( mesh, tdim - 3, adj, local_values.astype(np.int32, copy=False) ) vt.name = "Vertex tags" else: vt = dolfinx.mesh.meshtags( mesh, tdim - 3, np.empty(0, dtype=np.int32), np.empty(0, dtype=np.int32) ) return GMshModel(mesh, ct, ft, et, vt)
[docs] def parse_element( space_string: str, mesh: dolfinx.mesh.Mesh, dim: int, discontinuous: bool = False ) -> basix.ufl._ElementBase: """ Parse a string representation of a basix element family """ family_str, degree_str = space_string.split("_") kwargs = {"degree": int(degree_str), "cell": mesh.ufl_cell().cellname()} if dim > 1: if family_str in quads: kwargs["value_shape"] = (dim,) else: kwargs["shape"] = (dim,) # breakpoint() if family_str in ["Lagrange", "P", "CG"]: el = basix.ufl.element(family=basix.ElementFamily.P, discontinuous=discontinuous, **kwargs) elif family_str in ["Discontinuous Lagrange", "DG", "dP"]: el = basix.ufl.element(family=basix.ElementFamily.P, discontinuous=True, **kwargs) elif family_str in quads: el = basix.ufl.quadrature_element(scheme="default", **kwargs) else: families = list(basix.ElementFamily.__members__.keys()) msg = f"Unknown element family: {family_str!r}, available families: {families}" raise ValueError(msg) return el
[docs] def space_from_string( space_string: str, mesh: dolfinx.mesh.Mesh, dim: int, discontinuous: bool = False ) -> dolfinx.fem.functionspace: """ Constructed a finite elements space from a string representation of the space Arguments --------- space_string : str A string on the form {family}_{degree} which determines the space. Example 'Lagrange_1'. mesh : df.Mesh The mesh dim : int 1 for scalar space, 3 for vector space. discontinuous: bool If true force element to be discontinuous, by default False """ el = parse_element(space_string, mesh, dim, discontinuous=discontinuous) return dolfinx.fem.functionspace(mesh, el)
def element2array(el: basix.ufl._BlockedElement) -> np.ndarray: if el.family_name in quads: return np.array( [QUADNR, int(el.cell_type), int(el.degree), 0], dtype=np.uint8, ) else: return np.array( [ int(el.basix_element.family), int(el.cell_type), int(el.degree), int(el.basix_element.discontinuous), ], dtype=np.uint8, ) def number2Enum(num: int, enum: Iterable) -> Enum: for e in enum: if int(e) == num: return e raise ValueError(f"Invalid value {num} for enum {enum}") def array2element(arr: np.ndarray) -> basix.finite_element.FiniteElement: cell_type = number2Enum(arr[1], basix.CellType) degree = int(arr[2]) discontinuous = bool(arr[3]) if arr[0] == QUADNR: return basix.ufl.quadrature_element( scheme="default", degree=degree, value_shape=(3,), cell=cell_type ) else: family = number2Enum(arr[0], basix.ElementFamily) # TODO: Shape is hardcoded to (3,) for now, but this should also be stored return basix.ufl.element( family=family, cell=cell_type, degree=degree, discontinuous=discontinuous, shape=(3,), lagrange_variant=basix.LagrangeVariant.unset, ) def handle_mesh_name(mesh_name: str = "") -> Path: if mesh_name == "": fd, mesh_name = tempfile.mkstemp(suffix=".msh") return Path(mesh_name).with_suffix(".msh") def json_serial(obj): if isinstance(obj, (np.ndarray)): return obj.tolist() else: try: return str(obj) except Exception: raise TypeError("Type %s not serializable" % type(obj))
[docs] class GMshGeometry(NamedTuple): mesh: dolfinx.mesh.Mesh cfun: dolfinx.mesh.MeshTags | None ffun: dolfinx.mesh.MeshTags | None efun: dolfinx.mesh.MeshTags | None vfun: dolfinx.mesh.MeshTags | None markers: dict[str, tuple[int, int]]
def read_mesh( comm, filename: str | Path ) -> tuple[dolfinx.mesh.Mesh, dict[str, dolfinx.mesh.MeshTags]]: tags = {} with dolfinx.io.XDMFFile(comm, filename, "r") as xdmf: mesh = xdmf.read_mesh(name="Mesh", ghost_mode=dolfinx.mesh.GhostMode.none) for var, name, dim in [ ("cfun", "Cell tags", mesh.topology.dim), ("ffun", "Facet tags", mesh.topology.dim - 1), ("efun", "Edge tags", mesh.topology.dim - 2), ("vfun", "Vertex tags", mesh.topology.dim - 3), ]: mesh.topology.create_connectivity(dim, mesh.topology.dim) try: tags[var] = xdmf.read_meshtags(mesh, name=name) except RuntimeError: continue return mesh, tags def gmsh2dolfin(comm: MPI.Intracomm, msh_file, rank: int = 0) -> GMshGeometry: logger.debug(f"Convert file {msh_file} to dolfin") outdir = Path(msh_file).parent outdir.mkdir(parents=True, exist_ok=True) if Version(dolfinx.__version__) >= Version("0.10.0"): mesh_data = gmshio.read_from_msh(comm=comm, filename=msh_file) mesh = mesh_data.mesh markers_ = mesh_data.physical_groups ct = mesh_data.cell_tags tdim = mesh.topology.dim if ct is None: ct = dolfinx.mesh.meshtags( mesh, tdim, np.empty(0, dtype=np.int32), np.empty(0, dtype=np.int32) ) ft = mesh_data.facet_tags if ft is None: ft = dolfinx.mesh.meshtags( mesh, tdim - 1, np.empty(0, dtype=np.int32), np.empty(0, dtype=np.int32) ) if hasattr(mesh_data, "edge_tags"): et = mesh_data.edge_tags else: et = mesh_data.ridge_tags if et is None: et = dolfinx.mesh.meshtags( mesh, tdim - 2, np.empty(0, dtype=np.int32), np.empty(0, dtype=np.int32) ) if hasattr(mesh_data, "vertex_tags"): vt = mesh_data.vertex_tags else: vt = mesh_data.peak_tags if vt is None: vt = dolfinx.mesh.meshtags( mesh, tdim - 3, np.empty(0, dtype=np.int32), np.empty(0, dtype=np.int32) ) markers = {k: tuple(reversed(v)) for k, v in markers_.items()} else: import gmsh # We could make this work in parallel in the future if comm.rank == rank: gmsh.initialize() gmsh.model.add("Mesh from file") gmsh.merge(str(msh_file)) mesh, ct, ft, et, vt = model_to_mesh(gmsh.model, comm, 0) markers = { gmsh.model.getPhysicalName(*v): tuple(reversed(v)) for v in gmsh.model.getPhysicalGroups() } gmsh.finalize() else: mesh, ct, ft, et, vt = model_to_mesh(gmsh.model, comm, 0) markers = {} markers = comm.bcast(markers, root=rank) mesh.name = "Mesh" ct.name = "Cell tags" ft.name = "Facet tags" et.name = "Edge tags" vt.name = "Vertex tags" # Save tags to xdmf with dolfinx.io.XDMFFile(comm, outdir / "mesh.xdmf", "w") as xdmf: xdmf.write_mesh(mesh) xdmf.write_meshtags( ct, mesh.geometry, geometry_xpath=f"/Xdmf/Domain/Grid[@Name='{mesh.name}']/Geometry", ) mesh.topology.create_connectivity(2, 3) xdmf.write_meshtags( ft, mesh.geometry, geometry_xpath=f"/Xdmf/Domain/Grid[@Name='{mesh.name}']/Geometry", ) mesh.topology.create_connectivity(1, 3) xdmf.write_meshtags( et, mesh.geometry, geometry_xpath=f"/Xdmf/Domain/Grid[@Name='{mesh.name}']/Geometry", ) mesh.topology.create_connectivity(0, 3) xdmf.write_meshtags( vt, mesh.geometry, geometry_xpath=f"/Xdmf/Domain/Grid[@Name='{mesh.name}']/Geometry", ) return GMshGeometry(mesh, ct, ft, et, vt, markers) def create_xdmf_pointcloud(filename: Path, us: typing.Sequence[dolfinx.fem.Function]) -> None: # Adopted from https://gist.github.com/jorgensd/8bae61ad7a0c211570dff0116a68a356 if len(us) == 0: return import adios2 u = us[0] points = u.function_space.tabulate_dof_coordinates() h5name = filename.with_suffix(".h5") bs = u.function_space.dofmap.index_map_bs comm = u.function_space.mesh.comm num_dofs_global = u.function_space.dofmap.index_map.size_global num_dofs_local = u.function_space.dofmap.index_map.size_local local_range = np.array(u.function_space.dofmap.index_map.local_range, dtype=np.int64) # Write XDMF on rank 0 if comm.rank == 0: xdmf = ET.Element("XDMF") xdmf.attrib["Version"] = "3.0" xdmf.attrib["xmlns:xi"] = "http://www.w3.org/2001/XInclude" domain = ET.SubElement(xdmf, "Domain") grid = ET.SubElement(domain, "Grid") grid.attrib["GridType"] = "Uniform" grid.attrib["Name"] = "Point Cloud" topology = ET.SubElement(grid, "Topology") topology.attrib["NumberOfElements"] = str(num_dofs_global) topology.attrib["TopologyType"] = "PolyVertex" topology.attrib["NodesPerElement"] = "1" geometry = ET.SubElement(grid, "Geometry") geometry.attrib["GeometryType"] = "XY" if points.shape[1] == 2 else "XYZ" for u in us: it0 = ET.SubElement(geometry, "DataItem") it0.attrib["Dimensions"] = f"{num_dofs_global} {points.shape[1]}" it0.attrib["Format"] = "HDF" it0.text = f"{h5name.name}:/Step0/Points" attrib = ET.SubElement(grid, "Attribute") attrib.attrib["Name"] = u.name if bs == 1: attrib.attrib["AttributeType"] = "Scalar" else: attrib.attrib["AttributeType"] = "Vector" attrib.attrib["Center"] = "Node" it1 = ET.SubElement(attrib, "DataItem") it1.attrib["Dimensions"] = f"{num_dofs_global} {bs}" it1.attrib["Format"] = "HDF" it1.text = f"{h5name.name}:/Step0/Values_{u.name}" text = [ '<?xml version="1.0"?>\n<!DOCTYPE Xdmf SYSTEM "Xdmf.dtd" []>\n', ET.tostring(xdmf, encoding="unicode"), ] filename.write_text("".join(text)) # Create ADIOS2 reader # start = time.perf_counter() adios = adios2.ADIOS(comm) io = adios.DeclareIO("Point cloud writer") io.SetEngine("HDF5") outfile = io.Open(h5name.as_posix(), adios2.Mode.Write) points_out = points[:num_dofs_local, :] pointvar = io.DefineVariable( "Points", points_out, shape=[num_dofs_global, points.shape[1]], start=[local_range[0], 0], count=[num_dofs_local, points.shape[1]], ) outfile.Put(pointvar, points_out) for u in us: data = u.x.array[: num_dofs_local * bs].reshape(-1, bs) valuevar = io.DefineVariable( f"Values_{u.name}", data, shape=[num_dofs_global, bs], start=[local_range[0], 0], count=[num_dofs_local, bs], ) outfile.Put(valuevar, data) outfile.PerformPuts() outfile.Close() assert adios.RemoveIO("Point cloud writer")