Source code for simcardems.save_load_functions

import contextlib
import warnings
from pathlib import Path
from typing import Any
from typing import Dict
from typing import Optional
from typing import Type
from typing import Union

import cbcbeat
import dolfin
import numpy as np
from dolfin import FiniteElement  # noqa: F401
from dolfin import MixedElement  # noqa: F401
from dolfin import tetrahedron  # noqa: F401
from dolfin import VectorElement  # noqa: F401

from . import geometry
from . import mechanics_model
from . import utils
from .config import Config


logger = utils.getLogger(__name__)


[docs] def vs_functions_to_dict(vs, state_names): return {name: utils.sub_function(vs, index) for index, name in enumerate(state_names)}
[docs] @contextlib.contextmanager def h5pyfile(h5name, filemode="r", force_serial: bool = False, comm=None): import h5py from mpi4py import MPI if comm is None: comm = dolfin.MPI.comm_world if h5py.h5.get_config().mpi and dolfin.MPI.size(comm) > 1 and not force_serial: h5file = h5py.File(h5name, filemode, driver="mpio", comm=MPI.COMM_WORLD) else: if dolfin.MPI.size(comm) > 1 and not force_serial: warnings.warn("h5py is not installed with MPI support") h5file = h5py.File(h5name, filemode) yield h5file h5file.close()
[docs] def dict_to_h5(data, h5name, h5group, use_attrs: bool = True, comm=None): if comm is None: comm = dolfin.MPI.comm_world if comm.rank == 0: with h5pyfile(h5name, "a", force_serial=True) as h5file: if h5group == "": group = h5file else: group = h5file.create_group(h5group) for k, v in data.items(): if use_attrs: group.attrs[k] = v else: try: group.create_dataset(k, data=v) except OSError: logger.warning( f"Unable to save key {k} with data {v} in {h5name}/{h5group}", ) # Synchronize dolfin.MPI.barrier(comm)
[docs] def decode(x): if isinstance(x, bytes): return x.decode() elif isinstance(x, list): return [xi for xi in map(decode, x)] return x
[docs] def h5_to_dict(h5group, use_attrs: bool = True): import h5py if use_attrs: d: Dict[str, Any] = {} for k, v in dict(h5group.attrs).items(): if isinstance(v, np.int64): d[k] = int(v) elif isinstance(v, np.float64): d[k] = float(v) else: d[k] = v return d group = {} for key, value in h5group.items(): if isinstance(value, h5py.Dataset): v = decode(value[...].tolist()) group[key] = v elif isinstance(value, h5py.Group): group[key] = h5_to_dict(h5group[key]) else: raise ValueError(f"Unknown value type {type(value)} for key {key}.") return group
[docs] def check_file_exists(h5_filename, raise_on_false=True): path = Path(h5_filename) if not path.is_file(): if raise_on_false: raise FileNotFoundError(f"Path {h5_filename} does not exist") return False return True
[docs] def group_in_file(h5_filename, h5group): check_file_exists(h5_filename) exists = False with h5pyfile(h5_filename) as h5file: if h5group in h5file: exists = True return exists
[docs] def mech_heart_to_bnd_cond(mech_heart: mechanics_model.MechanicsProblem): if type(mech_heart).__name__ == "RigidMotionProblem": return "rigid" return "dirichlet"
[docs] def serialize_dict(d): new_d = {} for k, v in d.items(): if isinstance(v, Path): new_d[k] = v.as_posix() elif v is None: # Skip it continue elif isinstance(v, dict): new_d[k] = serialize_dict(v) elif isinstance(v, dolfin.Constant): try: new_d[k] = float(v) except Exception: continue else: new_d[k] = v return new_d
[docs] def save_state( path, config: Config, geo: geometry.BaseGeometry, state_params: Optional[Dict[str, float]] = None, ): path = Path(path) utils.remove_file(path, comm=geo.comm()) logger.info(f"Save state to {path}") geo.dump(path) logger.debug("Save using dolfin.HDF5File") logger.debug("Save using h5py") dict_to_h5(serialize_dict(config.as_dict()), path, "config", comm=geo.comm()) if state_params is None: state_params = {} dict_to_h5(serialize_dict(state_params), path, "state_params", comm=geo.comm())
[docs] def load_state( path: Union[str, Path], drug_factors_file: Union[str, Path] = "", popu_factors_file: Union[str, Path] = "", disease_state="healthy", PCL: float = 1000, ): logger.debug(f"Load state from path {path}") path = Path(path) if not path.is_file(): raise FileNotFoundError(f"File {path} does not exist") logger.debug("Open file with h5py") with h5pyfile(path) as h5file: config = Config(**h5_to_dict(h5file["config"])) if config.coupling_type == "explicit_ORdmm_Land": from .models.explicit_ORdmm_Land import EMCoupling elif config.coupling_type == "fully_coupled_ORdmm_Land": from .models.fully_coupled_ORdmm_Land import EMCoupling # type: ignore elif config.coupling_type == "pureEP_ORdmm_Land": from .models.pureEP_ORdmm_Land import EMCoupling # type: ignore elif config.coupling_type == "fully_coupled_Tor_Land": from .models.fully_coupled_Tor_Land import EMCoupling # type: ignore else: raise ValueError(f"Invalid coupling type: {config.coupling_type}") return EMCoupling.from_state( path=path, drug_factors_file=drug_factors_file, popu_factors_file=popu_factors_file, disease_state=disease_state, PCL=PCL, )
[docs] def load_initial_conditions_from_h5( path: Union[str, Path], CellModel: Type[cbcbeat.CardiacCellModel], ): path = Path(path) if not path.is_file(): raise FileNotFoundError(f"File {path} does not exist") logger.info(f"Loading initial conditions from {path}") with h5pyfile(path) as h5file: vs_signature = h5file["ep"]["vs"].attrs["signature"].decode() mesh = dolfin.Mesh() with dolfin.HDF5File(mesh.mpi_comm(), path.as_posix(), "r") as h5file: h5file.read(mesh, "/mesh", False) VS = dolfin.FunctionSpace(mesh, eval(vs_signature)) vs = dolfin.Function(VS) with dolfin.HDF5File(mesh.mpi_comm(), path.as_posix(), "r") as h5file: h5file.read(vs, "/ep/vs") return vs_functions_to_dict( vs, state_names=CellModel.default_initial_conditions().keys(), )