import logging
from pathlib import Path
from typing import Optional
from typing import Union
import cv2
import dask.array as da
import matplotlib.pyplot as plt
import numpy as np
import tqdm
from . import frame_sequence as fs
from . import scaling
from . import utils
logger = logging.getLogger(__name__)
def apply_custom_colormap(image_gray, cmap=plt.get_cmap("seismic")):
    """Map a grayscale ``uint8`` image to a BGR image using a matplotlib colormap.

    Parameters
    ----------
    image_gray : np.ndarray
        Image of dtype ``np.uint8``. A 3-D input has its last axis squeezed
        away first, so that axis must have length one.
    cmap : matplotlib colormap, optional
        Colormap sampled to build the 256-entry lookup table, by default
        the "seismic" colormap.

    Returns
    -------
    np.ndarray
        The three per-channel lookup results stacked along a new last axis,
        in blue, green, red order.

    Raises
    ------
    AssertionError
        If ``image_gray`` is not of dtype ``np.uint8``.
    """
    assert image_gray.dtype == np.uint8, "must be np.uint8 image"
    if image_gray.ndim == 3:
        image_gray = image_gray.squeeze(-1)

    # Initialize the matplotlib color map
    sm = plt.cm.ScalarMappable(cmap=cmap)

    # Sample the colormap at 256 evenly spaced points; drop alpha (RGBA => RGB)
    color_range = sm.to_rgba(np.linspace(0, 1, 256))[:, 0:3]
    color_range = (color_range * 255.0).astype(np.uint8)  # [0,1] => [0,255]
    # Reverse the channel order (RGB => BGR); copy so the table is contiguous
    color_range = np.ascontiguousarray(color_range[:, ::-1])

    # Apply the lookup table for each channel individually
    channels = [cv2.LUT(image_gray, color_range[:, i]) for i in range(3)]
    return np.dstack(channels)
def colorize(img, vmin=None, vmax=None, factor=1, cmap="bwr"):
    """Render a scalar image as a colour image using a named colormap.

    Parameters
    ----------
    img : np.ndarray
        Scalar image; it is converted to ``float`` before normalisation.
    vmin, vmax : float, optional
        Values mapped to the two ends of the colormap. Default to the
        minimum and maximum of ``img``. Values outside ``[vmin, vmax]`` are
        clipped to the nearest end instead of wrapping around in the
        ``uint8`` conversion.
    factor : int or float, optional
        Upscaling factor applied to both image dimensions, by default 1
        (no resizing).
    cmap : str, optional
        Name of the matplotlib colormap, by default "bwr".

    Returns
    -------
    np.ndarray
        The colour image produced by :func:`apply_custom_colormap`,
        resized when ``factor != 1``.
    """
    img = img.astype(float)
    vmin = np.min(img) if vmin is None else vmin
    vmax = np.max(img) if vmax is None else vmax

    span = vmax - vmin
    if span == 0:
        # A constant image has no range to normalise over; map it to the
        # low end of the colormap rather than dividing by zero.
        normalized = np.zeros_like(img)
    else:
        # Clip so out-of-range values cannot overflow the uint8 cast below.
        normalized = np.clip((img - vmin) / span, 0.0, 1.0)

    img = (normalized * 255).astype(np.uint8)
    img = apply_custom_colormap(img, cmap=plt.get_cmap(cmap))

    if factor != 1:
        rows, cols = img.shape[:2]
        # cv2.resize takes the target size as integer (width, height)
        new_size = (int(round(cols * factor)), int(round(rows * factor)))
        img = cv2.resize(img, new_size)
    return img
def _draw_flow(image, x, y, fx, fy, thickness: int = 2):
    """Draw one line segment per flow vector on top of a grayscale image.

    Each segment runs from ``(x, y)`` to ``(x + fx, y + fy)``. End points are
    shifted by 0.5 and converted to ``int32`` before drawing.

    Parameters
    ----------
    image : np.ndarray
        Grayscale image; it is converted to BGR before drawing.
    x, y : np.ndarray
        Start coordinates of the segments.
    fx, fy : np.ndarray
        Flow components added to the start coordinates.
    thickness : int, optional
        Line thickness passed to ``cv2.polylines``, by default 2.

    Returns
    -------
    np.ndarray
        BGR image with the segments drawn in ``(0, 0, 255)``.
    """
    quiver_color = (0, 0, 255)

    # One row per segment: [x_start, y_start, x_end, y_end]
    end_points = np.vstack([x, y, x + fx, y + fy]).T
    segments = np.int32(end_points.reshape(-1, 2, 2) + 0.5)

    canvas = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    cv2.polylines(
        canvas,
        segments,
        0,
        quiver_color,
        thickness=thickness,
        lineType=8,
    )
    return canvas
def draw_lk_flow(image, flow, reference_points):
    """Draw flow vectors anchored at the given reference points.

    Parameters
    ----------
    image : np.ndarray
        Grayscale image to draw on.
    flow : np.ndarray
        Flow array whose transpose unpacks into the two flow components.
    reference_points : np.ndarray
        Start points; reshaped to ``(-1, 2)`` and truncated to integers.

    Returns
    -------
    np.ndarray
        The image returned by :func:`_draw_flow`.
    """
    start_points = reference_points.reshape(-1, 2).astype(int)
    x, y = start_points.T
    flow_components = flow.T
    fx, fy = flow_components
    return _draw_flow(image, x, y, fx, fy)
def draw_flow(image, flow, step=16, scale: float = 1.0, thickness: int = 2):
    """Draw flow vectors sampled on a regular grid on top of an image.

    Parameters
    ----------
    image : np.ndarray
        Grayscale image to draw on; its first two dimensions define the
        extent of the sampling grid.
    flow : np.ndarray
        Flow field indexed as ``flow[row, column]``; the sampled values must
        unpack into two components after transposition.
    step : int, optional
        Spacing between sampled grid points, by default 16. The grid starts
        at ``step / 2`` along both axes.
    scale : float, optional
        Factor multiplied into both flow components before drawing,
        by default 1.0.
    thickness : int, optional
        Line thickness forwarded to :func:`_draw_flow`, by default 2.

    Returns
    -------
    np.ndarray
        The image returned by :func:`_draw_flow`.
    """
    h, w = image.shape[:2]
    # Row/column indices of the sampling grid, flattened to 1-D integer arrays
    y, x = np.mgrid[step / 2 : h : step, step / 2 : w : step].reshape(2, -1).astype(int)
    fx, fy = flow[y, x].T
    return _draw_flow(image, x, y, scale * fx, scale * fy, thickness=thickness)
def draw_hsv(flow):
    """Encode a flow field as a colour image via the HSV colour space.

    Hue is set from the flow direction, saturation is fixed at 255 and value
    is the flow magnitude min-max normalised to ``[0, 255]``.

    Parameters
    ----------
    flow : np.ndarray
        Flow field whose last axis holds the two flow components.

    Returns
    -------
    np.ndarray
        ``uint8`` image converted from HSV with ``cv2.COLOR_HSV2BGR``.
    """
    rows, cols = flow.shape[:2]
    magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    hsv = np.zeros((rows, cols, 3), np.uint8)
    # Saturation: always maximal
    hsv[..., 1] = 255
    # Hue: flow direction, with the angle converted to half-degrees
    hsv[..., 0] = angle * 180 / np.pi / 2
    # Value: normalised flow magnitude
    hsv[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)

    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
def quiver_video(
data: utils.MPSData,
vectors: Union[np.ndarray, fs.VectorFrameSequence],
path: utils.PathLike,
step: int = 16,
vector_scale: float = 1.0,
frame_scale: float = 1.0,
convert: bool = True,
offset: int = 0,
thickness: int = 2,
) -> None:
# Build an MP4 video at `path` (suffix forced to ".mp4") in which draw_flow is
# called for every frame of `data` with the matching slice of `vectors`.
# `step`, `vector_scale` and `thickness` are forwarded to draw_flow;
# `frame_scale` < 1.0 resizes `data` first; `offset` is subtracted from the
# number of frames; `convert` triggers a re-encode through imageio.
# NOTE(review): this block looks truncated in this view (indentation lost and
# several statements incomplete). The notes below mark each visible gap and
# must be confirmed against the upstream file before relying on this code.
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
# Normalise `vectors` to a plain array (dask arrays are computed eagerly).
if isinstance(vectors, fs.VectorFrameSequence):
vectors_np: np.ndarray = vectors.array_np
elif isinstance(vectors, da.Array):
vectors_np = vectors.compute()
# NOTE(review): an `else:` seems to be missing before the next line; as
# written it would overwrite the converted array with the raw input - confirm.
vectors_np = vectors
assert len(vectors_np.shape) == 4
if frame_scale < 1.0:
data = scaling.resize_data(data, frame_scale)
# Video width comes from size_y and height from size_x.
width = data.size_y
height = data.size_x
fps = data.framerate
num_frames = data.num_frames - offset
# Check shapes
# The time axis of the vectors is located as the first axis whose length
# equals num_frames; tuple.index raises ValueError when none matches.
# NOTE(review): the `try:` that pairs with the `except` below is not visible.
time_axis = vectors_np.shape.index(num_frames)
except ValueError as ex:
# NOTE(review): the parenthesis opened by `msg = (` is never closed here.
msg = (
"Time axis for frames and vetors does not match. "
f"Got frames of shape {data.frames.shape}, and "
f"vector of shape {vectors_np.shape}."
raise ValueError(msg) from ex
# NOTE(review): only one element of this tuple is visible and the tuple is
# not closed; the other elements cannot be determined from here.
vector_shape = (
vectors_np.shape[time_axis] + offset,
if not data.frames.shape == vector_shape:
# NOTE(review): same unclosed `msg = (` as above.
msg = (
"Shape of frames and vector does not match. "
f"Got frames of shape {data.frames.shape}, and "
f"vector of shape {vectors_np.shape}."
f"Expected frames to have shape {vector_shape}."
raise ValueError(msg)
p = Path(path).with_suffix(".mp4")
# NOTE(review): no body is visible for this `if`; presumably an existing
# file is handled here - confirm what the intended action is.
if p.is_file():
frames = utils.to_uint8(data.frames)
out = cv2.VideoWriter(p.as_posix(), fourcc, fps, (width, height))
for i in tqdm.tqdm(range(num_frames), desc=f"Create quiver video at {p}"):
# Frames are indexed along their last axis.
im = frames[:, :, i]
flow = np.take(vectors_np, i, axis=time_axis)
# NOTE(review): the draw_flow result is not consumed and the line ends with
# a trailing comma; it looks like the argument of a missing call
# (presumably a write to `out`) - confirm. No release of `out` is visible.
draw_flow(im, flow, step=step, scale=vector_scale, thickness=thickness),
if convert:
import imageio
tmp_path = p.parent.joinpath(p.stem + "_tmp").with_suffix(".mp4")
# NOTE(review): the right-hand side of `video =` is missing, and nothing
# visible creates or removes `tmp_path` - confirm against upstream.
video =
# Each frame has its first two axes swapped before being written back to `p`.
imageio.mimwrite(p, [np.swapaxes(d, 0, 1) for d in video.iter_data()], fps=fps)
def hsv_video(
    path: utils.PathLike,
    vectors: Union[np.ndarray, fs.VectorFrameSequence],
    fps: int = 50,
    axis: int = 2,
    convert: bool = False,
) -> None:
    """Write an MP4 video in which every frame is the HSV rendering of the flow.

    Parameters
    ----------
    path : utils.PathLike
        Output path; the suffix is replaced by ``.mp4``.
    vectors : Union[np.ndarray, fs.VectorFrameSequence]
        4-D array of flow vectors, or a ``VectorFrameSequence`` whose
        ``array_np`` attribute is used.
    fps : int, optional
        Frame rate of the video, by default 50.
    axis : int, optional
        Axis of the array that indexes frames, by default 2. The frame size
        is read from the first two axes.
    convert : bool, optional
        If True, re-encode the finished video with :func:`convert_imageio`,
        by default False.

    Raises
    ------
    AssertionError
        If the vector array is not 4-dimensional.
    """
    if isinstance(vectors, fs.VectorFrameSequence):
        vectors_np: np.ndarray = vectors.array_np
    else:
        vectors_np = vectors

    # Taking one index along `axis` must leave a 3-D array for draw_hsv
    # (it reads shape[:2] and the components [..., 0] and [..., 1]), so the
    # full array has to be 4-D. A second, contradictory `== 3` check is dropped.
    assert len(vectors_np.shape) == 4

    width = vectors_np.shape[1]
    height = vectors_np.shape[0]
    num_frames = vectors_np.shape[axis]

    p = Path(path).with_suffix(".mp4")
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(p.as_posix(), fourcc, fps, (width, height))
    try:
        for i in tqdm.tqdm(range(num_frames), desc=f"Create HSV movie at {p}"):
            flow = np.take(vectors_np, i, axis=axis)
            out.write(draw_hsv(flow))
    finally:
        # Release the writer even if a frame fails, so the file is closed.
        out.release()

    if convert:
        convert_imageio(p, fps)
def convert_imageio(path, fps):
    """Re-encode the video at ``path`` in place using imageio.

    The existing file is moved to a sibling ``<stem>_tmp.mp4``, read back
    frame by frame and written to ``path`` again. The temporary file is
    removed once the new video has been written.

    Parameters
    ----------
    path : path-like
        Location of the video to convert; overwritten with the result.
    fps : int
        Frame rate of the re-encoded video.
    """
    import imageio

    logger.info("Convert video using imageio")
    path = Path(path)
    tmp_path = path.parent.joinpath(path.stem + "_tmp").with_suffix(".mp4")

    # Move the original aside so the re-encoded video can take its name;
    # `replace` also overwrites a stale temporary file from an earlier run.
    path.replace(tmp_path)

    video = imageio.get_reader(tmp_path)
    try:
        imageio.mimwrite(path, video.iter_data(), fps=fps)
    finally:
        video.close()

    # Reached only on success, so the source survives a failed conversion.
    tmp_path.unlink()
def heatmap(
    path: utils.PathLike,
    data: Union[np.ndarray, fs.FrameSequence],
    fps: int = 50,
    axis: int = 2,
    convert: bool = False,
    vmin: Optional[float] = None,
    vmax: Optional[float] = None,
    transpose: bool = False,
    cmap: str = "bwr",
):
    """Write an MP4 video in which every frame is a colour-mapped slice of ``data``.

    Parameters
    ----------
    path : utils.PathLike
        Output path; the suffix is replaced by ``.mp4``.
    data : Union[np.ndarray, fs.FrameSequence]
        3-D array of scalar frames, or a ``FrameSequence`` whose ``array_np``
        attribute is used.
    fps : int, optional
        Frame rate of the video, by default 50.
    axis : int, optional
        Axis of the array that indexes frames, by default 2. The frame size
        is read from the first two axes.
    convert : bool, optional
        If True, re-encode the finished video with :func:`convert_imageio`,
        by default False.
    vmin, vmax : float, optional
        Colour limits shared by all frames. Default to the minimum and
        maximum of the whole array.
    transpose : bool, optional
        If True, transpose each frame (and swap width and height) before
        writing, by default False.
    cmap : str, optional
        Name of the matplotlib colormap forwarded to :func:`colorize`,
        by default "bwr".

    Raises
    ------
    AssertionError
        If the data array is not 3-dimensional.
    """
    logger.info("Create heatmap video")
    if isinstance(data, fs.FrameSequence):
        data_np: np.ndarray = data.array_np
    else:
        data_np = data

    # Use global limits so colours are comparable across frames
    vmin = vmin if vmin is not None else data_np.min()
    vmax = vmax if vmax is not None else data_np.max()

    assert len(data_np.shape) == 3
    num_frames = data_np.shape[axis]

    if transpose:
        width, height = data_np.shape[0], data_np.shape[1]
    else:
        width, height = data_np.shape[1], data_np.shape[0]

    p = Path(path).with_suffix(".mp4")
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(p.as_posix(), fourcc, fps, (width, height))
    try:
        for i in tqdm.tqdm(range(num_frames), desc=f"Create heatmap movie at {p}"):
            heat = np.take(data_np, i, axis=axis)
            if transpose:
                heat = heat.T
            out.write(colorize(heat, vmin=vmin, vmax=vmax, cmap=cmap))
    finally:
        # Release the writer even if a frame fails, so the file is closed.
        out.release()
        cv2.destroyAllWindows()
    logger.info("Done creating heatmap video")

    if convert:
        convert_imageio(p, fps)