Source code for ap_features.average

from collections import namedtuple
from typing import Optional
from typing import Sequence
from typing import Tuple

import numpy as np
from scipy.interpolate import UnivariateSpline

from .utils import Array

# Result container returned by `average_and_interpolate`:
#   y  - the averaged y-values
#   x  - the x-values that `y` is defined on
#   ys - the individual y sub-signals
#   xs - the individual x sub-signals
Average = namedtuple("Average", ["y", "x", "ys", "xs"])


class InvalidSubSignalError(RuntimeError):
    """Raised when the `xs` and `ys` sub-signals have mismatching lengths."""
def clean_data(
    ys: Sequence[Array],
    xs: Optional[Sequence[Array]],
) -> Tuple[Sequence[Array], Sequence[Array]]:
    """Make sure `xs` and `ys` have the correct shapes and remove
    empty sub-signals

    Parameters
    ----------
    ys : Sequence[Array]
        The y sub-signals
    xs : Optional[Sequence[Array]]
        The x sub-signals, or ``None`` if there are no x-values

    Returns
    -------
    Tuple[Sequence[Array], Sequence[Array]]
        (ys, xs) - cleaned version. If `xs` is ``None`` the second
        item is an empty list.

    Note
    ----
    The sub-signals are returned in the same order as they were
    passed in.

    Raises
    ------
    InvalidSubSignalError
        If the length of `xs` and `ys` don't agree
    InvalidSubSignalError
        If the length of one of the sub-signals of `xs` and `ys`
        don't agree.
    """
    if xs is None:
        # There is nothing to pair the y-values with, but empty
        # sub-signals still have to be removed.
        non_empty = [y for y in ys if len(y) > 0]
        # Hand back the caller's own sequence when nothing was dropped
        return (ys if len(non_empty) == len(ys) else non_empty), []

    if len(xs) != len(ys):
        raise InvalidSubSignalError(
            "Expected Xs and Ys has to be of same length. "
            f"Got len(xs) = {len(xs)}, len(ys) = {len(ys)}",
        )

    new_xs = []
    new_ys = []
    for i, (x, y) in enumerate(zip(xs, ys)):
        if len(x) != len(y):
            raise InvalidSubSignalError(
                "Expected X and Y has to be of same length. "
                f"Got len(x) = {len(x)}, len(y) = {len(y)} for index {i}",
            )
        if len(x) == 0:
            # Skip this one
            continue
        new_xs.append(x)
        new_ys.append(y)
    return new_ys, new_xs
def interpolate(X: Array, x: Array, y: Array) -> np.ndarray:
    """Evaluate the data points ``(x, y)`` at the coordinates `X`

    Parameters
    ----------
    X : Array
        x-coordinates at which to evaluate the interpolated values
    x : Array
        x-coordinates of the data points
    y : Array
        y-coordinates of the data points

    Returns
    -------
    np.ndarray
        Interpolated y-coordinates

    Note
    ----
    Spline interpolation with `scipy.interpolate.UnivariateSpline`
    (``s=0``) is attempted first. If that raises, the function falls
    back to linear interpolation with `numpy.interp`.
    """
    try:
        return UnivariateSpline(x, y, s=0)(X)
    except Exception:
        # The spline fit can fail with errors that are awkward to catch
        # by type, hence the broad handler:
        # https://stackoverflow.com/questions/64766510/catch-dfitpack-error-from-scipy-interpolate-interpolatedunivariatespline
        return np.interp(X, x, y)
def create_longest_time_array(xs: Sequence[Array], N: int) -> np.ndarray:
    """Build an array of length `N` spanning all the given sub-signals

    Parameters
    ----------
    xs : Sequence[Array]
        List of monotonic sub-signals
    N : int
        Size of output array

    Returns
    -------
    np.ndarray
        Array of length `N`. It is evenly spaced from the smallest
        first value to the largest last value among the sub-signals,
        or ``np.arange(N)`` if `xs` is empty.
    """
    if len(xs) > 0:
        first_values = [sub[0] for sub in xs]
        last_values = [sub[-1] for sub in xs]
        return np.linspace(np.min(first_values), np.max(last_values), N)

    # No sub-signals to span, fall back to plain indices
    return np.arange(N)
def average_and_interpolate(
    ys: Sequence[Array],
    xs: Optional[Sequence[Array]] = None,
    N: int = 200,
) -> Average:
    """
    Get the average of list of signals assuming that
    they align at the same x value

    Parameters
    ----------
    ys : Sequence[Array]
        The signal values, one array per sub-signal
    xs : Optional[Sequence[Array]]
        The x-values, one array per sub-signal. If not provided,
        the sample indices of each sub-signal are used as x-values.
    N : int
        Length of output array (Default: 200)

    Returns
    -------
    Average
        Named tuple with the fields

        - ``y``: the average y values (length `N`)
        - ``x``: the new x-values (length `N`)
        - ``ys``: the sub-signals interpolated onto their part of
          ``x``. With zero or one sub-signal the cleaned input is
          returned as is.
        - ``xs``: the part of ``x`` covered by each sub-signal. With
          zero or one sub-signal the cleaned input x-values are
          returned as is.
    """
    ys, xs = clean_data(ys, xs)

    if len(xs) == 0:
        # No x-values to work with. Empty sub-signals carry no data and
        # would produce empty x-arrays that cannot be spanned below, so
        # drop them before falling back to the sample indices.
        if any(len(yi) == 0 for yi in ys):
            ys = [yi for yi in ys if len(yi) > 0]
        xs = [np.arange(0, len(yi)) for yi in ys]

    # Construct new time array
    X = create_longest_time_array(xs, N)

    if len(ys) == 0:
        return Average(y=np.zeros(N), x=X, xs=xs, ys=ys)
    if len(ys) == 1:
        return Average(y=interpolate(X, xs[0], ys[0]), x=X, xs=xs, ys=ys)

    Ys = []
    Xs = []
    # Running sum of the interpolated values and the number of
    # sub-signals contributing at each point of X
    Y = np.zeros_like(X)
    counts = np.zeros_like(X)

    for x, y in zip(xs, ys):
        # Take out relevant piece: from the first point of X at or after
        # the start of the sub-signal, up to and including the first
        # point of X at or after its end
        end_idx = next(j + 1 for j, xj in enumerate(X) if xj >= x[-1])
        start_idx = next(j for j, xj in enumerate(X) if xj >= x[0])
        xi = X[start_idx:end_idx]

        yi = interpolate(xi, x, y)
        Y[start_idx:end_idx] += yi
        counts[start_idx:end_idx] += 1

        Xs.append(xi)
        Ys.append(yi)

    # Points of X not covered by any sub-signal are left at zero
    Y_avg = np.divide(Y, counts, out=np.zeros_like(Y), where=counts != 0)

    return Average(y=Y_avg, x=X, ys=Ys, xs=Xs)