import logging
from typing import Optional, Tuple
import cv2
import dask
import dask.array as da
import numpy as np
import scipy.spatial
import tqdm
from dask.diagnostics import ProgressBar
from . import utils
logger = logging.getLogger(__name__)

INTERPOLATION_METHODS = {
    "nearest": cv2.INTER_NEAREST,
    "linear": cv2.INTER_LINEAR,
    "area": cv2.INTER_AREA,
    "cubic": cv2.INTER_CUBIC,
    "lanczos": cv2.INTER_LANCZOS4,
}

def resize_vectors(vectors, new_shape):
    """Resize a (H, W, 2) or (H, W, T, 2) vector field to ``new_shape``."""
    if len(vectors.shape) == 4:
        vec1 = resize_frames(
            vectors[:, :, :, 0],
            new_shape=new_shape,
        )
        vec2 = resize_frames(
            vectors[:, :, :, 1],
            new_shape=new_shape,
        )
    else:
        assert len(vectors.shape) == 3
        vec1 = resize_frames(
            vectors[:, :, 0],
            new_shape=new_shape,
        )
        vec2 = resize_frames(
            vectors[:, :, 1],
            new_shape=new_shape,
        )
    return da.stack([vec1, vec2], axis=-1)
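
# Usage sketch (not part of the original module; `flow` is a hypothetical
# (H, W, T, 2) displacement array). Halving the spatial resolution of a
# time-resolved vector field might look like:
#
#     flow_small = resize_vectors(flow, new_shape=(flow.shape[0] // 2, flow.shape[1] // 2))
#     # -> dask array of shape (H // 2, W // 2, T, 2)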

def resize_data(data: utils.MPSData, scale: float) -> utils.MPSData:
    """Spatially rescale ``data`` by ``scale`` and update the pixel-size metadata."""
    new_frames = resize_frames(data.frames, scale)
    info = data.info.copy()
    info["um_per_pixel"] /= scale
    info["size_x"], info["size_y"], info["num_frames"] = new_frames.shape
    return utils.MPSData(new_frames, data.time_stamps, info)

def subsample_time(data: utils.MPSData, step: int) -> utils.MPSData:
    """Temporally subsample ``data`` by keeping every ``step``-th frame."""
    new_frames = data.frames[:, :, ::step]
    new_times = data.time_stamps[::step]
    info = data.info.copy()
    info["num_frames"] = len(new_times)
    return utils.MPSData(new_frames, new_times, info)
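
# Usage sketch (assumes `data` is a utils.MPSData instance loaded elsewhere):
# downscaling to 40% resolution and keeping every second frame would be
#
#     data = resize_data(data, scale=0.4)
#     data = subsample_time(data, step=2)
#
# Note that `um_per_pixel` is divided by the scale factor, so physical
# distances are preserved after resizing.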

def reshape_lk(reference_points: np.ndarray, flows: utils.Array) -> utils.Array:
    """Reshape Lucas-Kanade flows sampled at a regular grid of reference
    points into a dense (y, x, ...) array."""
    x, y = reference_points.reshape(-1, 2).astype(int).T
    xu = np.sort(np.unique(x))
    yu = np.sort(np.unique(y))

    is_dask = False
    if isinstance(flows, da.Array):
        is_dask = True

    # Grid spacing in each direction; the reference points are assumed
    # to lie on a regular grid.
    dx = xu[0]
    dxs = np.diff(xu)
    if len(dxs) > 0:
        dx = dxs[0]
        assert np.all(dxs == dx)
    dy = yu[0]
    dys = np.diff(yu)
    if len(dys) > 0:
        dy = dys[0]
        assert np.all(dys == dy)

    # Integer grid indices of each reference point
    xp = ((x - np.min(x)) / dx).astype(int)
    yp = ((y - np.min(y)) / dy).astype(int)

    if len(flows.shape) == 3:
        num_frames = flows.shape[-1]
        out = np.zeros((yp.max() + 1, xp.max() + 1, 2, num_frames))
        out[yp, xp, :, :] = flows
        out = np.swapaxes(out, 2, 3)
    else:
        out = np.zeros((yp.max() + 1, xp.max() + 1, 2))
        out[yp, xp, :] = flows

    if is_dask:
        out = da.from_array(out)
    return out
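
# Illustration (hypothetical numbers, not from the original module): with
# reference points on a regular 2-pixel grid, e.g. x in {0, 2, 4} and
# y in {0, 2}, reshape_lk maps flows of shape (n_points, 2, T) onto a dense
# array of shape (2, 3, T, 2), where entry [row, col] corresponds to the
# reference point at (x = 2 * col, y = 2 * row).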

def resize_frames(
    frames: np.ndarray,
    scale: float = 1.0,
    new_shape: Optional[Tuple[int, int]] = None,
    interpolation_method: str = "nearest",
) -> np.ndarray:
    """Resize a single frame or a stack of frames, either by a scale factor
    or to an explicit ``new_shape``."""
    logger.debug("Resize frames")
    msg = (
        "Expected interpolation method to be one of "
        f"{list(INTERPOLATION_METHODS.keys())}, got {interpolation_method}"
    )
    assert interpolation_method in INTERPOLATION_METHODS, msg

    if scale != 1.0 or new_shape is not None:
        if len(frames.shape) == 2:
            w, h = frames.shape
        else:
            w, h, num_frames = frames.shape

        if new_shape is not None:
            assert len(new_shape) == 2
            width, height = new_shape
        else:
            width = int(w * scale)
            height = int(h * scale)
        width = int(width)
        height = int(height)

        if len(frames.shape) == 2:
            return cv2.resize(
                frames,
                (height, width),
                interpolation=INTERPOLATION_METHODS[interpolation_method],
            )

        all_resized_frames = []
        for i in range(num_frames):
            all_resized_frames.append(
                dask.delayed(cv2.resize)(
                    frames[:, :, i],
                    (height, width),
                    interpolation=INTERPOLATION_METHODS[interpolation_method],
                ),
            )
        with ProgressBar(out=utils.LoggerWrapper(logger, logging.INFO)):
            resized_frames = da.stack(
                *da.compute(all_resized_frames),
                axis=-1,
            ).compute()
    else:
        resized_frames = frames.copy()

    logger.info("Done resizing")
    return resized_frames
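
# Usage sketch (hypothetical data, not from the original module):
#
#     import numpy as np
#     frames = np.random.rand(100, 120, 10).astype(np.float32)
#     small = resize_frames(frames, scale=0.5, interpolation_method="area")
#     # small.shape == (50, 60, 10)
#     stretched = resize_frames(frames, new_shape=(200, 240))
#     # stretched.shape == (200, 240, 10)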

def interpolate_lk_flow(
    disp: np.ndarray,
    reference_points: np.ndarray,
    size_x: int,
    size_y: int,
    interpolation_method: str = "linear",
) -> np.ndarray:
    """Given an array of displacements (or flow) coming from
    the Lucas-Kanade method, return a new array which
    interpolates the data onto a given size, i.e.
    the original size of the image.

    Parameters
    ----------
    disp : np.ndarray
        The flow or displacement from the LK algorithm
    reference_points : np.ndarray
        Reference points
    size_x : int
        Size of the output in x-direction
    size_y : int
        Size of the output in y-direction
    interpolation_method : str
        Method for interpolation, by default 'linear'

    Returns
    -------
    np.ndarray
        Interpolated values
    """
    num_frames = disp.shape[-1]
    from scipy.interpolate import griddata

    disp_full = np.zeros((size_y, size_x, 2, num_frames))
    ref_points = np.squeeze(reference_points)
    grid_x, grid_y = np.meshgrid(np.arange(size_x), np.arange(size_y))

    # TODO: This could be parallelized
    for i in tqdm.tqdm(range(num_frames)):
        values_x = disp[:, 0, i]
        values_y = disp[:, 1, i]
        disp_full[:, :, 0, i] = griddata(
            ref_points,
            values_x,
            (grid_y, grid_x),
            method=interpolation_method,
        )
        disp_full[:, :, 1, i] = griddata(
            ref_points,
            values_y,
            (grid_y, grid_x),
            method=interpolation_method,
        )

    return disp_full
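
# Usage sketch (hypothetical shapes): given `flows` from the LK tracker with
# shape (n_points, 2, T) and reference points of shape (n_points, 1, 2),
# the flow can be interpolated back onto the full image grid:
#
#     disp_full = interpolate_lk_flow(flows, ref_points, size_x=W, size_y=H)
#     # disp_full.shape == (H, W, 2, T)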

def rbfinterp2d_map(args):
    # Unpack a tuple of arguments; convenient for use with map-style calls.
    return rbfinterp2d(*args)

def rbfinterp2d(  # noqa: C901
    coord,
    input_array,
    xgrid,
    ygrid,
    rbfunction="gaussian",
    epsilon=10,
    k=50,
    nchunks=5,
):
    """
    Fast 2-D grid interpolation of a sparse (multivariate) array using a
    radial basis function.

    https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.html

    Parameters
    ----------
    coord: array_like
        Array of shape (n, 2) containing the coordinates of the data points
        in a 2-dimensional space.
    input_array: array_like
        Array of shape (n) or (n, m) containing the values of the data points,
        where *n* is the number of data points and *m* the number of co-located
        variables. All values in ``input_array`` are required to be finite.
    xgrid, ygrid: array_like
        1D arrays representing the coordinates of the 2-D output grid.
    rbfunction: {"nearest", "gaussian", "inverse quadratic", "inverse multiquadric", "bump"}, optional
        The name of one of the available radial basis functions based on a
        normalized Euclidean norm as defined in the **Notes** section below.
        More details are provided in the Wikipedia reference page linked below.
    epsilon: float, optional
        The shape parameter used to scale the input to the radial kernel.
        A smaller value for ``epsilon`` produces a smoother interpolation. More
        details are provided in the Wikipedia reference page linked below.
    k: int or None, optional
        The number of nearest neighbours used for each target location.
        This can also be useful to speed up the interpolation.
        If set to None, it interpolates using all the data points at once.
    nchunks: int, optional
        The number of chunks in which the grid points are split to limit the
        memory usage during the interpolation.

    Returns
    -------
    output_array: ndarray
        The interpolated field(s) having shape (``ygrid.size``, ``xgrid.size``, *m*).

    Notes
    -----
    The coordinates are normalized before computing the Euclidean norms:

    .. math::

        x = (x - min(x)) / max[max(x) - min(x), max(y) - min(y)],

        y = (y - min(y)) / max[max(x) - min(x), max(y) - min(y)],

    where the min and max values are taken as the 2nd and 98th percentiles.

    References
    ----------
    Wikipedia contributors, "Radial basis function,"
    Wikipedia, The Free Encyclopedia,
    https://en.wikipedia.org/w/index.php?title=Radial_basis_function&oldid=906155047
    (accessed August 19, 2019).
    """
    _rbfunctions = [
        "nearest",
        "gaussian",
        "inverse quadratic",
        "inverse multiquadric",
        "bump",
    ]

    input_array = np.copy(input_array)

    if np.any(~np.isfinite(input_array)):
        raise ValueError("input_array contains non-finite values")

    if input_array.ndim == 1:
        nvar = 1
        input_array = input_array[:, None]
    elif input_array.ndim == 2:
        nvar = input_array.shape[1]
    else:
        raise ValueError(
            "input_array must have 1 (n) or 2 dimensions (n, m), but it has %i" % input_array.ndim,
        )

    npoints = input_array.shape[0]

    if npoints == 0:
        raise ValueError(
            "input_array (n, m) must contain at least one sample, but it has %i" % npoints,
        )
    # only one sample, return uniform fields
    elif npoints == 1:
        output_array = np.ones((nvar, ygrid.size, xgrid.size))
        for i in range(nvar):
            output_array[i, :, :] *= input_array[:, i]
        return output_array

    coord = np.copy(coord)

    if coord.ndim != 2:
        raise ValueError(
            "coord must have 2 dimensions (n, 2), but it has %i" % coord.ndim,
        )

    if npoints != coord.shape[0]:
        raise ValueError(
            "the number of samples in the input_array does not match the "
            + "number of coordinates %i!=%i" % (npoints, coord.shape[0]),
        )

    # normalize coordinates
    qcoord = np.percentile(coord, [2, 98], axis=0)
    dextent = np.max(np.diff(qcoord, axis=0))
    coord = (coord - qcoord[0, :]) / dextent

    rbfunction = rbfunction.lower()
    if rbfunction not in _rbfunctions:
        raise ValueError(
            "Unknown rbfunction '{}'\n".format(rbfunction) + "The available rbfunctions are: " + str(_rbfunctions),
        ) from None

    # generate the target grid
    X, Y = np.meshgrid(xgrid, ygrid)
    grid = np.column_stack((X.ravel(), Y.ravel()))
    # normalize the grid coordinates
    grid = (grid - qcoord[0, :]) / dextent

    # k-nearest interpolation
    if k is not None and k > 0:
        k = int(np.min((k, npoints)))
        # create cKDTree object to represent source grid
        tree = scipy.spatial.cKDTree(coord)
    else:
        k = 0

    # split grid points in n chunks
    if nchunks > 1:
        subgrids = np.array_split(grid, nchunks, 0)
        subgrids = [x for x in subgrids if x.size > 0]
    else:
        subgrids = [grid]

    # loop subgrids
    i0 = 0
    output_array = np.zeros((grid.shape[0], nvar))
    for i, subgrid in enumerate(subgrids):
        idelta = subgrid.shape[0]

        if k == 0:
            # use all points
            d = scipy.spatial.distance.cdist(coord, subgrid, "euclidean").transpose()
            inds = np.arange(npoints)[None, :] * np.ones(
                (subgrid.shape[0], npoints),
            ).astype(int)
        else:
            # use k-nearest neighbours
            d, inds = tree.query(subgrid, k=k)

        if k == 1:
            # nearest neighbour
            output_array[i0 : (i0 + idelta), :] = input_array[inds, :]
        else:
            # the interpolation weights
            if rbfunction == "gaussian":
                w = np.exp(-((d * epsilon) ** 2))
            elif rbfunction == "inverse quadratic":
                w = 1.0 / (1 + (epsilon * d) ** 2)
            elif rbfunction == "inverse multiquadric":
                w = 1.0 / np.sqrt(1 + (epsilon * d) ** 2)
            elif rbfunction == "bump":
                w = np.exp(-1.0 / (1 - (epsilon * d) ** 2))
                w[d >= 1 / epsilon] = 0.0

            if not np.all(np.sum(w, axis=1)):
                w[np.sum(w, axis=1) == 0, :] = 1.0

            # interpolate
            for j in range(nvar):
                output_array[i0 : (i0 + idelta), j] = np.sum(
                    w * input_array[inds, j],
                    axis=1,
                ) / np.sum(w, axis=1)

        i0 += idelta

    # reshape to final grid size
    return output_array.reshape(ygrid.size, xgrid.size, nvar)
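
# Usage sketch (hypothetical data): interpolating two co-located variables
# (e.g. the x- and y-components of a displacement field) from scattered
# points onto a regular grid.
#
#     import numpy as np
#     coord = np.random.rand(200, 2) * 100   # (n, 2) point coordinates
#     values = np.random.rand(200, 2)         # (n, m) with m = 2 variables
#     xgrid = np.arange(100)
#     ygrid = np.arange(80)
#     out = rbfinterp2d(coord, values, xgrid, ygrid, rbfunction="gaussian", epsilon=10, k=20)
#     # out.shape == (80, 100, 2), i.e. (ygrid.size, xgrid.size, m)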