Source code for ap_features.features

import logging
from enum import Enum
from collections import namedtuple
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple

import numpy as np
from scipy.interpolate import UnivariateSpline

from . import _numba
from . import utils
from .utils import _check_factor
from .utils import Array
from .utils import Backend
from .utils import numpyfy

logger = logging.getLogger(__name__)

Upstroke = namedtuple(
    "Upstroke",
    ["index", "value", "upstroke", "dt", "x0", "sigmoid", "time_APD20_to_APD80"],
)
APDCoords = namedtuple("APDCoords", ["x1", "x2", "y1", "y2", "yth"])


[docs] class UnequalLengthError(RuntimeError): pass
[docs] def triangulation( V: Array, t: Array, low: int = 30, high: int = 80, v_r: Optional[float] = None, v_max: Optional[float] = None, use_spline: bool = True, backend: Backend = Backend.python, ) -> float: r"""Compute the triangulation which is the last intersection of the :math:`\mathrm{APD} \; p_{\mathrm{high}}` line minus the last intersection of the :math:`\mathrm{APD} \; p_{\mathrm{low}}` line Parameters ---------- V : Array The signal t : Array The time stamps low : int, optional Lower APD value, by default 30 high : int, optional Higher APD value, by default 80 v_r : Optional[float], optional The resting value, by default None. If None the minimum value is chosen v_max : Optional[float], optional The maximum value, by default None. If None the maximum value is chosen use_spline : bool, optional Use spline interpolation, by default True. Only applicable for python Backend. backend : utils.Backend, optional Which backend to use by default Backend.python. Choices, 'python', 'numba' Returns ------- float The triangulation """ apd_low_point = apd_point( factor=low, V=V, t=t, v_r=v_r, v_max=v_max, use_spline=use_spline, ) apd_high_point = apd_point( factor=high, V=V, t=t, v_r=v_r, v_max=v_max, use_spline=use_spline, ) tri = apd_high_point[-1] - apd_low_point[-1] if tri < 0: tri = np.nan return tri
[docs] def apd( factor: float, V: Array, t: Array, v_r: Optional[float] = None, v_max: Optional[float] = None, use_spline: bool = True, backend: Backend = Backend.python, ) -> float: r"""Return the action potential duration at the given factor repolarization, so that factor = 0 would be zero, and factor = 1 given the time from triggering to potential is down to resting potential. Parameters ---------- factor : int The APD factor between 0 and 100 V : Array The signal t : Array The time stamps v_r : Optional[float], optional The resting value, by default None. If None the minimum value is chosen v_max : Optional[float], optional The maximum value, by default None. If None the maximum value is chosen use_spline : bool, optional Use spline interpolation, by default True. Only applicable for python Backend. backend : utils.Backend, optional Which backend to use by default Backend.python. Choices, 'python', 'numba' Returns ------- float The action potential duration .. Note:: If the signal has more intersection than two with the APX_X line, then the first and second occurrence will be used. Raises ------ ValueError If factor is outside the range of (0, 100) ValueError If shape of x and y does not match RunTimeError In unable to compute APD Notes ----- If the signal represent voltage, we would like to compute the action potential duration at a given factor of the signal. Formally, Let :math:`p \in (0, 100)`, and define .. math:: y_p = \tilde{y} - \left(1- \frac{p}{100} \right). Now let :math:`\mathcal{T}` denote the set of solutions of :math:`y_p = 0`, i.e .. math:: y_p(t) = 0, \;\; \forall t \in \mathcal{T} Then there are three different scenarios; :math:`\mathcal{T}` contains one, two or more than two elements. Note that :math:`\mathcal{T}` cannot be empty (because of the intermediate value theorem). The only valid scenario is the case when :math:`\mathcal{T}` contains two (or more) elements. In this case we define .. math:: \mathrm{APD} \; p = \max \mathcal{T} - \min \mathcal{T} for :math:`p < 0.5` and .. math:: \mathrm{APD} \; p = \min \mathcal{T} / \min \mathcal{T} - \min \mathcal{T} """ "" assert backend in Backend.__members__ if isinstance(factor, (float, np.float64)) and 0 < factor < 1: # Factor should be multiplied with 100 factor = int(factor * 100) _check_factor(factor) y = np.array(V) x = np.array(t) if y.shape != x.shape: raise ValueError("The signal and time are not of same length") if backend == Backend.python: try: x1, x2 = apd_point(factor=factor, V=y, t=x, v_r=v_r, v_max=v_max, use_spline=use_spline) except RuntimeError: # Return a number that indicate that something went wrong return -1 return x2 - x1 else: return _numba.apd(V=y, T=x, factor=factor)
[docs] def sign_change(y: np.ndarray) -> np.ndarray: s = np.sign(y) # If we have many zero values just use these instead num_zeros = len(s[s == 0]) if num_zeros > 1: return np.where(s == 0)[0] return np.where(np.diff(np.sign(y)))[0]
[docs] def apd_points( factor: float, V: Array, t: Array, v_r: Optional[float] = None, v_max: Optional[float] = None, use_spline: bool = True, ) -> List[float]: """Returns the indices of all the intersections of the APD p line Parameters ---------- factor : int The APD line V : np.ndarray The signal t : np.ndarray The time stamps v_r : Optional[float], optional The resting value, by default None. If None the minimum value is chosen v_max : Optional[float], optional The maximum value, by default None. If None the maximum value is chosen use_spline : bool, optional Use spline interpolation or not, by default True Returns ------- List[float] All indices of points intersecting the APD p line Raises ------ RuntimeError If spline interpolation fails """ _check_factor(factor) y = utils.normalize_signal(V, v_r=v_r, v_max=v_max) - (1 - factor / 100) if use_spline: try: f = UnivariateSpline(t, y, s=0, k=3) except Exception as ex: msg = ( f"Unable to compute APD {factor * 100} using spline interpolation. " f"Please change your settings, {ex}" ) logger.warning(msg) raise RuntimeError(msg) inds = f.roots() if len(inds) == 1: # Safety guard for strange interpolations inds = t[sign_change(y)] else: inds = t[sign_change(y)] return inds
[docs] class APDPointStrategy(str, Enum): first_last = "first_last" big_diff_plus_one = "big_diff_plus_one" big_diff_last = "big_diff_last"
[docs] def apd_point( factor: float, V: Array, t: Array, v_r: Optional[float] = None, v_max: Optional[float] = None, use_spline=True, strategy: APDPointStrategy = APDPointStrategy.big_diff_plus_one, ) -> Tuple[float, float]: """Returns exactly two intersections of the APD p line. Parameters ---------- factor : int The APD line V : np.ndarray The signal t : np.ndarray The time stamps v_r : Optional[float], optional The resting value, by default None. If None the minimum value is chosen v_max : Optional[float], optional The maximum value, by default None. If None the maximum value is chosen use_spline : bool, optional Use spline interpolation or not, by default True strategy : APDPointStrategy Strategy for picking two apd points, either :code:`first_last`, :code:`big_diff_plus_one` or :code:`big_diff_last`, by default :code:`big_diff_plus_one` Returns ------- Tuple[float, float] Two points corresponding to the first and second intersection of the APD p line Raises ------ RuntimeError If spline interpolation fails Notes ----- Different strategies are used based on the number of intersections. If only two intersections are used, we are done, while if there are fewer then the same value will be returned for both the start and end. If no intersections are found the value 0 will be used. If more than two intersections are found then the neighboring intersections with the largest gap will be returned. """ inds = apd_points(factor=factor, V=V, t=t, v_r=v_r, v_max=v_max, use_spline=use_spline) if len(inds) == 0: logger.warning("Warning: no root was found for APD {}".format(factor)) x1 = x2 = 0.0 if len(inds) == 1: x1 = x2 = inds[0] logger.warning("Warning: only one root was found for APD {}" "".format(factor)) else: if APDPointStrategy[strategy] == APDPointStrategy.big_diff_plus_one: start_index = int(np.argmax(np.diff(inds))) end_index = start_index + 1 elif APDPointStrategy[strategy] == APDPointStrategy.big_diff_last: start_index = int(np.argmax(np.diff(inds))) end_index = -1 else: # first_last start_index = 0 end_index = -1 x1 = sorted(inds)[start_index] x2 = sorted(inds)[end_index] return x1, x2
[docs] def apd_coords( factor: float, V: Array, t: Array, v_r: Optional[float] = None, v_max: Optional[float] = None, use_spline=True, ) -> APDCoords: """Return the coordinates of the start and stop of the APD, and not the duration itself Parameters ---------- factor : int The APD factor between 0 and 100. V : Array The signal t : Array The time stamps v_r : Optional[float], optional The resting value, by default None. If None the minimum value is chosen v_max : Optional[float], optional The maximum value, by default None. If None the maximum value is chosen use_spline : bool, optional Use spline interpolation, by default True. Only applicable for python Backend. Returns ------- APDCoords APD coordinates """ _check_factor(factor) y = np.array(V) x = np.array(t) x1, x2 = apd_point(factor=factor, V=y, t=x, v_r=v_r, v_max=v_max, use_spline=use_spline) g = UnivariateSpline(x, y, s=0) y1 = g(x1) y2 = g(x2) yth = np.min(y) + (1 - factor / 100) * (np.max(y) - np.min(y)) return APDCoords(x1, x2, y1, y2, yth)
[docs] def time_above_apd_line( V: Array, t: Array, factor: float, v_r: Optional[float] = None, v_max: Optional[float] = None, ) -> float: """Compute the amount of time spent above APD p line Parameters ---------- V : Array Signal t : Array Time stamps factor : float The p in APD line v_r : Optional[float], optional Resting value, by default None v_max : Optional[float], optional Maximum value, by default None Returns ------- float Total time spent above the APD p line """ y = utils.normalize_signal(V, v_r=v_r, v_max=v_max) intersections = np.where(np.diff(y >= factor))[0] starts = intersections[::2] ends = intersections[1::2] total_time = 0.0 for start, end in zip(starts, ends): total_time += t[end] - t[start] return total_time
[docs] def tau( x: Array, y: Array, a: float = 0.75, backend: Backend = Backend.python, ) -> float: """ Decay time. Time for the signal amplitude to go from maximum to (1 - a) * 100 % of maximum Parameters ---------- x : Array The time stamps y : Array The signal a : float, optional The value for which you want to estimate the time decay, by default 0.75. If the value is larger than 1.0 it will be divided by 100 backend : utils.Backend, optional Which backend to use by default Backend.python. Choices, 'python', 'numba' Returns ------- float Decay time """ if backend != Backend.python: logger.warning("Method currently only implemented for python backend") if a > 1.0: a /= 100 Y = UnivariateSpline(x, utils.normalize_signal(y) - a, s=0, k=3) t_max = x[int(np.argmax(y))] r = Y.roots() if len(r) >= 2: t_a = r[1] elif len(r) == 1: logger.warning( ("Only one zero was found when computing tau{}. " "Result might be wrong").format( int(a * 100), ), ) t_a = r[0] else: logger.warning( ("No zero found when computing tau{}. " "Return the value of time to peak").format( int(a * 100), ), ) t_a = x[0] return t_a - t_max
[docs] def time_to_peak( y: Array, x: Array, pacing: Optional[Array] = None, backend: Backend = Backend.python, ) -> float: """Computed the time to peak from pacing is triggered to maximum amplitude. Note, if pacing is not provided it will compute the time from the beginning of the trace (which might not be consistent) to the peak. Parameters ---------- y : Array The signal x : Array The time stamps pacing : Optional[Array], optional The pacing amplitude, by default None backend : utils.Backend, optional Which backend to use by default Backend.python. Choices, 'python', 'numba' Returns ------- float Time to peak """ if backend != Backend.python: logger.warning("Method currently only implemented for python backend") # Check some edge cases if len(x) != len(y): raise UnequalLengthError("x and y does not have the same length") if len(x) == 0: return 0 if len(y) == 0: return 0 if pacing is None: return x[int(np.argmax(y))] t_max = x[int(np.argmax(y))] if pacing is None: t_start = x[0] else: try: start_idx = ( next(i for i, p in enumerate(np.diff(np.array(pacing).astype(float))) if p > 0) + 1 ) except StopIteration: start_idx = 0 t_start = x[start_idx] return t_max - t_start
[docs] def upstroke( x: Array, y: Array, a: float = 0.8, backend: Backend = Backend.python, ) -> float: """Compute the time from (1-a)*100 % signal amplitude to peak. For example if if a = 0.8 if will compute the time from the starting value of APD80 to the upstroke. Parameters ---------- x : Array The time stamps y : Array The signal a : float, optional Fraction of signal amplitude, by default 0.8. If the value is larger than 1.0 it will be divided by 100 backend : utils.Backend, optional Which backend to use by default Backend.python. Choices, 'python', 'numba' Returns ------- float The upstroke value Raises ------ ValueError If a is outside the range of (0, 1) """ if backend != Backend.python: logger.warning("Method currently only implemented for python backend") if a > 1.0: a /= 100.0 if not 0 < a < 1: raise ValueError("'a' has to be between 0.0 and 1.0") Y = UnivariateSpline(x, utils.normalize_signal(y) - (1 - a), s=0, k=3) t_max = x[int(np.argmax(y))] r = Y.roots() if len(r) >= 1: if len(r) == 1: logger.warning( ( "Only one zero was found when computing upstroke{}. " "Result might be wrong" ).format(int(a * 100)), ) t_a = r[0] else: logger.warning( ("No zero found when computing upstroke{}. " "Return the value of time to peak").format( int(a * 100), ), ) t_a = x[0] return t_max - t_a
[docs] def beating_frequency(times: List[Array], unit: str = "ms") -> float: """Returns the approximate beating frequency in Hz by finding the average duration of each beat Parameters ---------- times : List[Array] Time stamps of all beats unit : str, optional Unit of time, by default "ms" Returns ------- float Beating frequency in Hz """ if len(times) == 0: return np.nan # Get chopped data # Find the average length of each beat in time t_mean = np.mean([ti[-1] - ti[0] for ti in times]) # Convert to seconds if unit == "ms": t_mean /= 1000.0 # Return the frequency return 1.0 / t_mean
[docs] def beating_frequency_from_peaks( signals: List[Array], times: List[Array], unit: str = "ms", ) -> Sequence[float]: """Returns the beating frequency in Hz by using the peak values of the signals in each beat Parameters ---------- signals : List[Array] The signal values for each beat times : List[Array] The time stamps of all beats unit : str, optional Unit of time, by default "ms" Returns ------- List[float] Beating frequency in Hz for each beat """ t_maxs = [t[int(np.argmax(c))] for c, t in zip(signals, times)] dt = np.diff(t_maxs) if unit == "ms": dt = np.divide(dt, 1000.0) return np.divide(1, dt)
[docs] def beating_frequency_from_apd_line( y: np.ndarray, time: np.ndarray, unit: str = "ms", ) -> Sequence[float]: """Returns the beating frequency in Hz by using the intersection of the APD50 line Parameters ---------- signals : List[Array] The signal values for each beat times : List[Array] The time stamps of all beats unit : str, optional Unit of time, by default "ms" Returns ------- List[float] Beating frequency in Hz for each beat """ from . import chopping starts, ends, zeros = chopping.locate_chop_points( time, y, threshold_factor=0.5, min_window=50, ) if len(starts) > 1: dt = np.diff(starts) else: dt = np.diff(ends) if unit == "ms": dt = np.divide(dt, 1000.0) return np.divide(1, dt)
[docs] def find_upstroke_values( t: np.ndarray, y: np.ndarray, upstroke_duration: int = 50, normalize: bool = True, ) -> np.ndarray: # Find intersection with APD50 line y_mid = (np.max(y) + np.min(y)) / 2 f = UnivariateSpline(t, y - y_mid, s=0, k=3) zeros = f.roots() if len(zeros) == 0: return np.array([]) idx_mid = next(i for i, ti in enumerate(t) if ti > zeros[0]) # Upstroke should not be more than 50 ms N = upstroke_duration // 2 upstroke = y[idx_mid - N : idx_mid + N + 1] if normalize and len(upstroke) > 0: from scipy.ndimage import gaussian_filter1d upstroke_smooth = gaussian_filter1d(upstroke, 2.0) y_min = np.min(upstroke_smooth) y_max = np.max(upstroke_smooth) upstroke_normalized = (upstroke - y_min) / (y_max - y_min) return upstroke_normalized return upstroke
[docs] def apd_up_xy( y: Array, t: Array, low: int, high: int, backend: Backend = Backend.python, ) -> float: """Find the duration between first intersection (i.e during the upstroke) of two APD lines Arguments --------- t : np.ndarray Time values y : np.ndarray The trace low: int First APD line (value between 0 and 100) high: int Second APD line (value between 0 and 100) backend : utils.Backend, optional Which backend to use by default Backend.python. Choices, 'python', 'numba' Returns ------- float: The time between `low` to `high` """ _check_factor(low) _check_factor(high) y = numpyfy(y) t = numpyfy(t) if backend == Backend.numba: return _numba.apd_up_xy(y=y, t=t, low=low, high=high) y_norm = utils.normalize_signal(y) y_from = UnivariateSpline(t, y_norm - low / 100, s=0, k=3) t_from = y_from.roots()[0] y_to = UnivariateSpline(t, y_norm - high / 100, s=0, k=3) t_to = y_to.roots()[0] return t_to - t_from
[docs] def max_relative_upstroke_velocity( t: np.ndarray, y: np.ndarray, upstroke_duration: int = 50, sigmoid_fit: bool = True, ) -> Upstroke: """Estimate maximum relative upstroke velocity Arguments --------- t : np.ndarray Time values y : np.ndarray The trace upstroke_duration : int Duration in milliseconds of upstroke (Default: 50). This does not have to be exact up should at least be longer than the upstroke. sigmoid_fit : bool If True then use a sigmoid function to fit the data of the upstroke and report the maximum derivate of the sigmoid as the maximum upstroke. Notes ----- Brief outline of current algorithm: 1. Interpolate data to have time resolution of 1 ms. 2. Find first intersection with ADP50 line 3. Select 25 ms before and after the point found in 2 4. Normalize the 50 ms (upstroke_duration) from 3 so that we have a max and min value of 1 and 0 respectively. If sigmoid fit is True Fit the normalize trace to a sigmoid function and compte the maximum derivate of the sigmoid else: 5. Compute the successive differences in amplitude (i.e delta y) and report the maximum value of these """ # Interpolate to 1ms precision t0, y0 = utils.interpolate(t, y, dt=1.0) # Find values belonging to upstroke upstroke = find_upstroke_values(t0, y0, upstroke_duration=upstroke_duration) dt = np.mean(np.diff(t)) if len(upstroke) == 0: # There is no signal index = 0 value = np.nan x0 = None s = None time_APD20_to_APD80 = np.nan else: t_upstroke = t0[: len(upstroke)] t_upstroke -= t_upstroke[0] if sigmoid_fit: def sigmoid(x, k, x0): return 0.5 * (np.tanh(k * (x - x0)) + 1) from scipy.optimize import curve_fit popt, pcov = curve_fit(sigmoid, t_upstroke, upstroke, method="dogbox") k, x0 = popt index = None # type:ignore s = sigmoid(t_upstroke, k, x0) value = k / 2 time_APD20_to_APD80 = apd_up_xy(s, t_upstroke, 20, 80) else: # Find max upstroke index = np.argmax(np.diff(upstroke)) # type:ignore value = np.max(np.diff(upstroke)) x0 = None s = None time_APD20_to_APD80 = apd_up_xy(upstroke, t_upstroke, 20, 80) return Upstroke( index=index, value=value, upstroke=upstroke, dt=dt, x0=x0, sigmoid=s, time_APD20_to_APD80=time_APD20_to_APD80, )
[docs] def maximum_upstroke_velocity( y: Array, t: Optional[Array] = None, use_spline: bool = True, normalize: bool = False, ) -> float: r""" Compute maximum upstroke velocity Arguments --------- y : array The signal t : array The time points use_spline : bool Use spline interpolation (Default : True) normalize : bool If true normalize signal first, so that max value is 1.0, and min value is zero before performing the computation. Returns ------- float The maximum upstroke velocity Notes ----- If :math:`y` is the voltage, this feature corresponds to the maximum upstroke velocity. In the case when :math:`y` is a continuous signal, i.e a 5th order spline interpolant of the data, then we can simply find the roots of the second derivative, and evaluate the derivative at that point, i.e .. math:: \max \frac{\mathrm{d}y}{\mathrm{d}t} = \frac{\mathrm{d}y}{\mathrm{d}t} (t^*), where .. math:: \frac{\mathrm{d}^2y}{\mathrm{d}^2t}(t^*) = 0. If we decide to use the discrete version of the signal then the above each derivative is approximated using a forward difference, i.e .. math:: \frac{\mathrm{d}y}{\mathrm{d}t}(t_i) \approx \frac{y_{i+1}-y_i}{t_{i+1} - t_i}. """ if t is None: t = range(len(y)) msg = "The signal and time are not of same length" assert len(t) == len(y), msg # Normalize if normalize: y = utils.normalize_signal(y) if use_spline: f = UnivariateSpline(t, y, s=0, k=5) h = f.derivative() max_upstroke_vel = np.max(h(h.derivative().roots())) else: max_upstroke_vel = np.max(np.divide(np.diff(y), np.diff(t))) return max_upstroke_vel
[docs] def integrate_apd(y, t=None, factor=30, use_spline=True, normalize=False): r""" Compute the integral of the signals above the APD p line Arguments --------- y : array The signal t : array The time points factor: float Which APD line, by default 0.3 use_spline : bool Use spline interpolation (Default : True) normalize : bool If true normalize signal first, so that max value is 1.0, and min value is zero before performing the computation. Returns ------- integral : float The integral above the line defined by the APD p line. Notes ----- This feature represents the integral of the signal above the horizontal line defined by :math:`\mathrm{APD} p`. First let .. math:: y_p = \tilde{y} - \left(1- \frac{p}{100} \right), and let :math:`t_1` and :math:`t_2` be the two solutions solving :math:`y_p(t_i) = 0, i=1,2` (assume that we have 2 solutions only). The integral we are seeking can now be computed as follows: .. math:: \mathrm{Int} \; p = \int_{t_1}^{t_2} \left[ y - y(t_1) \right] \mathrm{d}t """ if normalize: y = utils.normalize_signal(y) if t is None: t = range(len(y)) if isinstance(factor, float) and 0 < factor < 1: # Factor should be multiplied with 100 factor = int(factor * 100) _check_factor(factor) x1, x2 = apd_point(factor, y, t, use_spline=use_spline) g = UnivariateSpline(t, y, s=0, k=3) if use_spline: Y = y - g(x1) f = UnivariateSpline(t, Y, s=0, k=3) integral = f.integral(x1, x2) else: val_th = np.min(y) + (1 - factor / 100) * (np.max(y) - np.min(y)) Y = y - val_th t1 = t.tolist().index(x1) t2 = t.tolist().index(x2) + 1 integral = np.sum(np.multiply(Y[t1:t2], np.diff(t)[t1:t2])) return integral
[docs] def corrected_apd(apd: float, beat_rate: float, formula: str = "friderica"): """Correct the given APD (or any QT measurement) for the beat rate. normally the faster the HR (or the shorter the RR interval), the shorter the QT interval, and vice versa Parameters ---------- apd: float The action potential duration beat_rate : float The beat rate (number of beats per minute) formula : str, optional Formula for computing th corrected APD, either 'friderica' or 'bazett', by default 'friderica', Returns ------- float The corrected APD Notes ----- Friderica formula (default): .. math:: APD (RR)^{-1/3} Bazett formula: .. math:: APD (RR)^{-1/2} where :math:`RR` is the R-R interval in an ECG. For an action potential this would be equivalent to the inverse of the beating frequency (or 60 divided by the beat rate) .. rubric:: Luo, Shen, et al. "A comparison of commonly used QT correction formulae: the effect of heart rate on the QTc of normal ECGs." Journal of electrocardiology 37 (2004): 81-90. """ formula = formula.lower() formulas = ["friderica", "bazett"] msg = f"Expected formula to be one of {formulas}, got {formula}" assert formula in formulas, msg RR = np.divide(60, beat_rate) if formula == "friderica": return np.multiply(apd, pow(RR, -1 / 3)) else: return np.multiply(apd, pow(RR, -1 / 2))
[docs] def detect_ead( y: Array, sigma: float = 1, prominence_level: float = 0.07, ) -> Tuple[bool, Optional[int]]: """Detect (Early after depolarizations) EADs based on peak prominence. Parameters ---------- y : Array The signal that you want to detect EADs sigma : float Standard deviation in the gaussian smoothing kernel Default: 1.0 prominence_level: float How prominent a peak should be in order to be characterized as an EAD. This value should be between 0 and 1, with a greater value being more prominent. Default: 0.07 Returns ------- bool: Flag indicating if an EAD is found or not int or None: Index where we found the EAD. If no EAD is found then this will be None. I more than one peaks are found then only the first will be returned. Notes ----- Given a signal :math:`y` we want to determine wether we have an EAD present in the signal. `EADs <https://en.wikipedia.org/wiki/Afterdepolarization>`_ are abnormal depolarizations happening after the upstroke in an action potential. We assume that an EAD occurs between the maximum value of the signal (i.e the peak) and the next minimum value (i.e when the signal is at rest) To remove noisy patterns we first smooth the signal with a gaussian filter. Then we take out only the part of the signal that is between its maximum and the next minimum values. Then we find the peaks with a `Topographic Prominence <https://en.wikipedia.org/wiki/Topographic_prominence>`_ greater than the given prominence level """ from scipy.ndimage import gaussian_filter1d from scipy.signal import find_peaks y = np.array(y) idx_max = int(np.argmax(y)) idx_min = idx_max + int(np.argmin(y[idx_max:])) y_tmp = y[idx_max:idx_min] - y[idx_min] if len(y_tmp) == 0: return False, None y_smooth = gaussian_filter1d(y_tmp / np.max(y_tmp), sigma) peaks, props = find_peaks(y_smooth, prominence=prominence_level) return len(peaks) > 0, None if len(peaks) == 0 else int(peaks[0] + idx_max)
[docs] def cost_terms_trace( y: Array, t: Array, backend: Backend = Backend.numba, ) -> np.ndarray: y = numpyfy(y) t = numpyfy(t) if backend == Backend.python: logger.warning( "Method currently not implemented for python backend (and will probably not be)", ) if backend == Backend.numba: return _numba.cost_terms_trace(y=y, t=t) raise ValueError(f"Unknown backend {backend}")
# # Use C backend # return _c.cost_terms_trace(y=y, t=t)
[docs] def cost_terms( v: Array, ca: Array, t_v: Array, t_ca: Array, backend: Backend = Backend.numba, ) -> np.ndarray: v = numpyfy(v) t_v = numpyfy(t_v) ca = numpyfy(ca) t_ca = numpyfy(t_ca) if backend == Backend.python: logger.warning( "Method currently not implemented for python backend (and will probably not be)", ) if backend == Backend.numba: return _numba.cost_terms(v=v, ca=ca, t_v=t_v, t_ca=t_ca) # Use C backend # return _c.cost_terms(v=v, ca=ca, t_v=t_v, t_ca=t_ca) raise ValueError(f"Unknown backend {backend}")
[docs] def all_cost_terms( arr: np.ndarray, t: np.ndarray, mask: Optional[np.ndarray] = None, backend: Backend = Backend.numba, normalize_time: bool = True, ) -> np.ndarray: arr = numpyfy(arr) t = numpyfy(t) if not isinstance(arr, np.ndarray): raise TypeError(f"Expected 'arr' to be of type numpy.ndarray got {type(arr)}") if not isinstance(t, np.ndarray): raise TypeError(f"Expected 't' to be of type numpy.ndarray got {type(t)}") if t.shape[0] != arr.shape[0]: raise ValueError( "Shape of 't'({t.shape}) and 'arr'({arr.shape}) does not match", ) if normalize_time: t = t - t[0] if backend == Backend.python: logger.warning( "Method currently not implemented for python backend (and will probably not be)", ) if backend == Backend.numba: return _numba.all_cost_terms(arr=arr, t=t, mask=mask) # Use C backend # return _c.all_cost_terms(arr=arr, t=t, mask=mask) raise ValueError(f"Unknown backend {backend}")