import json
import logging
import warnings
from enum import auto
from enum import Enum
from pathlib import Path
from typing import Any
from typing import Dict
from typing import NamedTuple
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union
import dolfin
try:
from ufl_legacy import FiniteElement # noqa: F401
from ufl_legacy import tetrahedron # noqa: F401
from ufl_legacy import Cell # noqa: F401
from ufl_legacy import VectorElement # noqa: F401
except ImportError:
from ufl import FiniteElement # noqa: F401
from ufl import tetrahedron # noqa: F401
from ufl import Cell # noqa: F401
from ufl import VectorElement # noqa: F401
from .viz import dict_to_h5
from .viz import h5_to_dict
from .viz import h5pyfile
logger = logging.getLogger(__name__)
[docs]
class MeshTypes(Enum):
slab = auto()
lv_ellipsoid = auto()
biv_ellipsoid = auto()
[docs]
class H5Paths(str, Enum):
mesh = "/mesh"
cfun = "/meshfunctions/cfun"
ffun = "/meshfunctions/ffun"
efun = "/meshfunctions/efun"
vfun = "/meshfunctions/vfun"
f0 = "/microstructure/f0"
s0 = "/microstructure/s0"
n0 = "/microstructure/n0"
markers = "/markers"
info = "/info"
[docs]
class FileNames(str, Enum):
mesh = "mesh.xdmf"
cfun = "tetra_mesh.xdmf:name_to_read"
ffun = "triangle_mesh.xdmf:name_to_read"
efun = "line_mesh.xdmf:name_to_read"
vfun = "vertex_mesh.xdmf:name_to_read"
f0 = "microstructure.h5:f0"
s0 = "microstructure.h5:s0"
n0 = "microstructure.h5:n0"
markers = "markers.json"
info = "info.json"
[docs]
class Microstructure(NamedTuple):
f0: dolfin.Function
s0: dolfin.Function
n0: dolfin.Function
def load_microstructure(mesh, microstructure_path) -> Microstructure:
# Get signature
with h5pyfile(microstructure_path, comm=mesh.mpi_comm()) as h5file:
signature = h5file["f0"].attrs["signature"].decode()
sig = eval(signature)
sig._quad_scheme = "default"
V = dolfin.FunctionSpace(mesh, sig)
f0 = dolfin.Function(V)
s0 = dolfin.Function(V)
n0 = dolfin.Function(V)
with dolfin.HDF5File(
mesh.mpi_comm(),
microstructure_path.as_posix(),
"r",
) as h5file:
h5file.read(f0, "f0")
h5file.read(s0, "s0")
h5file.read(n0, "n0")
return Microstructure(f0=f0, s0=s0, n0=n0)
def read_signature(fname, group):
try:
with h5pyfile(fname) as h5file:
signature = h5file[group].attrs["signature"].decode()
except Exception:
return None
return signature
[docs]
class H5Path(NamedTuple):
h5group: str = ""
fname: str = ""
mesh_key: str = ""
is_dolfin: bool = True
is_mesh: bool = False
is_meshfunction: bool = False
is_function: bool = False
dim: int = -1
def to_dict(self):
return self._asdict()
def find_schema_in_folder(folder: Path) -> Optional[Dict[str, H5Path]]:
# First look for a schema file
for f in folder.iterdir():
if f.suffix == ".json":
try:
schema = load_schema(f)
except Exception:
pass
else:
return schema
return None
def load_schema(path: Path) -> Optional[Dict[str, H5Path]]:
if not path.is_file():
return None
data = json.loads(Path(path).read_text())
# Remove invalid keys
def filter_values(d: Dict[str, Any]):
return {k: v for k, v in d.items() if k in H5Path._fields}
return {k: H5Path(**filter_values(v)) for k, v in data.items()}
def extract_mesh_keys(d: Dict[str, H5Path]) -> Sequence[Tuple[str, str]]:
required_mesh_keys = []
for name, p in d.items():
if p.is_mesh:
continue
if not p.is_dolfin:
continue
required_mesh_keys.append((name, p.mesh_key))
return list(required_mesh_keys)
def read_xdmf(fname, obj, group=None):
try:
with dolfin.XDMFFile(Path(fname).as_posix()) as f:
if group is None:
f.read(obj)
else:
f.read(obj, group)
except RuntimeError as e:
msg = f"Unable to read {fname} and for group {group}. Got:\n" + e.args[0]
warnings.warn(msg, category=UserWarning, stacklevel=3)
def read_h5(fname, comm, obj, group=None):
try:
with dolfin.HDF5File(comm, Path(fname).as_posix(), "r") as f:
if group is None:
f.read(obj)
else:
f.read(obj, group)
except RuntimeError as e:
msg = f"Unable to read {fname} and for group {group}. Got:\n" + e.args[0]
warnings.warn(msg, category=UserWarning, stacklevel=3)
def read(
fname: Path,
obj: Any,
group: Optional[str],
current_mesh: dolfin.Mesh,
) -> None:
if fname.suffix == ".xdmf":
read_xdmf(fname, obj, group)
elif fname.suffix == ".h5":
read_h5(fname, current_mesh.mpi_comm(), obj, group)
else:
raise RuntimeError(f"Unknown file format for {fname}")
def extract_fname_group(
fname: str,
folder: Union[Path, str] = ".",
) -> Tuple[Path, Optional[str]]:
fg = fname.split(":")
if len(fg) == 1:
return Path(folder) / fg[0], None
if len(fg) > 2:
print(f"Warning: Invalid fname {fname}")
return Path(folder) / fg[0], fg[1]
def dump_schema(path: Union[Path, str], schema: Dict[str, H5Path], comm=None) -> None:
data = {k: v._asdict() for k, v in schema.items()}
logger.debug(f"Dump {data} to {path}")
if comm is None:
comm = dolfin.MPI.comm_world
if comm.rank == 0:
Path(path).write_text(
json.dumps(data, indent=2),
)
def global_check_condition(cond, comm):
global_cond = False
if comm.rank == 0:
global_cond = cond
global_cond = comm.bcast(global_cond, root=0)
return global_cond
def populate_data(schema, groups, comm, path, data, signatures):
required_mesh_keys = extract_mesh_keys(schema)
for name, mesh_key in required_mesh_keys:
if not groups.get(mesh_key, False):
msg = f"Missing mesh key '{mesh_key}' for key {name} in {path}"
raise RuntimeError(msg)
with dolfin.HDF5File(comm, path.as_posix(), "r") as h5file:
# Meshes
for name, p in schema.items():
if p.is_mesh and groups.get(name, False):
data[name] = dolfin.Mesh(comm)
h5file.read(data[name], p.h5group, True)
for name, p in schema.items():
if p.is_meshfunction and groups.get(name, False):
current_mesh = data[p.mesh_key]
data[name] = dolfin.MeshFunction("size_t", current_mesh, p.dim)
continue
if p.is_function and groups.get(name, False):
current_mesh = data[p.mesh_key]
signature = signatures.get(name)
if signature is None:
continue
sig = eval(signature)
sig._quad_scheme = "default"
V = dolfin.FunctionSpace(current_mesh, sig)
data[name] = dolfin.Function(V)
continue
dolfin.MPI.barrier(comm)
with dolfin.HDF5File(comm, path.as_posix(), "r") as h5file:
# Meshfunctions
for name, p in schema.items():
if p.is_meshfunction and groups.get(name, False):
h5file.read(data[name], p.h5group)
data[name].array()[data[name].array() == 2**64 - 1] = 0
continue
if p.is_function and groups.get(name, False):
h5file.read(data[name], p.h5group)
continue
class Geometry:
def __init__(self, **kwargs):
self._fields = ["schema"]
schema = kwargs.pop("schema")
if schema is None:
schema = type(self).default_schema()
self.schema = {}
missing_schema_entries = []
for k, v in kwargs.items():
s = schema.get(k)
if s is None:
missing_schema_entries.append(k)
continue
self._fields.append(k)
setattr(self, k, v)
self.schema[k] = s
self.schema = dolfin.MPI.comm_world.bcast(self.schema, root=0)
missing_schema_entries = dolfin.MPI.comm_world.bcast(
missing_schema_entries,
root=0,
)
self._fields = dolfin.MPI.comm_world.bcast(self._fields, root=0)
if len(missing_schema_entries) > 0:
msg = (
f"Missing schema entry for keys {missing_schema_entries!r}. "
"Objects will not be set as geometry attributes"
)
warnings.warn(UserWarning(msg), stacklevel=2)
def __repr__(self) -> str:
fields = ", ".join(self._fields)
return f"{type(self).__name__}({fields})"
@property
def comm(self):
if hasattr(self, "mesh") and hasattr(self.mesh, "mpi_comm"):
return self.mesh.mpi_comm()
return dolfin.MPI.comm_world
@staticmethod
def default_schema() -> Dict[str, H5Path]:
return {
"mesh": H5Path(
h5group=H5Paths.mesh.value,
is_mesh=True,
fname=FileNames.mesh.value,
),
"cfun": H5Path(
h5group=H5Paths.cfun.value,
is_meshfunction=True,
dim=3,
mesh_key="mesh",
fname=FileNames.cfun.value,
),
"ffun": H5Path(
h5group=H5Paths.ffun.value,
is_meshfunction=True,
dim=2,
mesh_key="mesh",
fname=FileNames.ffun.value,
),
"efun": H5Path(
h5group=H5Paths.efun.value,
is_meshfunction=True,
dim=1,
mesh_key="mesh",
fname=FileNames.efun.value,
),
"vfun": H5Path(
h5group=H5Paths.vfun.value,
is_meshfunction=True,
dim=0,
mesh_key="mesh",
fname=FileNames.vfun.value,
),
"f0": H5Path(
h5group=H5Paths.f0.value,
is_function=True,
mesh_key="mesh",
fname=FileNames.f0.value,
),
"s0": H5Path(
h5group=H5Paths.s0.value,
is_function=True,
mesh_key="mesh",
fname=FileNames.s0.value,
),
"n0": H5Path(
h5group=H5Paths.n0.value,
is_function=True,
mesh_key="mesh",
fname=FileNames.n0.value,
),
"info": H5Path(
h5group=H5Paths.info.value,
is_dolfin=False,
fname=FileNames.info.value,
),
"markers": H5Path(
h5group=H5Paths.markers.value,
is_dolfin=False,
fname=FileNames.markers.value,
),
}
def save(
self,
fname: Union[str, Path],
schema_path: Optional[Union[str, Path]] = None,
unlink: bool = True,
) -> None:
path = Path(fname).with_suffix(".h5")
logger.debug(f"Save geometry to {path}")
dolfin.MPI.barrier(self.comm)
file_exist = global_check_condition(path.is_file(), self.comm)
file_mode = "w"
if file_exist:
if unlink:
if self.comm.rank == 0:
path.unlink()
else:
file_mode = "a"
logger.debug(f"File already exists: {file_exist}. File mode {file_mode}")
dolfin.MPI.barrier(self.comm)
logger.debug("Save dolfin object")
with dolfin.HDF5File(
self.comm,
path.as_posix(),
file_mode,
) as h5file:
for name, p in self.schema.items():
obj = getattr(self, name, None)
if obj is not None and p.is_dolfin:
if p.h5group == "":
raise RuntimeError("Cannot write object with empty path")
h5file.write(obj, p.h5group)
logger.debug("Save schema to h5")
for name, p in self.schema.items():
obj = getattr(self, name, None)
if obj is not None and not p.is_dolfin:
if p.h5group == "":
raise RuntimeError("Cannot write object with empty path")
dict_to_h5(obj, path, p.h5group, comm=self.comm)
if schema_path is None:
schema_path = path.with_suffix(".json")
logger.debug(f"Save schema {schema_path}")
dump_schema(schema_path, schema=self.schema)
dolfin.MPI.barrier(self.comm)
logger.debug(f"Geometry saved to path {path} and schema to {schema_path}")
@classmethod
def from_file(
cls,
fname: Union[str, Path],
schema_path: Optional[Union[str, Path]] = None,
schema: Optional[Dict[str, H5Path]] = None,
comm=None,
):
path = Path(fname)
if not path.is_file():
msg = f"File {path} does not exist"
raise FileNotFoundError(msg)
if schema_path is not None:
schema = load_schema(Path(schema_path))
if schema is None:
schema = cls.default_schema()
if comm is None:
comm = dolfin.MPI.comm_world
groups = {}
data = {}
signatures = {}
import h5py
if comm.rank == 0:
with h5py.File(path, "r") as h5file:
for name, p in schema.items():
if p.h5group == "":
continue
groups[name] = p.h5group in h5file
if not p.is_dolfin:
if groups[name]:
data[name] = h5_to_dict(h5file[p.h5group])
else:
data[name] = None
if p.is_function and groups[name]:
signatures[name] = h5file[p.h5group].attrs["signature"].decode()
dolfin.MPI.barrier(comm)
signatures = comm.bcast(signatures, root=0)
groups = comm.bcast(groups, root=0)
data = comm.bcast(data, root=0)
dolfin.MPI.barrier(comm)
populate_data(schema, groups, comm, path, data, signatures)
return cls(**data, schema=schema)
@classmethod
def from_folder(cls, folder, schema: Optional[Dict[str, H5Path]] = None):
folder = Path(folder)
if schema is None:
if (schema := find_schema_in_folder(folder)) is None:
schema = cls.default_schema()
assert schema is not None
# Load mesh first
data = {}
for name, p in schema.items():
if p.fname == "":
continue
if not p.is_mesh:
continue
fname, group = extract_fname_group(p.fname, folder)
if not fname.is_file():
continue
data[name] = current_mesh = dolfin.Mesh()
read(fname, data[name], group=group, current_mesh=current_mesh)
# Now load rest of the dolfin stuff
for name, p in schema.items():
if p.fname == "":
continue
if p.is_meshfunction:
fname, group = extract_fname_group(p.fname, folder)
if not fname.is_file():
continue
current_mesh = data[p.mesh_key]
val = dolfin.MeshValueCollection("size_t", current_mesh, p.dim)
read(fname, val, group=group, current_mesh=current_mesh)
data[name] = dolfin.MeshFunction("size_t", current_mesh, val)
data[name].array()[data[name].array() == 2**64 - 1] = 0
continue
if p.is_function:
fname, group = extract_fname_group(p.fname, folder)
if not fname.is_file():
continue
signature = read_signature(fname, group)
if signature is None:
continue
current_mesh = data[p.mesh_key]
sig = eval(signature)
sig._quad_scheme = "default"
V = dolfin.FunctionSpace(current_mesh, sig)
data[name] = dolfin.Function(V)
read(fname, data[name], group=group, current_mesh=current_mesh)
continue
# Finally read json
for name, p in schema.items():
if p.is_dolfin:
continue
fname = folder / p.fname
if fname.suffix == ".json":
data[name] = json.loads(fname.read_text())
else:
raise RuntimeError(f"Unknown file format for {fname}")
return cls(**data, schema=schema)