Source code for cardiac_geometries.geometry

import json
import logging
import shutil
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from mpi4py import MPI

import adios4dolfinx
import dolfinx
import ufl
from packaging.version import Version

from . import utils

logger = logging.getLogger(__name__)


def info_path(path: Path) -> Path:
    return path.parent / f"info_{path.stem}.json"


def markers_path(path: Path) -> Path:
    return path.parent / f"markers_{path.stem}.json"


def microstructure_path(path: Path) -> Path:
    return path.parent / f"microstructure_{path.stem}.json"


def additional_data_path(path: Path) -> Path:
    return path.parent / f"additional_data_{path.stem}.json"


def save_additional_data(
    additional_data: dict[str, dolfinx.mesh.MeshTags | dolfinx.fem.Function],
    path: Path,
    mesh: dolfinx.mesh.Mesh,
) -> None:
    additional_data_attributes = {}
    for name, data in additional_data.items():
        if isinstance(data, dolfinx.mesh.MeshTags):
            adios4dolfinx.write_meshtags(
                meshtags=data,
                mesh=mesh,
                filename=path,
                meshtag_name=name,
            )
            additional_data_attributes[name] = {"type": "MeshTags"}
        elif isinstance(data, dolfinx.fem.Function):
            adios4dolfinx.write_function(
                u=data,
                filename=path,
                name=name,
            )
            additional_data_attributes[name] = {
                "type": "Function",
                "element": utils.element2array(data.ufl_element()),
            }
        else:
            logger.warning(
                f"Additional data '{name}' of type {type(data)} "
                "is not supported and will not be saved."
            )

    if mesh.comm.rank == 0:
        additional_data_path(path).write_text(
            json.dumps(additional_data_attributes, indent=4, default=utils.json_serial)
        )


def load_additional_data(
    path: Path,
    mesh: dolfinx.mesh.Mesh,
) -> dict[str, dolfinx.mesh.MeshTags | dolfinx.fem.Function]:
    additional_data: dict[str, dolfinx.mesh.MeshTags | dolfinx.fem.Function] = {}
    if not additional_data_path(path).exists():
        return additional_data

    if mesh.comm.rank == 0:
        additional_data_attributes = json.loads(additional_data_path(path).read_text())
    else:
        additional_data_attributes = {}
    additional_data_attributes = mesh.comm.bcast(additional_data_attributes, root=0)

    for name, attr in additional_data_attributes.items():
        if attr["type"] == "MeshTags":
            mt = adios4dolfinx.read_meshtags(mesh=mesh, meshtag_name=name, filename=path)
            additional_data[name] = mt
        elif attr["type"] == "Function":
            V = utils.array2functionspace(mesh, tuple(attr["element"]))
            f = dolfinx.fem.Function(V, name=name)
            adios4dolfinx.read_function(u=f, filename=path, name=name)
            additional_data[name] = f
        else:
            logger.warning(
                f"Additional data '{name}' of type {attr['type']} "
                "is not supported and will not be loaded."
            )

    return additional_data


def save_geometry(
    path: Path,
    mesh: dolfinx.mesh.Mesh,
    markers: dict[str, tuple[int, int]] | None = None,
    info: dict[str, Any] | None = None,
    cfun: dolfinx.mesh.MeshTags | None = None,
    ffun: dolfinx.mesh.MeshTags | None = None,
    efun: dolfinx.mesh.MeshTags | None = None,
    vfun: dolfinx.mesh.MeshTags | None = None,
    f0: dolfinx.fem.Function | None = None,
    s0: dolfinx.fem.Function | None = None,
    n0: dolfinx.fem.Function | None = None,
    additional_data: dict[str, dolfinx.mesh.MeshTags | dolfinx.fem.Function] | None = None,
) -> None:
    path = Path(path)

    shutil.rmtree(path, ignore_errors=True)
    mesh.comm.barrier()
    adios4dolfinx.write_mesh(mesh=mesh, filename=path)

    if mesh.comm.rank == 0:
        if markers is not None:
            markers_path(path).write_text(json.dumps(markers, indent=4, default=utils.json_serial))
        if info is not None:
            info_path(path).write_text(json.dumps(info, indent=4, default=utils.json_serial))

    if cfun is not None:
        adios4dolfinx.write_meshtags(
            meshtags=cfun,
            mesh=mesh,
            filename=path,
            meshtag_name="Cell tags",
        )
    if ffun is not None:
        adios4dolfinx.write_meshtags(
            meshtags=ffun,
            mesh=mesh,
            filename=path,
            meshtag_name="Facet tags",
        )
    if Version(dolfinx.__version__) >= Version("0.10.0"):
        # Write edge and vertex tags is buggy in older versions
        if efun is not None:
            adios4dolfinx.write_meshtags(
                meshtags=efun,
                mesh=mesh,
                filename=path,
                meshtag_name="Edge tags",
            )
        if vfun is not None:
            adios4dolfinx.write_meshtags(
                meshtags=vfun,
                mesh=mesh,
                filename=path,
                meshtag_name="Vertex tags",
            )

    from .fibers.utils import save_microstructure

    save_microstructure(
        mesh=mesh,
        functions=[f for f in (f0, s0, n0) if f is not None],
        path=path,
    )

    if additional_data is not None:
        save_additional_data(additional_data, path, mesh=mesh)
    mesh.comm.barrier()


[docs] @dataclass class Geometry: mesh: dolfinx.mesh.Mesh markers: dict[str, tuple[int, int]] = field(default_factory=dict) cfun: dolfinx.mesh.MeshTags | None = None ffun: dolfinx.mesh.MeshTags | None = None efun: dolfinx.mesh.MeshTags | None = None vfun: dolfinx.mesh.MeshTags | None = None f0: dolfinx.fem.Function | None = None s0: dolfinx.fem.Function | None = None n0: dolfinx.fem.Function | None = None info: dict[str, Any] = field(default_factory=dict) additional_data: dict[str, dolfinx.mesh.MeshTags | dolfinx.fem.Function] = field( default_factory=dict )
[docs] def save(self, path: str | Path) -> None: """Save the geometry to a file using adios4dolfinx. Parameters ---------- path : str | Path The path to the file where the geometry will be saved. The file will be created if it does not exist, or overwritten if it does. """ path = Path(path) save_geometry( path=path, mesh=self.mesh, markers=self.markers, cfun=self.cfun, ffun=self.ffun, efun=self.efun, vfun=self.vfun, f0=self.f0, s0=self.s0, n0=self.n0, additional_data=self.additional_data, )
[docs] def save_folder(self, folder: str | Path) -> None: """Save the geometry to a folder containing mesh and markers files. Parameters ---------- folder : str | Path The path to the folder where the geometry will be saved. The folder will be created if it does not exist. """ comm = self.mesh.comm folder = Path(folder) folder.mkdir(parents=True, exist_ok=True) utils.save_mesh_to_xdmf( comm=comm, fname=folder / "mesh.xdmf", mesh=self.mesh, ct=self.cfun, ft=self.ffun, et=self.efun, vt=self.vfun, ) if comm.rank == 0: (folder / "markers.json").write_text(json.dumps(self.markers, indent=4)) (folder / "info.json").write_text(json.dumps(self.info, indent=4)) save_geometry( path=folder / "geometry.bp", mesh=self.mesh, markers=self.markers, cfun=self.cfun, ffun=self.ffun, efun=self.efun, vfun=self.vfun, f0=self.f0, s0=self.s0, n0=self.n0, additional_data=self.additional_data, ) logger.info(f"Geometry saved to {folder}")
@property def dx(self): """Volume measure for the mesh using the cell function `cfun` if it exists as subdomain data. """ return ufl.Measure("dx", domain=self.mesh, subdomain_data=self.cfun) @property def ds(self): """Surface measure for the mesh using the facet function `ffun` if it exists as subdomain data. """ return ufl.Measure("ds", domain=self.mesh, subdomain_data=self.ffun) @property def facet_normal(self) -> ufl.FacetNormal: """Facet normal vector for the mesh.""" return ufl.FacetNormal(self.mesh)
[docs] def refine( self, n=1, outdir: Path | None = None, ) -> "Geometry": """ Refine the mesh and transfer the meshtags to new geometry. Also regenerate fibers if `self.info` is found. If `self.info` is not found, it currently raises a NotImplementedError, however fiber could be interpolated from the old mesh to the new mesh but this will result in a loss of information about the fiber orientation. Parameters ---------- n : int, optional Number of times to refine the mesh, by default 1 outdir : Path | None, optional Output directory to save the refined mesh and meshtags, by default None in which case the mesh is not saved. Returns ------- Geometry A new Geometry object with the refined mesh and updated meshtags. Raises ------ NotImplementedError If `self.info` is not found, indicating that fiber interpolation after refinement is not implemented yet. """ mesh = self.mesh cfun = self.cfun ffun = self.ffun for _ in range(n): new_mesh, parent_cell, parent_facet = dolfinx.mesh.refine( mesh, partitioner=None, option=dolfinx.mesh.RefinementOption.parent_cell_and_facet, ) new_mesh.name = mesh.name mesh = new_mesh new_mesh.topology.create_entities(1) new_mesh.topology.create_connectivity(2, 3) if cfun is not None: new_cfun = dolfinx.mesh.transfer_meshtag(cfun, new_mesh, parent_cell, parent_facet) new_cfun.name = cfun.name cfun = new_cfun else: new_cfun = None if ffun is not None: new_ffun = dolfinx.mesh.transfer_meshtag(ffun, new_mesh, parent_cell, parent_facet) new_ffun.name = ffun.name ffun = new_ffun else: new_ffun = None if outdir is not None: outdir = Path(outdir) outdir.mkdir(parents=True, exist_ok=True) with dolfinx.io.XDMFFile(new_mesh.comm, outdir / "mesh.xdmf", "w") as xdmf: xdmf.write_mesh(new_mesh) if cfun is not None: xdmf.write_meshtags( cfun, new_mesh.geometry, geometry_xpath=f"/Xdmf/Domain/Grid[@Name='{new_mesh.name}']/Geometry", ) if ffun is not None: mesh.topology.create_connectivity(2, 3) xdmf.write_meshtags( ffun, new_mesh.geometry, geometry_xpath=f"/Xdmf/Domain/Grid[@Name='{new_mesh.name}']/Geometry", ) if self.info is not None: info = self.info.copy() info["refinement"] = n info.pop("outdir", None) if outdir is not None: info["outdir"] = str(outdir) from .fibers import generate_fibers f0, s0, n0 = generate_fibers(mesh=new_mesh, ffun=new_ffun, markers=self.markers, **info) else: info = None # Interpolate fibers raise NotImplementedError( "Interpolating fibers after refinement is not implemented yet." ) return Geometry( mesh=new_mesh, markers=self.markers, cfun=new_cfun, ffun=new_ffun, f0=f0, s0=s0, n0=n0, )
[docs] @classmethod def from_file( cls, comm: MPI.Intracomm, path: str | Path, ghost_mode: dolfinx.mesh.GhostMode = dolfinx.mesh.GhostMode.none, ) -> "Geometry": """Read geometry from a file using adios4dolfinx. Parameters ---------- comm : MPI.Intracomm The MPI communicator to use for reading the mesh. path : str | Path The path to the file containing the geometry data. ghost_mode : dolfinx.mesh.GhostMode, optional The ghost mode to use when reading the mesh, by default dolfinx.mesh.GhostMode.none. Note that if you need to refine the mesh later, you should use dolfinx.mesh.GhostMode.none. Returns ------- Geometry An instance of the Geometry class containing the mesh, markers, and functions. """ path = Path(path) if not path.exists(): raise ValueError(f"File {path} does not exist") if markers_path(path).exists(): if comm.rank == 0: markers = json.loads(markers_path(path).read_text()) else: markers = {} markers = comm.bcast(markers, root=0) else: markers = {} if info_path(path).exists(): if comm.rank == 0: info = json.loads(info_path(path).read_text()) else: info = {} info = comm.bcast(info, root=0) else: info = {} mesh = adios4dolfinx.read_mesh( comm=comm, filename=path, ghost_mode=ghost_mode, ) # markers = adios4dolfinx.read_attributes(comm=comm, filename=path, name="markers") tags = {} for name, meshtag_name in ( ("cfun", "Cell tags"), ("ffun", "Facet tags"), ("efun", "Edge tags"), ("vfun", "Vertex tags"), ): try: tags[name] = adios4dolfinx.read_meshtags( mesh=mesh, meshtag_name=meshtag_name, filename=path ) except (KeyError, IndexError): logger.debug(f"{name} not found in {path}") tags[name] = None functions = {} # if function_space_data is None: # function_space_data = adios4dolfinx.read_attributes( # comm=comm, filename=path, name="function_space" # ) if microstructure_path(path).exists(): if comm.rank == 0: function_space_data = json.loads(microstructure_path(path).read_text()) else: function_space_data = {} function_space_data = comm.bcast(function_space_data, root=0) else: function_space_data = {} assert isinstance(function_space_data, dict), "function_space_data must be a dictionary" for name, el in function_space_data.items(): V = utils.array2functionspace(mesh, tuple(el)) f = dolfinx.fem.Function(V, name=name) try: adios4dolfinx.read_function(u=f, filename=path, name=name) except KeyError: continue else: functions[name] = f additional_data = load_additional_data(path=path, mesh=mesh) return cls( mesh=mesh, markers=markers, info=info, additional_data=additional_data, **functions, **tags, )
[docs] @classmethod def from_folder( cls, comm: MPI.Intracomm, folder: str | Path, ghost_mode: dolfinx.mesh.GhostMode = dolfinx.mesh.GhostMode.none, ) -> "Geometry": """Read geometry from a folder containing mesh and markers files. Parameters ---------- comm : MPI.Intracomm The MPI communicator to use for reading the mesh and markers. folder : str | Path The path to the folder containing the geometry data. ghost_mode : dolfinx.mesh.GhostMode, optional The ghost mode to use when reading the mesh, by default dolfinx.mesh.GhostMode.none. Note that if you need to refine the mesh later, you should use dolfinx.mesh.GhostMode.none. Returns ------- Geometry An instance of the Geometry class containing the mesh, markers, and functions. Raises ------ ValueError If the required mesh file is not found in the specified folder. """ folder = Path(folder) logger.info(f"Reading geometry from {folder}") if not folder.exists(): raise ValueError(f"Folder {folder} does not exist") if (folder / "geometry.bp").exists(): logger.debug("Reading geometry from geometry.bp") return cls.from_file(comm=comm, path=folder / "geometry.bp", ghost_mode=ghost_mode) logger.warning( "geometry.bp not found, reading mesh and microstructure separately. " "This may lead to inconsistent dof numbering in the mesh and microstructure. " "Try deleting the folder and regenerating the mesh." ) # Read mesh if (folder / "mesh.xdmf").exists(): logger.debug("Reading mesh") mesh, tags = utils.read_mesh( comm=comm, filename=folder / "mesh.xdmf", ghost_mode=ghost_mode ) else: raise ValueError("No mesh file found") # Read markers if (folder / "markers.json").exists(): logger.debug("Reading markers") if comm.rank == 0: markers = json.loads((folder / "markers.json").read_text()) else: markers = {} markers = comm.bcast(markers, root=0) else: markers = {} if (folder / "microstructure.json").exists(): if comm.rank == 0: microstructure = json.loads((folder / "microstructure.json").read_text()) else: microstructure = {} microstructure = comm.bcast(microstructure, root=0) else: microstructure = {} functions = {} microstructure_path = folder / "microstructure.bp" if microstructure_path.exists(): logger.debug("Reading microstructure") # function_space = adios4dolfinx.read_attributes( # comm=MPI.COMM_WORLD, filename=microstructure_path, name="function_space" # ) for name, el in microstructure.items(): logger.debug(f"Reading {name}") V = utils.array2functionspace(mesh, tuple(el)) f = dolfinx.fem.Function(V, name=name) try: adios4dolfinx.read_function(u=f, filename=microstructure_path, name=name) except KeyError: continue else: functions[name] = f if (folder / "info.json").exists(): logger.debug("Reading info") if comm.rank == 0: info = json.loads((folder / "info.json").read_text()) else: info = {} info = comm.bcast(info, root=0) else: info = {} return cls( mesh=mesh, markers=markers, info=info, **functions, **tags, )
[docs] def rotate(self, target_normal, base_marker): """Rotate the geometry so that the base normal aligns with the target normal. Parameters ---------- target_normal : np.ndarray The target normal vector to align the base normal with. base_marker : str The marker for the base of the geometry. Returns ------- Geometry The rotated Geometry object. Only returned if inplace is False. """ from . import utils msg = ( f"Base marker '{base_marker}' not found in markers. " f"Available markers: {list(self.markers.keys())}" ) assert base_marker in self.markers, msg fields = [f for f in (self.f0, self.s0, self.n0) if f is not None] mesh_rotated, R_matrix, fields_rotated = utils.rotate_geometry_and_fields( mesh=self.mesh, ffun=self.ffun, base_marker=self.markers[base_marker][0], target_normal=target_normal, fields=fields, ) if self.f0 is not None: f0, s0, n0 = fields_rotated else: f0 = None s0 = None n0 = None self.info["rotation_matrix"] = R_matrix.tolist() return Geometry( mesh=mesh_rotated, markers=self.markers, cfun=self.cfun, ffun=self.ffun, efun=self.efun, vfun=self.vfun, f0=f0, s0=s0, n0=n0, info=self.info, )