fmsignal module
This module is the core of the fmkit framework, which is designed to facilitate researches on in-air-handwriting related research.
Author: Duo Lu <duolu.cs@gmail.com>
Version: 0.1 License: MIT
Updated on Feb. 7, 2020, version 0.1
Created on Aug 14, 2017, draft
The MIT License
Copyright 2017-2021 Duo Lu <duolu.cs@gmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
FMSignal
3D finger motion signal (after preprocessing, device agnostic).
This is the data structure for a finger motion signal. The signal is a time series containing samples of physical states of one point on a hand, captured by a 3D camera device or a wearable data glove device. This is the class the abstracts the signal data structure after preprocessing, and it is device agnostic.
Attributes:
Name | Type | Description |
---|---|---|
length |
int |
length of the time series (i.e., number of samples). |
dim |
int |
dimension of each sample (i.e., number of sensor axes). |
ts |
ndarray |
timestamps of each sample, in a len dimensional vector. |
data |
ndarray |
the actual time series data, in a len * dim matrix. |
user |
str |
the user who creates this signal. |
cid |
str |
the unique id indicating the content of the signal. |
seq |
int |
the sequence id in a set when loaded from a dataset. |
NOTE: There are several ways to create an FMSignal object.
(1) Preprocess a raw signal, i.e., "raw_signal.preprocess()".
(2) Load data from a file with the class method "construct_from_file()".
(3) Deep copy from an existing FMSignal object, i.e., "signal.copy()".
(4) Align to a template or another signal to generate an aligned signal,
i.e., "signal.align_to(another_signal)"
(5) Modify an existing FMSignal object to generate a new signal, which
is basically only used for data agumentation.
It is not recommended to directly construct an FMSignal object using its constructor since the attributes may be inconsistent. Instead, please only use those previously mentioned methods to create an FMSignal object.
NOTE: Timestamp is always in ms and frequency is always in Hz.
NOTE: "ts" and "data" both have "dtype == np.float32".
Currently, a signal has the following 18 sensor axes, i.e., "dim" is 18, and the "data" field has the shape of (len, 18).
axes | description |
---|---|
0-2 | position in x-y-z |
3-5 | speed in x-y-z, currently just the derivative of the position |
6-8 | acceleration in x-y-z, currently just the derivative of speed |
9-11 | orientation, i.e., the x, y, z components of the quaternion |
12-14 | angular speed, currently just the derivative of the orientation |
15-17 | angular acceleration, just the derivative of the angular speed |
A signal may also have the following optional attributes.
attributes | description |
---|---|
len_origin | The length before alignment (only for aligned signals). |
dist | DTW alignment distant (only for aligned signals). |
a2to1_start | Alignment index start array (only for aligned signals). |
a2to1_end | Alignment index end array (only for aligned signals). |
NOTE: "user", "cid", and "seq" are only needed to print information for debugging. Use the class "FMSignalDesc" to obtain the meta data of the signal for more details. Typically, the signal file is named as "user_cid_seq.txt" (or ".csv", or ".npy").
For example, given a file "duolu_duolu_01.txt", the "user" label is "duolu", the "cid" label is also "duolu", indicating the content is about writing the string "duolu", and the "seq" is 1, indicating the repetition #1.
Usually, for privacy issues, "user" label and "cid" label are anonymous strings since they only need to be distinctive instead of meaningful. For example, "user00_id00_01.txt" means the "user" label is "user00", the "id" label is "id00", and the seq is 1.
NOTE: The raw signal file may have two different formats, i.e., either in Comma Separated Value (".csv") or in NumPy binary format (".npy"). However, the content structure is the same, which is essentially a matrix, where the rows are the samples at a certain time and the columns are the data from a specific sensor axis. See the "Data Format Details" document for more information.
__init__(self, length=0, dim=0, ts=None, data=None, user='', cid='', seq=0)
special
Constructor.
See the class attributes for the meaning of the arguments.
NOTE: This is only for internal usage. If an FMSignal object is needed, use the class method "construct_from_file()" to load data from a file, or use the "copy()" method to duplicate a signal, or use the "align_to()" method to obtain an aligned signal.
Source code in code_fmkit/fmsignal.py
def __init__(self, length=0, dim=0, ts=None, data=None,
user="", cid="", seq=0):
"""Constructor.
See the class attributes for the meaning of the arguments.
**NOTE**: This is only for internal usage. If an FMSignal object is
needed, use the class method "construct_from_file()" to load data from
a file, or use the "copy()" method to duplicate a signal, or use the
"align_to()" method to obtain an aligned signal.
"""
self.length = length
self.dim = dim
self.ts = ts
self.data = data
self.user = user
self.cid = cid
self.seq = seq
__str__(self)
special
Convert the meta info of the signal to a human readable string.
Currently, the string is "user_cid_seq".
Parameters:
Name | Type | Description | Default |
---|---|---|---|
None |
None |
No arguments. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def __str__(self):
"""Convert the meta info of the signal to a human readable string.
Currently, the string is "user_cid_seq".
Args:
None (None): No arguments.
Returns:
None: No return value.
"""
return "%s_%s_%d" % (self.user, self.cid, self.seq)
align_to(self, template, window=0, penalty=0, method='c', keep_dist_matrix=False)
Get another signal by aligning the signal to a template signal.
NOTE: The alignment is done using Dynamic Time Warping (DTW).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
template |
FMSignal |
The template signal. |
required |
window |
int |
The DTW alignment window. |
0 |
penalty |
int |
The DTW element-wise missalign penalty. |
0 |
method |
string |
Implementation method, either "c" or "python". |
'c' |
keep_dist_matrix |
bool |
Indicating whether to keep the DTW result, i.e., the "dist_matrix". |
False |
Returns:
Type | Description |
---|---|
FMSignal |
The constructed signal object. |
Source code in code_fmkit/fmsignal.py
def align_to(self, template, window=0, penalty=0, method=DTW_METHOD,
keep_dist_matrix=False):
"""Get another signal by aligning the signal to a template signal.
**NOTE**: The alignment is done using Dynamic Time Warping (DTW).
Args:
template (FMSignal): The template signal.
window (int): The DTW alignment window.
penalty (int): The DTW element-wise missalign penalty.
method (string): Implementation method, either "c" or "python".
keep_dist_matrix (bool): Indicating whether to keep the DTW result,
i.e., the "dist_matrix".
Returns:
FMSignal: The constructed signal object.
"""
length_new = template.length
ts_new = template.ts.copy()
#print(template.data.shape)
#print(self.data.shape)
if method == "python":
tup = dtw(template.data, self.data, window, penalty)
elif method == "c":
tup = dtw_c(template.data, self.data, window, penalty)
else:
raise ValueError("Unkown DTW implementation method (%s)!" % method)
(_dist, matrix, _d, a2to1_start, a2to1_end, _s, _e, data_new) = tup
signal = FMSignal(length_new, self.dim, ts_new, data_new,
self.user, self.cid, self.seq)
signal.len_origin = self.length
signal.a2to1_start = a2to1_start
signal.a2to1_end = a2to1_end
if keep_dist_matrix:
signal.dist_matrix = matrix
return signal
all_close_to(self, signal)
Check whether this signal is almost identical to another signal.
NOTE: The criteria of "identical" is defined by "np.allclose()". The two signals must have the same type and length.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
signal |
FMSignal |
The other signal to compare. |
required |
Returns:
Type | Description |
---|---|
bool |
True if they are almost identical; False otherwise. |
Source code in code_fmkit/fmsignal.py
def all_close_to(self, signal):
"""Check whether this signal is almost identical to another signal.
**NOTE**: The criteria of "identical" is defined by "np.allclose()".
The two signals must have the same type and length.
Args:
signal (FMSignal): The other signal to compare.
Returns:
bool: True if they are almost identical; False otherwise.
"""
if not isinstance(signal, FMSignal):
return False
if self.length != signal.length:
return False
# NOTE: The CSV format only stores six digits after the decimal point.
# Hence, "atol" can not be smaller than 1e-6.
r1 = np.allclose(self.ts, signal.ts, atol=1e-6)
r2 = np.allclose(self.data, signal.data, atol=1e-6)
return r1 and r2
amplitude_normalize(self)
Normalize the amplitude of each sensor axes.
NOTE: The ratios of x-y-z axes of one type, e.g., position, acc, etc., are not perserved.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
None |
None |
No argument. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def amplitude_normalize(self):
"""Normalize the amplitude of each sensor axes.
**NOTE**: The ratios of x-y-z axes of one type, e.g., position, acc, etc.,
are not perserved.
Args:
None (None): No argument.
Returns:
None: No return value.
"""
data = self.data
mean = np.mean(data, axis=0)
std = np.std(data, axis=0)
for j in range(self.dim):
data[:, j] = np.divide(data[:, j] - mean[j], std[j])
construct_from_file(fn, mode, user='', cid='', seq=0)
classmethod
Construct a signal by loading data from a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without extension). |
required |
mode |
string |
The file format (currently either "csv" or "npy"). |
required |
user |
string |
The user who creates this signal. |
'' |
cid |
string |
The unique id indicating the content of the signal. |
'' |
seq |
int |
The sequence id in a set when loaded from a dataset. |
0 |
Returns:
Type | Description |
---|---|
FMSignal |
The constructed signal object. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the "mode" is unknown. |
FileNotFoundError |
If the file does not exist. |
Source code in code_fmkit/fmsignal.py
@classmethod
def construct_from_file(cls, fn, mode, user="", cid="", seq=0):
"""Construct a signal by loading data from a file.
Args:
fn (string): The file name (without extension).
mode (string): The file format (currently either "csv" or "npy").
user (string): The user who creates this signal.
cid (string): The unique id indicating the content of the signal.
seq (int): The sequence id in a set when loaded from a dataset.
Returns:
FMSignal: The constructed signal object.
Raises:
ValueError: If the "mode" is unknown.
FileNotFoundError: If the file does not exist.
"""
signal = FMSignal(0, 0, None, None)
signal.load_from_file(fn, mode)
signal.user = user
signal.cid = cid
signal.seq = seq
return signal
copy(self)
Deep copy of the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
None |
None |
No arguments. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def copy(self):
"""Deep copy of the signal.
Args:
None (None): No arguments.
Returns:
None: No return value.
"""
signal = FMSignal(self.length, self.dim, self.data.copy(),
self.ts.copy(), self.user, self.cid, self.seq)
return signal
distance_to(self, other)
Calculate the element-wise absolute distance of two signals.
NOTE: The other signal must be already aligned to this signal, or their lengths must be the same.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
FMSignal |
The other signal. |
required |
Returns:
Type | Description |
---|---|
ndarray |
An array of the absolute difference of the two signals. |
Exceptions:
Type | Description |
---|---|
ValueError |
If their lengths are different. |
Source code in code_fmkit/fmsignal.py
def distance_to(self, other):
"""Calculate the element-wise absolute distance of two signals.
**NOTE**: The other signal must be already aligned to this signal, or
their lengths must be the same.
Args:
other (FMSignal): The other signal.
Returns:
ndarray: An array of the absolute difference of the two signals.
Raises:
ValueError: If their lengths are different.
"""
if self.length != other.length:
raise ValueError("Incompatible signal lengths %d and %d."
% (self.length, other.length))
return np.absolute(self.data - other.data)
filter(self, sample_freq, cut_freq)
Low-pass filtering on the signal (in place).
NOTE: It is assumed that a hand can not move in very high frequency, so the high frequency components of the signal is filtered.
NOTE: This method use NumPy FFT and IFFT. It modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sample_freq |
float |
sample frequency of this signal. |
required |
cut_freq |
float |
low pass filtering cutoff frequency. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def filter(self, sample_freq, cut_freq):
"""Low-pass filtering on the signal (in place).
**NOTE**: It is assumed that a hand can not move in very high frequency,
so the high frequency components of the signal is filtered.
**NOTE**: This method use NumPy FFT and IFFT. It modifies the signal.
Args:
sample_freq (float): sample frequency of this signal.
cut_freq (float): low pass filtering cutoff frequency.
Returns:
None: No return value.
"""
l = self.length
data = self.data
cut_l = int(cut_freq * l / sample_freq)
dft_co = np.fft.fft(data, l, axis=0)
for i in range(cut_l, l - cut_l):
dft_co[i] = 0 + 0j
ifft_c = np.fft.ifft(dft_co, l, axis=0)
ifft = ifft_c.astype(np.float32)
for i in range(l):
self.data[i] = ifft[i]
get_orientation(self)
Obtain orientation as a series of rotation matrices and quaternions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
None |
None |
No argument. |
required |
Returns:
Type | Description |
---|---|
tuple |
A tuple of the following representing the orientation.
|
Source code in code_fmkit/fmsignal.py
def get_orientation(self):
"""Obtain orientation as a series of rotation matrices and quaternions.
Args:
None (None): No argument.
Returns:
tuple: A tuple of the following representing the orientation.
ndarray: The rotation matrices (l-by-3-by-3).
list: The unit quaternions.
"""
l = self.length
data = self.data
rotms = np.zeros((l, 3, 3))
qs = [None] * l
for i in range(l):
qx = data[i, 9]
qy = data[i, 10]
qz = data[i, 11]
qw = math.sqrt(1 - qx * qx - qy * qy - qz * qz)
q = Quaternion(qw, qx, qy, qz)
qs[i] = q
rotms[i] = q.to_rotation_matrix()
return rotms, qs
load_alignment_index(self, fn, mode)
Load the alignment indicies from a NumPy binary file (.npy).
NOTE: The alignment indicies are the "a2to1_start" and "a2to1_end" attributes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without the ".csv" or ".npy" extension). |
required |
mode |
string |
The file format (currently either "csv" or "npy"). |
required |
Returns:
Type | Description |
---|---|
None |
no return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the "mode" is unknown. |
FileNotFoundError |
If the file does not exist. |
Source code in code_fmkit/fmsignal.py
def load_alignment_index(self, fn, mode):
"""Load the alignment indicies from a NumPy binary file (.npy).
**NOTE**: The alignment indicies are the "a2to1_start" and "a2to1_end"
attributes.
Args:
fn (string): The file name (without the ".csv" or ".npy" extension).
mode (string): The file format (currently either "csv" or "npy").
Returns:
None: no return value.
Raises:
ValueError: If the "mode" is unknown.
FileNotFoundError: If the file does not exist.
"""
if mode == "csv":
fn += ".csv"
array = np.loadtxt(fn, dtype=np.int32, delimiter=",")
elif mode == "npy":
fn += ".npy"
array = np.load(fn)
assert array.dtype == np.int32
else:
raise ValueError("Unknown mode: " + mode)
self.a2to1_start = array[:, 0]
self.a2to1_end = array[:, 1]
self.length_origin = self.a2to1_end[-1]
load_from_file(self, fn, mode)
Load the signal from a file.
NOTE: This is only for internal usage. If an FMSignal object is needed, use the class method "construct_from_file()" instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without the ".csv" or ".npy" extension). |
required |
mode |
string |
The file format (currently either "csv" or "npy"). |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the "mode" is unknown. |
FileNotFoundError |
if the file does not exist. |
Source code in code_fmkit/fmsignal.py
def load_from_file(self, fn, mode):
"""Load the signal from a file.
**NOTE**: This is only for internal usage. If an FMSignal object is
needed, use the class method "construct_from_file()" instead.
Args:
fn (string): The file name (without the ".csv" or ".npy" extension).
mode (string): The file format (currently either "csv" or "npy").
Returns:
None: No return value.
Raises:
ValueError: if the "mode" is unknown.
FileNotFoundError: if the file does not exist.
"""
if mode == "csv":
fn += ".csv"
array = np.loadtxt(fn, dtype=np.float32, delimiter=",")
elif mode == "npy":
fn += ".npy"
array = np.load(fn)
assert array.dtype == np.float32
else:
raise ValueError("Unknown mode: " + mode)
# NOTE: These "copy()" force the "ts" and "data" to have proper C-like
# array in memory, which is crucial for "dtw_c()"!!!
ts = array[:, 0:1].copy()
data = array[:, 1:].copy()
length = data.shape[0]
dim = data.shape[1]
self.length = length
self.dim = dim
self.ts = ts
self.data = data
pertube_amplitude(self, axis, time, s_window, scale, sigma)
Pertube the signal at the specified time.
NOTE: This method is used in data augmentation. It adds a Gaussian shape pertubation signal segment to the original signal. This method changes the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
axis |
int |
The specified axis to pertube. |
required |
time |
int |
The specified time to pertube. |
required |
s_window |
int |
The pertubation smooth window size (greater than 0). |
required |
scale |
float |
The scale factor of the pertubation (pos or neg). |
required |
sigma |
float |
The standard deviation of the Gaussian pertubation. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the input arguments are out of range. |
Source code in code_fmkit/fmsignal.py
def pertube_amplitude(self, axis, time, s_window, scale, sigma):
"""Pertube the signal at the specified time.
**NOTE**: This method is used in data augmentation. It adds a Gaussian
shape pertubation signal segment to the original signal. This method
changes the signal.
Args:
axis (int): The specified axis to pertube.
time (int): The specified time to pertube.
s_window (int): The pertubation smooth window size (greater than 0).
scale (float): The scale factor of the pertubation (pos or neg).
sigma (float): The standard deviation of the Gaussian pertubation.
Returns:
None: No return value.
Raises:
ValueError: If the input arguments are out of range.
"""
l = self.length
if axis < 0 or axis >= self.dim:
raise ValueError('Bad input axis (%d)' % axis)
if s_window <= 0 or time < 0 or time >= l:
raise ValueError('Bad input time (%d)' % time)
pertubation = scipy.signal.gaussian(s_window * 2, sigma) * scale
# Calculate the start indices.
s_data = time - s_window
s_pertube = 0
if s_data < 0:
s_pertube = -s_data
s_data = 0
# Calculate the end indices.
e_data = s_data + s_window * 2
e_pertube = s_window
if e_data > l:
e_pertube -= e_data - l
e_data = l
self.data[s_data:e_data, axis] += pertubation[s_pertube:e_pertube]
pertube_amplitude_seg(self, axis, time, window, s_window, scale, sigma)
Pertube the signal at a specified time for a segment.
NOTE: This method is used in data augmentation. It adds a flat pertubation signal segment to the original signal with Gaussian smooth edges on both sides. This method changes the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
axis |
int |
The specified axis to pertube. |
required |
time |
int |
The specified time to pertube. |
required |
window |
int |
The pertubation window size (greater than 0). |
required |
s_window |
int |
The pertubation smooth window size (greater than 0). |
required |
scale |
float |
The scale factor of the pertubation (pos or neg). |
required |
sigma |
float |
The standard deviation of the Gaussian pertubation. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the input arguments are out of range. |
Source code in code_fmkit/fmsignal.py
def pertube_amplitude_seg(self, axis, time, window, s_window, scale, sigma):
"""Pertube the signal at a specified time for a segment.
**NOTE**: This method is used in data augmentation. It adds a flat
pertubation signal segment to the original signal with Gaussian smooth
edges on both sides. This method changes the signal.
Args:
axis (int): The specified axis to pertube.
time (int): The specified time to pertube.
window (int): The pertubation window size (greater than 0).
s_window (int): The pertubation smooth window size (greater than 0).
scale (float): The scale factor of the pertubation (pos or neg).
sigma (float): The standard deviation of the Gaussian pertubation.
Returns:
None: No return value.
Raises:
ValueError: If the input arguments are out of range.
"""
l = self.length
if axis < 0 or axis >= self.dim:
raise ValueError('Bad input axis: ' + '%d' % axis)
if window <= 0 or s_window <= 0:
raise ValueError('Bad input window (%d) or s_window (%d)'
% (window, s_window))
if time < 0 or time + window >= l:
raise ValueError('Bad input time (%d) or window (%d)'
% (time, window))
pertube_size = window + s_window * 2
pertube_seg = np.ones(pertube_size, dtype=np.float32) * scale
# NOTE: The peak of this Gaussian series is 1.
smooth_edges = scipy.signal.gaussian(s_window * 2, sigma)
se_left = smooth_edges[:s_window]
se_right = smooth_edges[s_window:]
pertube_seg[:s_window] = np.multiply(pertube_seg[:s_window], se_left)
pertube_seg[-s_window:] = np.multiply(pertube_seg[:s_window], se_right)
s_data = time - s_window
s_pertube = 0
if s_data < 0:
s_pertube = -s_data
s_data = 0
e_data = time + window + s_window
e_pertube = pertube_size
if e_data >= l:
e_pertube -= e_data - l
e_data = l
self.data[s_data:e_data, axis] += pertube_seg[s_pertube:e_pertube]
resize(self, length_new)
Resize the signal to the specified new length.
NOTE: This method stretches the signal in time by linear interpolation. It is designed for temporal normalization. This method modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
length_new |
int |
The specified new length. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the input arguments are out of range. |
Source code in code_fmkit/fmsignal.py
def resize(self, length_new):
"""Resize the signal to the specified new length.
**NOTE**: This method stretches the signal in time by linear
interpolation. It is designed for temporal normalization. This method
modifies the signal.
Args:
length_new (int): The specified new length.
Returns:
None: No return value.
Raises:
ValueError: If the input arguments are out of range.
"""
if length_new < 0:
raise ValueError('Bad new length (%d).' % length_new)
l = self.length
d = self.dim
data_new = np.zeros((length_new, d), np.float32)
ts_new = np.zeros((length_new), np.float32)
data = self.data
ts = self.ts
xp = np.linspace(0, l - 1, num=l)
x = np.linspace(0, l - 1, num=length_new)
for j in range(d):
data_new[:, j] = np.interp(x, xp, data[:, j])
ts_new[:] = np.interp(x, xp, ts)
self.length = length_new
self.ts = ts_new
self.data = data_new
resize_segment(self, start, window, seg_length_new)
Resize a segment of the signal and keep other parts untouched.
NOTE: This method is used in data augmentation. This method changes the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
start |
int |
The start of the segment to stretch. |
required |
window |
int |
The stretch window size (greater than 0). |
required |
seg_length_new |
int |
The specified new segment length. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the input arguments are out of range. |
Source code in code_fmkit/fmsignal.py
def resize_segment(self, start, window, seg_length_new):
"""Resize a segment of the signal and keep other parts untouched.
**NOTE**: This method is used in data augmentation. This method changes
the signal.
Args:
start (int): The start of the segment to stretch.
window (int): The stretch window size (greater than 0).
seg_length_new (int): The specified new segment length.
Returns:
None: No return value.
Raises:
ValueError: If the input arguments are out of range.
"""
end = start + window
if window <= 0:
raise ValueError('Bad window size (%d).' % window)
if seg_length_new < 0:
raise ValueError('Bad new segment length (%d).' % seg_length_new)
if start < 0 or end >= self.length:
raise ValueError('Bad input start (%d) or window (%d).'
% (start, window))
sample_period = self.ts[1] - self.ts[0]
l = self.length
d = self.dim
seg_l = end - start
l_new = self.length - seg_l + seg_length_new
delta_new = seg_l / seg_length_new
seg_start_new = start
seg_end_new = seg_start_new + seg_length_new
data_new = np.zeros((l_new, d), np.float32)
ts_new = np.zeros((l_new, 1), np.float32)
data = self.data
# TODO: rewrite this with array splice and linear interpolation
data_new[0:start] = data[0:start]
data_new[seg_end_new:l_new] = data[end:l]
for i in range(seg_length_new):
it = start + i * delta_new
ii_old = int(math.floor(it))
ii_new = start + i
dt = it - ii_old
# print(i, delta_new, it, ii)
for v in range(d):
rate = (
data[ii_old][v] - data[ii_old][v]
if ii_old + 1 < l
else data[ii_old][v] - data[ii_old - 1][v]
)
data_new[ii_new][v] = data[ii_old][v] + rate * dt
for i in range(l_new):
ts_new[i] = sample_period * i
self.length = l_new
self.dim = d
self.ts = ts_new
self.data = data_new
save_alignment_index(self, fn, mode)
Save the alignment indicies to a NumPy binary file (.npy).
NOTE: The alignment indicies are the "a2to1_start" and "a2to1_end" attributes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
file name (without the ".csv" or ".npy" extension). |
required |
mode |
string |
The file format (currently either "csv" or "npy"). |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def save_alignment_index(self, fn, mode):
"""Save the alignment indicies to a NumPy binary file (.npy).
**NOTE**: The alignment indicies are the "a2to1_start" and "a2to1_end"
attributes.
Args:
fn (string): file name (without the ".csv" or ".npy" extension).
mode (string): The file format (currently either "csv" or "npy").
Returns:
None: No return value.
"""
l = self.length
column_1 = self.a2to1_start.reshape((l, 1))
column_2 = self.a2to1_end.reshape((l, 1))
array = np.concatenate((column_1, column_2), axis=1)
if mode == "csv":
fn += ".csv"
np.savetxt(fn, array, fmt="%d", delimiter=", ")
elif mode == "npy":
# NOTE: NumPy library add the ".npy" file extension for us!
np.save(fn, array)
else:
raise ValueError("Unknown mode: " + mode)
save_to_file(self, fn, mode)
Save the signal to a file.
NOTE: Only six digits after the decimal point are kept when floating point numbers are converted to CSV strings.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
file name (without the ".csv" or ".npy" extension). |
required |
mode |
string |
The file format (currently either "csv" or "npy"). |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def save_to_file(self, fn, mode):
"""Save the signal to a file.
**NOTE**: Only six digits after the decimal point are kept when floating
point numbers are converted to CSV strings.
Args:
fn (string): file name (without the ".csv" or ".npy" extension).
mode (string): The file format (currently either "csv" or "npy").
Returns:
None: No return value.
"""
l = self.length
array = np.concatenate((self.ts.reshape((l, 1)), self.data), axis=1)
if mode == "csv":
fn += ".csv"
np.savetxt(fn, array, fmt="%.6f", delimiter=", ")
elif mode == "npy":
assert array.dtype == np.float32
# NOTE: NumPy library add the ".npy" file extension for us!
np.save(fn, array)
else:
raise ValueError("Unknown mode: " + mode)
shift_temporally(self, shift)
Shift the signal in time.
NOTE: This method is used in data augmentation. This method modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
shift |
int |
The specified shift in time. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def shift_temporally(self, shift):
"""Shift the signal in time.
**NOTE**: This method is used in data augmentation. This method modifies
the signal.
Args:
shift (int): The specified shift in time.
Returns:
None: No return value.
"""
data = self.data
l = self.length
d = self.dim
xp = np.arange(0, l, 1, dtype=np.float32)
x = xp + shift
data_shift = np.zeros((l, d), dtype=np.float32)
# TODO: Rewrite this using array splice.
for j in range(d):
data_shift[:, j] = np.interp(x, xp, data[:, j])
self.data = data_shift
stretch_axis(self, axis, scale)
Stretch along a certain sensor axis.
NOTE: This method is used in data augmentation. This method modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
axis |
int |
The specified sensor axis to stretch. |
required |
scale |
float |
The intensity of the stretching. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the input arguments are out of range. |
Source code in code_fmkit/fmsignal.py
def stretch_axis(self, axis, scale):
"""Stretch along a certain sensor axis.
**NOTE**: This method is used in data augmentation. This method modifies
the signal.
Args:
axis (int): The specified sensor axis to stretch.
scale (float): The intensity of the stretching.
Returns:
None: No return value.
Raises:
ValueError: If the input arguments are out of range.
"""
if axis < 0 or axis >= self.dim:
raise ValueError('Bad axis (%d).' % axis)
l = self.length
offset_v = (np.arange(0, l, 1) - l / 2) * scale
self.data[:, axis] += offset_v
swap_segment(self, other, start, window, s_window)
Swap a segment of this signal with another signal.
NOTE: This method is used in data augmentation. It generates two new signals, which are the signals after swapping. It is done on all axes. This method does not modify this signal or the other signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
FMSignal |
The other signal. |
required |
start |
int |
The start of the segment to swap. |
required |
window |
int |
The pertubation window size (greater than 0). |
required |
s_window |
int |
The pertubation smooth window size (greater than 0). |
required |
Returns:
Type | Description |
---|---|
tuple |
A tuple containing the following.
|
Exceptions:
Type | Description |
---|---|
ValueError |
If the input arguments are out of range. |
Source code in code_fmkit/fmsignal.py
def swap_segment(self, other, start, window, s_window):
"""Swap a segment of this signal with another signal.
**NOTE**: This method is used in data augmentation. It generates two
new signals, which are the signals after swapping. It is done on all
axes. This method does not modify this signal or the other signal.
Args:
other (FMSignal): The other signal.
start (int): The start of the segment to swap.
window (int): The pertubation window size (greater than 0).
s_window (int): The pertubation smooth window size (greater than 0).
Returns:
tuple: A tuple containing the following.
FMSignal: This signal after swapping.
FMSignal: The other signal after swapping.
Raises:
ValueError: If the input arguments are out of range.
"""
end = start + window
l_min = min(self.length, other.length)
if window <= 0 or s_window <= 0:
raise ValueError('Bad input window (%d) or s_window (%d)'
% (window, s_window))
if start < 0 or end >= l_min:
raise ValueError('Bad input start (%d) or window (%d)'
% (start, window))
a = self.copy()
b = other.copy()
data_a = a.data
ts_a = a.ts
data_b = b.data
ts_b = b.ts
as1 = data_a[:start, :]
as2 = data_a[start:end, :]
as3 = data_a[end:, :]
bs1 = data_b[:start, :]
bs2 = data_b[start:end, :]
bs3 = data_b[end:, :]
ats1 = ts_a[:start]
ats2 = ts_a[start:end]
ats3 = ts_a[end:]
bts1 = ts_b[:start]
bts2 = ts_b[start:end]
bts3 = ts_b[end:]
data_a_new = np.concatenate([as1, bs2, as3], axis=0)
ts_a_new = np.concatenate([ats1, bts2, ats3], axis=0)
l_a = data_a_new.shape[0]
data_b_new = np.concatenate([bs1, as2, bs3], axis=0)
ts_b_new = np.concatenate([bts1, ats2, bts3], axis=0)
l_b = data_b_new.shape[0]
# Smooth the segment edges with linear interpolation.
# TODO: Rewrite this using NumPy vectorized operation.
if s_window != 0:
# Left margin
ml_start = max(0, start - s_window)
ml_end = min(start + s_window, min(l_a, l_b))
ml_length = ml_end - ml_start
ml_factor = np.linspace(1, 0, ml_length,
endpoint=True, dtype=np.float32)
# print(margin1_start, margin1_end)
for i, ii in zip(range(ml_start, ml_end), range(ml_length)):
data_a_new[i] = ml_factor[ii] * data_a[i] \
+ (1 - ml_factor[ii]) * data_b[i]
data_b_new[i] = ml_factor[ii] * data_b[i] \
+ (1 - ml_factor[ii]) * data_a[i]
# Right margin
mr_start = max(0, end - s_window)
mr_end = min(end + s_window, min(l_a, l_b))
mr_length = mr_end - mr_start
mr_factor = np.linspace(1, 0, mr_end - mr_start,
endpoint=True, dtype=np.float32)
# print(margin2_start, margin2_end)
for i, ii in zip(range(mr_start, mr_end), range(mr_length)):
data_a_new[i] = mr_factor[ii] * data_b[i] \
+ (1 - mr_factor[ii]) * data_a[i]
data_b_new[i] = mr_factor[ii] * data_a[i] \
+ (1 - mr_factor[ii]) * data_b[i]
a.data = data_a_new
a.ts = ts_a_new
a.len = l_a
b.data = data_b_new
b.ts = ts_b_new
b.len = l_b
return a, b
FMSignalDesc
A descriptor of a set of signals of the same writing content.
Typically, these signals are generated by users or imposters writing the same content in multiple repetitions. If the contents are different, they are considered as different set of signals with different descriptors.
NOTE: One descriptor object corresponds to one line in the meta file.
Attributes:
Name | Type | Description |
---|---|---|
uid |
int |
A unique ID that orders all descriptors in a collections. Usually it is the line number of the descriptor in the meta file. |
user |
string |
A label indicating which user generates the signals. |
cid |
string |
A label indicating the content for classification tasks. Note that for spoofing attacks, i.e., different users are asked to write the same content, the signals have the same id_label. This label is used as the account ID for user identification and authentication purpose, and hence it gets the name "id_label". |
device |
string |
This indicates which type of device is used to obtain the signals. Currently it is either "glove" or "leap". |
start |
int |
The start repetition sequence number (inclusive). |
end |
int |
The end repetition sequence number (exclusive). These sequence numbers allow the database to only load a specified section of the data, which is very useful when spliting the dataset into training and testing set or dealing with data augmentation. |
fn_prefix |
string |
File name prefix, typically "user_cid". The full file name is "fn_prefix" + "_" + "seq" + ".csv" or ".npy", where "seq" indicate the specific repetition. |
content |
string |
The actual content that is written. |
__init__(self, uid, user, cid, device, start, end, fn_prefix, content)
special
Constructor.
See the class attributes for the meaning of the arguments.
Source code in code_fmkit/fmsignal.py
def __init__(self, uid, user, cid,
device, start, end, fn_prefix, content):
"""Constructor.
See the class attributes for the meaning of the arguments.
"""
self.uid = uid
self.user = user
self.cid = cid
self.device = device
self.start = start
self.end = end
self.fn_prefix = fn_prefix
self.content = content
__str__(self)
special
Convert a descriptor to a human readable string representation.
NOTE: Currently it is "uid \t user \t cid \t device".
Source code in code_fmkit/fmsignal.py
def __str__(self):
"""Convert a descriptor to a human readable string representation.
**NOTE**: Currently it is "uid \\t user \\t cid \\t device".
"""
return "%8d\t%20s\t%20s\t%8s\t" % (
self.uid,
self.user,
self.cid,
self.device,
)
construct_from_meta_file(meta_fn)
classmethod
Factory method to build list of FMSignalDesc from a metadata file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
meta_fn |
string |
The metadata file name. |
required |
Returns:
Type | Description |
---|---|
list |
a list of descriptor objects. |
NOTE: The meta file contains a table with the following structure, (columns are seperated by commas):
column | type | content |
---|---|---|
0 | int | uid |
1 | string | user |
2 | string | cid |
3 | string | device |
4 | int | start repetition sequence number |
5 | int | end repetition sequence number |
6 | string | file name prefix |
7 | string | content |
Typically, the file name prefix field is just "user_label_id_label".
See the "data_example" and "meta_example" folders for examples.
Source code in code_fmkit/fmsignal.py
@classmethod
def construct_from_meta_file(cls, meta_fn):
"""Factory method to build list of FMSignalDesc from a metadata file.
Args:
meta_fn (string): The metadata file name.
Returns:
list: a list of descriptor objects.
**NOTE**: The meta file contains a table with the following structure,
(columns are seperated by commas):
column | type | content
-------|---------|--------------
0 | int | uid
1 | string | user
2 | string | cid
3 | string | device
4 | int | start repetition sequence number
5 | int | end repetition sequence number
6 | string | file name prefix
7 | string | content
Typically, the file name prefix field is just "user_label_id_label".
See the "data_example" and "meta_example" folders for examples.
"""
descs = []
with open(meta_fn, "r") as meta_fd:
reader = csv.reader(meta_fd)
for row in reader:
if len(row) == 0:
continue
if row[0].startswith("#"):
continue
strs = []
for column in row:
strs.append(column.lstrip().rstrip())
desc = cls(
int(strs[0]),
strs[1],
strs[2],
strs[3],
int(strs[4]),
int(strs[5]),
strs[6],
strs[7],
)
descs.append(desc)
return descs
FMSignalGlove
Raw finger motion signal collected by the data glove.
This class represents the raw signal obtained from the data glove. The glove uses two BNO055 Inertial Measurement Units (IMU), one on the tip of the index finger and the other on the tip of the thumb.
Attributes:
Name | Type | Description |
---|---|---|
length |
int |
The number of samples in this signal. |
ts |
ndarray |
Timestamp obtained from the device (1D vector). |
tsc |
ndarray |
Timestamp obtained from the client computer (1D vector). |
acc0 |
ndarray |
The linear acceleration obtained from the first IMU. |
gyro0 |
ndarray |
The angular speed obtained from the first IMU. |
gravity0 |
ndarray |
The gravity vector obtained from the first IMU. |
acc1 |
ndarray |
The linear acceleration obtained from the second IMU. |
gyro1 |
ndarray |
The angular speed obtained from the second IMU. |
gravity1 |
ndarray |
The gravity vector obtained from the second IMU. |
data |
ndarray |
The data after preprocessing. |
rotms |
ndarray |
The orientation of each smple in rotation matrices. |
qs |
list |
The orientation of each smple in unit quaternions. |
trajectory |
ndarray |
The motion trajectory of the selected IMU. |
NOTE: The raw sensor data such as "acc0" is a length * 3 matrix.
NOTE: "data", "rotms", "qs", and "trajectory" are only available after preprocessing. Since a preprocessed signal (i.e., FMSignal object) contains the motion of only one point, in the preprocessing procedure, one IMU must be selected (i.e., by the "point" argument of "preprocess()"). The "rotms" is a length * 3 * 3 tensor. The "qs" is a list of Quaternion objects from the "pyrotation" module. The "trajectory" is a length * 3 matrix.
NOTE: Although the BNO055 IMU can provide absolute orientation fused by the accelerometer, gyroscope. Additionally, it can provide linear acceleration with gravity removed. In our case, The IMU is set to NDOF mode (default mode, see BNO055 datasheet section 3.3). We use the linear acceleration but not the absolute orientation. Instead, we use a simple method to derive the orientation by integrating the angular speed. Since the signal usually has only a few seconds, this simple method is good enough. The raw file contains additional columns. See the "Data Format Details" document for more information.
NOTE: The raw signal file may have two different formats, i.e., either in Comma Separated Value (".csv") or in NumPy binary format (".npy"). However, the content structure is the same, which is essentially a matrix, where the rows are the samples at a certain time and the columns are the data from a specific sensor axis. See the "Data Format Details" document for more information. There is another format called "raw_internal", which is used to resolve format issues in the data collected at the early stage of this project. It is now obsolete.
__init__(self, user='', cid='', seq=0)
special
Constructor.
See the attributes of FMSignal for the meaning of the arguments.
NOTE: This is only for internal usage. If an FMSignalGlove object is needed, use the class method "construct_from_file()" to load data from a file, or use the "copy()" method to duplicate a signal.
Source code in code_fmkit/fmsignal.py
def __init__(self, user="", cid="", seq=0):
"""Constructor.
See the attributes of FMSignal for the meaning of the arguments.
**NOTE**: This is only for internal usage. If an FMSignalGlove object is
needed, use the class method "construct_from_file()" to load data from
a file, or use the "copy()" method to duplicate a signal.
"""
self.user = user
self.cid = cid
self.seq = seq
self.length = 0
self.ts = None
self.tsc = None
self.acc0 = None
self.gyro0 = None
self.gravity0 = None
self.acc1 = None
self.gyro1 = None
self.gravity1 = None
self.data = None
self.qs = None
self.rotms = None
self.trajectory = None
all_close_to(self, signal)
Check whether this signal is almost identical to the other signal.
NOTE: The criteria of "identical" is defined by "np.allclose()". The two signals must have the same type and length.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
signal |
FMSignalGlove |
The other signal to compare. |
required |
Returns:
Type | Description |
---|---|
bool |
True if they are almost identical; False otherwise. |
Source code in code_fmkit/fmsignal.py
def all_close_to(self, signal):
"""Check whether this signal is almost identical to the other signal.
**NOTE**: The criteria of "identical" is defined by "np.allclose()".
The two signals must have the same type and length.
Args:
signal (FMSignalGlove): The other signal to compare.
Returns:
bool: True if they are almost identical; False otherwise.
"""
if not isinstance(signal, FMSignalGlove):
return False
if self.length != signal.length:
return False
# NOTE: The CSV format only stores six digits after the decimal point.
# Hence, "atol" can not be smaller than 1e-6.
r1 = np.allclose(self.ts, signal.ts, atol=1e-6)
r2 = np.allclose(self.tsc, signal.tsc, atol=1e-6)
r3 = np.allclose(self.acc0, signal.acc0, atol=1e-6)
r4 = np.allclose(self.gyro0, signal.gyro0, atol=1e-6)
r5 = np.allclose(self.gravity0, signal.gravity0, atol=1e-6)
r6 = np.allclose(self.acc1, signal.acc1, atol=1e-6)
r7 = np.allclose(self.gyro1, signal.gyro1, atol=1e-6)
r8 = np.allclose(self.gravity1, signal.gravity1, atol=1e-6)
return r1 and r2 and r3 and r4 and r5 and r6 and r7 and r8
construct_from_file(fn, mode, user='', cid='', seq=0)
classmethod
Construct a signal by loading data from a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without extension). |
required |
mode |
string |
The file format, either "raw_csv", or "raw_npy". |
required |
user |
string |
The user who creates this signal. |
'' |
cid |
string |
The unique id indicating the content of the signal. |
'' |
seq |
int |
The sequence id in a set when loaded from a dataset. |
0 |
Returns:
Type | Description |
---|---|
FMSignalGlove |
The constructed signal object. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the "mode" is wrong. |
FileNotFoundError |
If the file does not exist. |
Source code in code_fmkit/fmsignal.py
@classmethod
def construct_from_file(cls, fn, mode, user="", cid="", seq=0):
"""Construct a signal by loading data from a file.
Args:
fn (string): The file name (without extension).
mode (string): The file format, either "raw_csv", or "raw_npy".
user (string): The user who creates this signal.
cid (string): The unique id indicating the content of the signal.
seq (int): The sequence id in a set when loaded from a dataset.
Returns:
FMSignalGlove: The constructed signal object.
Raises:
ValueError: If the "mode" is wrong.
FileNotFoundError: If the file does not exist.
"""
signal = cls(user, cid, seq)
signal.load_from_file(fn, mode)
return signal
convert_axes_to_standard_glove(self, axes)
Convert a set of xyz data series to the standard glove axes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
axes |
ndarray |
The original data series. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
NOTE: This method is designed to handle "raw_internal" signals only.
Source code in code_fmkit/fmsignal.py
def convert_axes_to_standard_glove(self, axes):
"""Convert a set of xyz data series to the standard glove axes.
Args:
axes (ndarray): The original data series.
Returns:
None: No return value.
**NOTE**: This method is designed to handle "raw_internal" signals only.
"""
n = axes.shape[0]
temp = np.zeros((n, 1), np.float32)
# x <= -y', y <= x'
# (x, y) is standard glove. (x', y') is glove2 or glove3.
temp[:, 0] = axes[:, 0]
axes[:, 0] = -axes[:, 1]
axes[:, 1] = temp[:, 0]
convert_to_standard_glove(self)
Convert raw data columns to standard glove reference frame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
none |
None |
No arguments. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
NOTE: This method is designed to handle "raw_internal" signals only.
Source code in code_fmkit/fmsignal.py
def convert_to_standard_glove(self):
"""Convert raw data columns to standard glove reference frame.
Args:
none (None): No arguments.
Returns:
None: No return value.
**NOTE**: This method is designed to handle "raw_internal" signals only.
"""
self.convert_axes_to_standard_glove(self.acc0)
self.convert_axes_to_standard_glove(self.gyro0)
self.convert_axes_to_standard_glove(self.gravity0)
self.convert_axes_to_standard_glove(self.acc1)
self.convert_axes_to_standard_glove(self.gyro1)
self.convert_axes_to_standard_glove(self.gravity1)
estimate_angular_states(self, point)
Estimate orientation, angular speed, and angular acceleration.
NOTE: Orientations are represented as the qx, qy, and qz components of a unit quaternion. For the data glove device, the angular speed is directly obtained from the sensor, and the angular po
NOTE: Currently the beginning of the signal is used as the initial pose and the integration goes through the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
point |
string |
The point on the hand, either "tip" or "center". |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def estimate_angular_states(self, point):
"""Estimate orientation, angular speed, and angular acceleration.
**NOTE**: Orientations are represented as the qx, qy, and qz
components of a unit quaternion. For the data glove device, the angular
speed is directly obtained from the sensor, and the angular po
**NOTE**: Currently the beginning of the signal is used as the initial
pose and the integration goes through the signal.
Args:
point (string): The point on the hand, either "tip" or "center".
Returns:
None: No return value.
"""
data = self.data
l = self.length
if point == "tip":
gyro = self.gyro0
elif point == "center":
gyro = self.gyro1
else:
raise ValueError("Unknown point type: %s" % str(point))
q = Quaternion.identity()
# rotation matrix of each data sample
rotms = np.zeros((l, 3, 3))
# quaternion of each data sample
qs = [None] * l
# Given 50 Hz, one timestep is 20 ms, i.e., 0.02 second
timestep = 0.02
for i in range(0, l):
qs[i] = q
rotm = q.to_rotation_matrix()
rotms[i] = rotm
data[i, 9] = q.x
data[i, 10] = q.y
data[i, 11] = q.z
# z, y, x = rotation_matrix_to_euler_angles_zyx(rotm)
# yaw, pitch, roll, gimbal_lock \
# = rotation_matrix_to_euler_angles_zyx(rotm)
# data[i, 9] = roll
# data[i, 10] = pitch
# data[i, 11] = yaw
# assert not gimbal_lock
# NOTE: Gyro output is in rad/s
omega = gyro[i]
q = Quaternion.integrate_local(q, omega, timestep)
q.normalize()
data[:, 12:15] = gyro
data[:, 15] = np.gradient(data[:, 12])
data[:, 16] = np.gradient(data[:, 13])
data[:, 17] = np.gradient(data[:, 14])
self.data = data
self.rotms = rotms
self.qs = qs
estimate_linear_states(self, point, wv=0.2, wp=5)
Estimate position, velocity and acceleration.
NOTE: For the data glove device, the linear acceleration is directly obtained from the sensor. The position and velocity are derived by integrating the linear acceleration.
NOTE: This method depends on the orientation. Hence, it must be called after "estimate_angular_states()"
Source code in code_fmkit/fmsignal.py
def estimate_linear_states(self, point, wv=0.2, wp=5):
"""Estimate position, velocity and acceleration.
**NOTE**: For the data glove device, the linear acceleration is directly
obtained from the sensor. The position and velocity are derived by
integrating the linear acceleration.
**NOTE**: This method depends on the orientation. Hence, it must be
called after "estimate_angular_states()"
"""
data = self.data
l = self.length
if point == "tip":
acc = self.acc0
elif point == "center":
acc = self.acc1
else:
raise ValueError("Unknown point type: %s" % str(point))
trajectory = np.zeros((l, 3))
qs = self.qs
# given 50 Hz, one timestep is 20 ms, i.e., 0.02 second
timestep = 0.02
# dead reckoning using linear acceleration
p = np.array((0, 0, 0))
v_m = np.array((0, 0, 0))
a_m = np.array((0, 0, 0))
pp = np.array((200, 0, 0))
for i in range(1, l):
acc_local = acc[i]
q = qs[i]
a_m = q.rotate_a_point(acc_local).reshape(3)
pp_i = q.rotate_a_point(pp).reshape(3)
trajectory[i, 0:3] = pp_i
v_m = v_m + a_m * timestep
# Now we add a correction term
u = np.multiply(v_m, np.abs(v_m))
v_m = v_m - wv * u
p = p + v_m * timestep + 0.5 * a_m * timestep * timestep
#print(p)
# Now we add a similar correction term.
w = np.multiply(p, np.abs(p))
p = p - wp * w
data[i, 6:9] = a_m
data[i, 3:6] = v_m
data[i, 0:3] = p * 1000 # position is in mm
trajectory[:, 0] -= 200
self.trajectory = trajectory
filter(self, sample_freq, cut_freq)
Low-pass filtering on the signal.
NOTE: It is assumed that a hand can not move in very high frequency, so the high frequency components of the signal is filtered. This method uses NumPy FFT and IFFT.
NOTE: This method is only used in the preprocessing procedure. This method modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sample_freq |
float |
sample frequency of this signal. |
required |
cut_freq |
float |
low pass filtering cutoff frequency. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def filter(self, sample_freq, cut_freq):
"""Low-pass filtering on the signal.
**NOTE**: It is assumed that a hand can not move in very high frequency,
so the high frequency components of the signal is filtered. This method
uses NumPy FFT and IFFT.
**NOTE**: This method is only used in the preprocessing procedure.
This method modifies the signal.
Args:
sample_freq (float): sample frequency of this signal.
cut_freq (float): low pass filtering cutoff frequency.
Returns:
None: No return value.
"""
l = self.length
data = self.data
cut_l = int(cut_freq * l / sample_freq)
dft_co = np.fft.fft(data, l, axis=0)
for i in range(cut_l, l - cut_l):
dft_co[i] = 0 + 0j
ifft_c = np.fft.ifft(dft_co, l, axis=0)
# NOTE: This must be converted to "np.float32"!
ifft = ifft_c.real.astype(np.float32)
for i in range(l):
self.data[i] = ifft[i]
load_from_buffer_raw(self, array)
Load data from a NumPy ndarray.
NOTE: This is only for internal usage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
array |
ndarray |
The buffer. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def load_from_buffer_raw(self, array):
"""Load data from a NumPy ndarray.
**NOTE**: This is only for internal usage.
Args:
array (ndarray): The buffer.
Returns:
None: No return value.
"""
l = array.shape[0]
m = array.shape[1]
assert m == 20, "Wrong raw file format: m = %d" % m
self.length = l
self.tsc = array[:, 0].flatten()
self.ts = array[:, 1].flatten()
self.acc0 = array[:, 2:5]
self.gyro0 = array[:, 5:8]
self.gravity0 = array[:, 8:11]
self.acc1 = array[:, 11:14]
self.gyro1 = array[:, 14:17]
self.gravity1 = array[:, 17:20]
load_from_buffer_raw_internal(self, array)
Load from a file directly obtained by the data collection client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
array |
ndarray |
The buffer, which is a numpy ndarrary. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
CAUTION: Do not use this method directly. Instead, uses the class method FMSignalGlove.construct_from_file() instead.
Source code in code_fmkit/fmsignal.py
def load_from_buffer_raw_internal(self, array):
"""Load from a file directly obtained by the data collection client.
Args:
array (ndarray): The buffer, which is a numpy ndarrary.
Returns:
None: No return value.
CAUTION: Do not use this method directly. Instead, uses the class
method FMSignalGlove.construct_from_file() instead.
"""
l = array.shape[0]
m = array.shape[1]
self.length = l
if m == 25:
# NOTE: This is the early format, with only one timestamp.
ts = array[:, 0].flatten()
tsc = np.zeros(l, dtype=np.float32)
acc0 = array[:, 1:4]
gyro0 = array[:, 4:7]
gravity0 = array[:, 7:10]
acc1 = array[:, 13:16]
gyro1 = array[:, 16:19]
gravity1 = array[:, 19:22]
elif m == 34:
# NOTE: This is the current format, with two timestamps.
ts = array[:, 1].flatten()
tsc = array[:, 0].flatten()
# Use an offset to reduce the size of tsc so that it can fit into
# the float32 type.
tsc -= 1514764800
acc0 = array[:, 2:5]
gyro0 = array[:, 5:8]
gravity0 = array[:, 11:14]
acc1 = array[:, 18:21]
gyro1 = array[:, 21:24]
gravity1 = array[:, 27:30]
else:
raise ValueError("Unknown format: m = %d" % m)
# Fix any timestamp anormaly
# NOTE: The device timestamps in glove data are in millisecond.
for i in range(l - 1):
if ts[i + 1] < ts[i]:
ts[i + 1] = ts[i] + 20
self.ts = ts.astype(np.float32)
self.tsc = tsc.astype(np.float32)
self.acc0 = acc0.astype(np.float32)
self.gyro0 = gyro0.astype(np.float32)
self.gravity0 = gravity0.astype(np.float32)
self.acc1 = acc1.astype(np.float32)
self.gyro1 = gyro1.astype(np.float32)
self.gravity1 = gravity1.astype(np.float32)
load_from_file(self, fn, mode)
General interface to load the raw signal from a file.
NOTE: This is only for internal usage. If an FMSignalLeap object is needed, use the class method "construct_from_file()" instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without the ".csv" or ".npy" extension). |
required |
mode |
string |
The file format, either "raw_csv", or "raw_npy". |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the "mode" is unknown. |
FileNotFoundError |
if the file does not exist. |
Source code in code_fmkit/fmsignal.py
def load_from_file(self, fn, mode):
"""General interface to load the raw signal from a file.
**NOTE**: This is only for internal usage. If an FMSignalLeap object is
needed, use the class method "construct_from_file()" instead.
Args:
fn (string): The file name (without the ".csv" or ".npy" extension).
mode (string): The file format, either "raw_csv", or "raw_npy".
Returns:
None: No return value.
Raises:
ValueError: if the "mode" is unknown.
FileNotFoundError: if the file does not exist.
"""
if mode == "raw_internal":
fn += ".txt"
# NOTE: The "dtype" needs to be np.float64 for the raw timestamps!
array = np.loadtxt(fn, np.float64, delimiter=",")
self.load_from_buffer_raw_internal(array)
elif mode == "raw_csv":
fn += ".csv"
array = np.loadtxt(fn, np.float32, delimiter=",")
self.load_from_buffer_raw(array)
elif mode == "raw_npy":
fn += ".npy"
array = np.load(fn)
assert array.dtype == np.float32
self.load_from_buffer_raw(array)
else:
raise ValueError("Unknown file mode %s!" % mode)
pose_normalize(self, start, end, p_yaw=0, p_pitch=0, p_roll=0)
Normalize the position and orientation of the signal.
NOTE: This method expect a start index and an end index, where the signal segment between the start and the end is used to calculate the average pointing direction. Hence, it should be called after "prepare_trim_by_xxx()".
NOTE: This normalization step depends on the orientation. Hence, it must be called after "estimate_angular_states()"
Parameters:
Name | Type | Description | Default |
---|---|---|---|
start |
int |
The start time index of the signal for normalization. |
required |
end |
int |
The end time index of the signal for normalization. |
required |
p_yaw |
float |
The pertubation yaw angle in radian. |
0 |
p_pitch |
float |
The pertubation pitch angle in radian. |
0 |
p_roll |
float |
The pertubation roll angle in radian. |
0 |
Returns:
Type | Description |
---|---|
None |
No return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the "point" is wrong or the "start" or "end" is bad. |
Source code in code_fmkit/fmsignal.py
def pose_normalize(self, start, end, p_yaw=0, p_pitch=0, p_roll=0):
"""Normalize the position and orientation of the signal.
**NOTE**: This method expect a start index and an end index, where the
signal segment between the start and the end is used to calculate the
average pointing direction. Hence, it should be called after
"prepare_trim_by_xxx()".
**NOTE**: This normalization step depends on the orientation. Hence,
it must be called after "estimate_angular_states()"
Args:
start (int): The start time index of the signal for normalization.
end (int): The end time index of the signal for normalization.
p_yaw (float): The pertubation yaw angle in radian.
p_pitch (float): The pertubation pitch angle in radian.
p_roll (float): The pertubation roll angle in radian.
Returns:
None: No return value.
Raises:
ValueError: If the "point" is wrong or the "start" or "end" is bad.
"""
data = self.data
rotms = self.rotms
qs = self.qs
vi = np.array((1.0, 0, 0), np.float32).reshape((3, 1))
# vj = np.array((0, 1.0, 0), np.float32).reshape((3, 1))
vk = np.array((0, 0, 1.0), np.float32).reshape((3, 1))
# find the approximated average pointing direction as vx
# find the approximated average downward direction as vz
vxs = np.matmul(rotms[start:end], vi)
vzs = np.matmul(rotms[start:end], vk)
vx = np.mean(vxs, axis=0).flatten()
vz = np.mean(vzs, axis=0).flatten()
vx = vx / np.linalg.norm(vx)
vz = vz / np.linalg.norm(vz)
vy = np.cross(vz, vx)
vy = vy / np.linalg.norm(vy)
vz = np.cross(vx, vy)
vz = vz / np.linalg.norm(vz)
vx = vx.reshape((3, 1))
vy = vy.reshape((3, 1))
vz = vz.reshape((3, 1))
R_g2l = np.concatenate((vx, vy, vz), axis=1)
normalize_rotation_matrix(R_g2l)
R_l2g = R_g2l.transpose()
R_offset = euler_zyx_to_rotation_matrix(p_yaw, p_pitch, p_roll)
R_l2g = np.matmul(R_offset, R_l2g)
for i in range(self.length):
rotm = np.matmul(R_l2g, rotms[i])
rotm = normalize_rotation_matrix(rotm)
q = Quaternion.construct_from_rotation_matrix(rotm)
rotms[i] = rotm
qs[i] = q
# Now we derive the three Tait-Bryan angles
# CAUTION: yaw first, then pitch, then roll
# i.e., z-y-x intrinsic rotation (east, north, sky)
# yaw, pitch, roll, gimbal_lock \
# = rotation_matrix_to_euler_angles_zyx(rotm)
#
# data[i, 9] = roll
# data[i, 10] = pitch
# data[i, 11] = yaw
# Tait-Bryan angles have singularity.
# Use quaternion components instead.
data[i, 9] = q.x
data[i, 10] = q.y
data[i, 11] = q.z
# verify using the gravity vector
# self.data[i, 3:6] = np.matmul(rotms[i], gravity0[i].reshape((3, 1))).reshape(3)
# apply pose offset on angular speed
ov = data[:, 12:15].T
ov = np.matmul(R_offset, ov)
data[:, 12:15] = ov.T
data[:, 15] = np.gradient(data[:, 12])
data[:, 16] = np.gradient(data[:, 13])
data[:, 17] = np.gradient(data[:, 14])
prepare_trim_by_acc(self, point, threshold)
Determine the indicies for the start and end of the finger motion.
NOTE: The trimming process is split into two parts to accommodate other preprocessing steps. This method is the first part, which returns the start and end indices without actually throwing away the data samples when the hand is not moving.
NOTE: This method is only used in the preprocessing procedure. This method DOES NOT modify the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
point |
string |
The point on the hand, either "tip" or "center". |
required |
threshold |
float |
Threshold to detect the hand motion. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
NOTE: The threshold is relative. Typically some value between 0.2 to 0.5 would be good enough.
Source code in code_fmkit/fmsignal.py
def prepare_trim_by_acc(self, point, threshold):
"""Determine the indicies for the start and end of the finger motion.
**NOTE**: The trimming process is split into two parts to accommodate
other preprocessing steps. This method is the first part, which returns
the start and end indices without actually throwing away the data
samples when the hand is not moving.
**NOTE**: This method is only used in the preprocessing procedure.
This method **DOES NOT** modify the signal.
Args:
point (string): The point on the hand, either "tip" or "center".
threshold (float): Threshold to detect the hand motion.
Returns:
None: No return value.
**NOTE**: The threshold is relative. Typically some value between 0.2
to 0.5 would be good enough.
"""
if point == "tip":
data = self.acc0
elif point == "center":
data = self.acc1
else:
raise ValueError("Unknown point type: %s" % str(point))
l = self.length
acc = data.copy()
a = np.linalg.norm(acc, axis=1)
a_std = np.std(a)
# NOTE: We cannot subtract the mean of acceleration here.
an = np.divide(a, a_std)
start = 0
for i in range(l):
if abs(an[i]) > threshold:
start = i - 5
break
start = max(start, 0)
end = 0
for i in range(l - 1, 0, -1):
if abs(an[i]) > threshold:
end = i + 5
break
end = min(end, l)
return (start, end)
preprocess(self, point, threshold=0.2, cut_freq=10, p_yaw=0, p_pitch=0, p_roll=0)
Preprocess the signal.
NOTE: This method follows all the preprocessing steps. See the "Data Processing" document for details.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
point |
string |
The point on the hand, either "tip" or "center". |
required |
threshold |
float |
Threshold to trim the signal without hand motion. |
0.2 |
cut_freq |
float |
Low pass filtering cutoff frequency. |
10 |
p_yaw |
float |
The pertubation yaw angle in radian. |
0 |
p_pitch |
float |
The pertubation pitch angle in radian. |
0 |
p_roll |
float |
The pertubation roll angle in radian. |
0 |
Returns:
Type | Description |
---|---|
FMSignal |
The preprocessed signal. |
NOTE: The default argument values are typically good enough.
Source code in code_fmkit/fmsignal.py
def preprocess(self, point, threshold=0.2, cut_freq=10,
p_yaw=0, p_pitch=0, p_roll=0):
"""Preprocess the signal.
**NOTE**: This method follows all the preprocessing steps. See the
"Data Processing" document for details.
Args:
point (string): The point on the hand, either "tip" or "center".
threshold (float): Threshold to trim the signal without hand motion.
cut_freq (float): Low pass filtering cutoff frequency.
p_yaw (float): The pertubation yaw angle in radian.
p_pitch (float): The pertubation pitch angle in radian.
p_roll (float): The pertubation roll angle in radian.
Returns:
FMSignal: The preprocessed signal.
**NOTE**: The default argument values are typically good enough.
"""
dim = 18
data = np.zeros((self.length, dim), dtype=np.float32)
self.dim = dim
self.data = data
start, end = self.prepare_trim_by_acc(point, threshold)
self.estimate_angular_states(point)
self.pose_normalize(start, end, p_yaw, p_pitch, p_roll)
self.estimate_linear_states(point)
sample_freq = self.length / (self.ts[self.length - 1]) * 1000.0
#self.filter(sample_freq, cut_freq)
#self.trim(start, end)
assert self.ts.dtype == np.float32
assert self.data.dtype == np.float32
signal = FMSignal(self.length, self.dim, self.ts, self.data,
self.user, self.cid, self.seq)
return signal
save_to_buffer_raw(self)
Save the raw signal to a NumPy ndarray.
NOTE: This is only for internal usage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
None |
None |
No argument. |
required |
Returns:
Type | Description |
---|---|
ndarray |
The buffer. |
Source code in code_fmkit/fmsignal.py
def save_to_buffer_raw(self):
"""Save the raw signal to a NumPy ndarray.
**NOTE**: This is only for internal usage.
Args:
None (None): No argument.
Returns:
ndarray: The buffer.
"""
array = np.zeros((self.length, 20), dtype=np.float32)
array[:, 0] = self.tsc
array[:, 1] = self.ts
array[:, 2:5] = self.acc0
array[:, 5:8] = self.gyro0
array[:, 8:11] = self.gravity0
array[:, 11:14] = self.acc1
array[:, 14:17] = self.gyro1
array[:, 17:20] = self.gravity1
return array
save_to_file(self, fn, mode)
General interface to save the raw signal to a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without the ".csv" or ".npy" extension). |
required |
mode |
string |
The file format, either "raw_csv", or "raw_npy". |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def save_to_file(self, fn, mode):
"""General interface to save the raw signal to a file.
Args:
fn (string): The file name (without the ".csv" or ".npy" extension).
mode (string): The file format, either "raw_csv", or "raw_npy".
Returns:
None: No return value.
"""
if mode == "raw_csv":
array = self.save_to_buffer_raw()
fn += ".csv"
np.savetxt(fn, array, fmt="%.6f", delimiter=", ")
elif mode == "raw_npy":
array = self.save_to_buffer_raw()
# NOTE: NumPy library add the ".npy" file extension for us!
np.save(fn, array)
else:
raise ValueError("Unknown file mode %s!" % mode)
trim(self, start, end)
Trim the start and end of the signal where the hand does not move.
NOTE: The trimming process is split into two parts to accommodate other steps of preprocessing. This method is the second part, which throws away the data samples given the start and end indices.
NOTE: This method is only used in the preprocessing procedure. This method modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
start |
int |
The start index, i.e., data[start:end, :] will be kept. |
required |
end |
int |
The end index, i.e., data[start:end, :] will be kept. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def trim(self, start, end):
"""Trim the start and end of the signal where the hand does not move.
**NOTE**: The trimming process is split into two parts to accommodate
other steps of preprocessing. This method is the second part, which
throws away the data samples given the start and end indices.
**NOTE**: This method is only used in the preprocessing procedure.
This method modifies the signal.
Args:
start (int): The start index, i.e., data[start:end, :] will be kept.
end (int): The end index, i.e., data[start:end, :] will be kept.
Returns:
None: No return value.
"""
l_new = end - start
# NOTE: These "copy()" force the "ts" and "data" to have proper C-like
# array in memory, which is crucial for "dtw_c()"!!!
self.data = self.data[start:end, :].copy()
self.ts = self.ts[start:end].copy()
ts0 = self.ts[0]
self.ts -= ts0
self.length = l_new
if self.qs is not None:
self.qs = self.qs[start:end]
if self.rotms is not None:
self.rotms = self.rotms[start:end]
if self.trajectory is not None:
self.trajectory = self.trajectory[start:end]
FMSignalLeap
Raw finger motion signal collected by the Leap Motion controller.
This class represents the raw signal obtained from the Leap Motion sensor. An FMSignal object can be obtained by preprocessing this raw signal, where the preprocessing is essentially the main purpose of this FMSignalLeap class. It has the following additional attributes.
Attributes:
Name | Type | Description |
---|---|---|
length |
int |
The number of samples in this signal. |
ts |
ndarray |
Timestamp obtained from the device (1D vector). |
tsc |
ndarray |
Timestamp obtained from the client computer (1D vector). |
tip |
ndarray |
Position of the index finger tip (length * 3 matrix). |
center |
ndarray |
Position of the palm center (length * 3 matrix). |
joints |
ndarray |
Position of each joint (length * 5 * 5 * 3 tensor). |
confs |
ndarray |
Confidence value of each sample (1D vector). |
valids |
ndarray |
Whether the sample is valid (1D vector). |
data |
ndarray |
The data after preprocessing. |
rotms |
ndarray |
The orientation of each smple in rotation matrices. |
qs |
list |
The orientation of each smple in unit quaternions. |
trajectory |
ndarray |
The motion trajectory of the selected point. |
NOTE: The "joints" is a length * 5 * 5 * 3 tensor, i.e., 5 fingers, 5 joints on each finger, and 3 coordinates for each joint. The sequence of fingers are [thumb, index finger, middle finger, ring finger, little finger]. The sequence of joints are from the end of the palm to the tip of the finger. For example, "joints[:, 1, 0, :]" is the trajectory of the palm end of the index finger, and "joints[:, 1, 4, :]" is the trajectory of the tip of the index finger. Note that the thumb has only four joints, i.e., "joints[:, 0, 0, :]" and "joints[:, 0, 1, :]" are identical.
NOTE: The preprocessed signal contains the motion of only one point on the hand, which can be derived either from the center of the hand or the tip of the index finger. This is controlled by the "point" argument of the "preprocess()" method.
NOTE: The raw signal file may have two different formats, i.e., either in Comma Separated Value (".csv") or in NumPy binary format (".npy"). However, the content structure is the same, which is essentially a matrix, where the rows are the samples at a certain time and the columns are the data from a specific sensor axis. See the "Data Format Details" document for more information. There is another format called "raw_internal", which is used to resolve format issues in the data collected at the early stage of this project. It is now obsolete.
__init__(self, user='', cid='', seq=0)
special
Constructor.
See the attributes of FMSignal for the meaning of the arguments.
NOTE: This is only for internal usage. If an FMSignalLeap object is needed, use the class method "construct_from_file()" to load data from a file, or use the "copy()" method to duplicate a signal.
Source code in code_fmkit/fmsignal.py
def __init__(self, user="", cid="", seq=0):
"""Constructor.
See the attributes of FMSignal for the meaning of the arguments.
**NOTE**: This is only for internal usage. If an FMSignalLeap object is
needed, use the class method "construct_from_file()" to load data from
a file, or use the "copy()" method to duplicate a signal.
"""
self.user = user
self.cid = cid
self.seq = seq
self.length = 0
self.ts = None
self.tsc = None
self.tip = None
self.center = None
self.joints = None
self.confs = None
self.valids = None
self.data = None
self.qs = None
self.rotms = None
self.trajectory = None
all_close_to(self, signal)
Check whether this signal is almost identical to the other signal.
NOTE: The criteria of "identical" is defined by "np.allclose()". The two signals must have the same type and length.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
signal |
FMSignalLeap |
The other signal to compare. |
required |
Returns:
Type | Description |
---|---|
bool |
True if they are almost identical; False otherwise. |
Source code in code_fmkit/fmsignal.py
def all_close_to(self, signal):
"""Check whether this signal is almost identical to the other signal.
**NOTE**: The criteria of "identical" is defined by "np.allclose()".
The two signals must have the same type and length.
Args:
signal (FMSignalLeap): The other signal to compare.
Returns:
bool: True if they are almost identical; False otherwise.
"""
if not isinstance(signal, FMSignalLeap):
return False
if self.length != signal.length:
return False
# NOTE: The CSV format only stores six digits after the decimal point.
# Hence, "atol" can not be smaller than 1e-6.
r1 = np.allclose(self.ts, signal.ts, atol=1e-6)
r2 = np.allclose(self.tsc, signal.tsc, atol=1e-6)
r3 = np.allclose(self.tip, signal.tip, atol=1e-6)
r4 = np.allclose(self.center, signal.center, atol=1e-6)
r5 = np.allclose(self.joints, signal.joints, atol=1e-6)
r6 = np.allclose(self.confs, signal.confs, atol=1e-6)
r7 = np.allclose(self.valids, signal.valids, atol=1e-6)
return r1 and r2 and r3 and r4 and r5 and r6 and r7
construct_from_file(fn, mode, user='', cid='', seq=0)
classmethod
Construct a signal by loading data from a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without extension). |
required |
mode |
string |
The file format, either "raw_csv", or "raw_npy". |
required |
user |
string |
The user who creates this signal. |
'' |
cid |
string |
The unique id indicating the content of the signal. |
'' |
seq |
int |
The sequence id in a set when loaded from a dataset. |
0 |
Returns:
Type | Description |
---|---|
FMSignalLeap |
The constructed signal object. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the "mode" is wrong. |
FileNotFoundError |
If the file does not exist. |
Source code in code_fmkit/fmsignal.py
@classmethod
def construct_from_file(cls, fn, mode, user="", cid="", seq=0):
"""Construct a signal by loading data from a file.
Args:
fn (string): The file name (without extension).
mode (string): The file format, either "raw_csv", or "raw_npy".
user (string): The user who creates this signal.
cid (string): The unique id indicating the content of the signal.
seq (int): The sequence id in a set when loaded from a dataset.
Returns:
FMSignalLeap: The constructed signal object.
Raises:
ValueError: If the "mode" is wrong.
FileNotFoundError: If the file does not exist.
"""
signal = FMSignalLeap(user, cid, seq)
signal.load_from_file(fn, mode)
return signal
estimate_angular_states(self, p_yaw=0, p_pitch=0, p_roll=0)
Estimate orientation, angular speed, and angular acceleration.
NOTE: Orientations are represented as the qx, qy, and qz components of a unit quaternion. It is obtained by the position of the joints of the hand.
NOTE: Currently, the angular speed is the relative local differential of the angular position. The angular acceleration is just the differential of the angular speed, not the real angular acceleration. In this way, even if the reference frame changes, the angular speed will not change. Hence, we add three purtubation angles to change the local reference frame a bit as needed, mainly for data augmentation usage.
NOTE: This method is only used in the preprocessing procedure. This method modifies the signal. This method should be called after the pose normalization step.
NOTE: Currently, this method iterates through each sample, which is relatively slow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
p_yaw |
float |
The pertubation yaw angle in radian. |
0 |
p_pitch |
float |
The pertubation pitch angle in radian. |
0 |
p_roll |
float |
The pertubation roll angle in radian. |
0 |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def estimate_angular_states(self, p_yaw=0, p_pitch=0, p_roll=0):
"""Estimate orientation, angular speed, and angular acceleration.
**NOTE**: Orientations are represented as the qx, qy, and qz
components of a unit quaternion. It is obtained by the position of the
joints of the hand.
**NOTE**: Currently, the angular speed is the relative local
differential of the angular position. The angular acceleration is just
the differential of the angular speed, not the real angular
acceleration. In this way, even if the reference frame changes, the
angular speed will not change. Hence, we add three purtubation angles
to change the local reference frame a bit as needed, mainly for data
augmentation usage.
**NOTE**: This method is only used in the preprocessing procedure.
This method modifies the signal. This method should be called after
the pose normalization step.
**NOTE**: Currently, this method iterates through each sample, which is
relatively slow.
Args:
p_yaw (float): The pertubation yaw angle in radian.
p_pitch (float): The pertubation pitch angle in radian.
p_roll (float): The pertubation roll angle in radian.
Returns:
None: No return value.
"""
data = self.data
ts = self.ts
# rotation matrix of each data sample
rotms = np.zeros((self.length, 3, 3))
# pose offset, for augmentation
R_offset = euler_zyx_to_rotation_matrix(p_yaw, p_pitch, p_roll)
# quaternion of each data sample
qs = [None] * self.length
omega_pre = np.asarray((0.0, 0.0, 0.0))
for i in range(self.length):
joints = self.joints[i]
# Origin is the near end of the index finger.
# p0 = joints[1, 1]
p0 = joints[1, 0]
# Pointing direction is from the near end to the next joint along
# the index finger.
# p1 = joints[1, 2]
p1 = joints[1, 1]
# The side direction is from the near end of the index finger to
# the near end of the little finger.
# p2 = joints[4, 1]
p2 = joints[4, 0]
# derive the pose represented in three orthogonal vectors,
# i.e., vx, vy, vz
# Note that here vx is the general pointing direction, vz is the
# general palm facing direction
vx = p1 - p0
vy_prime = p0 - p2
vz = np.cross(vx, vy_prime)
if np.linalg.norm(vx) <= 1e-6:
print(i, p0, p1)
# most of the time the palm is facing downward, so we flip the
# axes here to set up a local reference frame where the z-axis
# is always upward.
# CAUTION: There are chances that the sensor wrongly recognize the
# right hand as the palm facing upward. Similarly, there are cases
# that the right hand is wrongly identified as the left hand. In
# either cases, we just always make the z-axis upward.
# if vz[2] < 0:
# vz = -vz
vy = np.cross(vz, vx)
vz = np.cross(vx, vy)
vx = vx / np.linalg.norm(vx)
vy = vy / np.linalg.norm(vy)
vz = vz / np.linalg.norm(vz)
vx = vx.reshape((3, 1))
vy = vy.reshape((3, 1))
vz = vz.reshape((3, 1))
# CAUTION: this is just an approximation of a rotation matrix,
# and it may not be perfectly northonormal!!!
rotm = np.concatenate((vx, vy, vz), axis=1)
q = Quaternion.construct_from_rotation_matrix(rotm)
# print(q, q.norm())
# u = rotation_matrix_to_angle_axis(rotm)
if i > 0:
pv = qs[i - 1].to_vector()
qv = q.to_vector()
p1 = np.linalg.norm(pv - qv)
p2 = np.linalg.norm(pv + qv)
if p1 > p2:
q = q.negate()
# Now since it is a unit quaternion encoding the rotation, we
# convert it back to a rotation matrix for future usage
# rotm = q.to_rotation_matrix()
qs[i] = q
rotms[i] = rotm
# Now we derive the three Tait-Bryan angles
# CAUTION: yaw first, then pitch, then roll
# i.e., z-y-x intrinsic rotation (east, north, sky)
# yaw, pitch, roll, gimbal_lock \
# = rotation_matrix_to_euler_angles_zyx(rotm)
#
# data[i, 9] = roll
# data[i, 10] = pitch
# data[i, 11] = yaw
#
# assert not gimbal_lock
# Tait-Bryan angles have singularity.
# Use quaternion components instead.
data[i, 9] = q.x
data[i, 10] = q.y
data[i, 11] = q.z
# data[i, 9:12] = u
if i > 0:
timestep = ts[i] - ts[i - 1]
if timestep > 0:
p = qs[i - 1]
omega = Quaternion.differentiate_local(p, q, timestep)
else:
omega = omega_pre
omega_pre = omega
data[i, 12] = omega[0] * 1000
data[i, 13] = omega[1] * 1000
data[i, 14] = omega[2] * 1000
data[0, 12] = data[1, 12]
data[0, 13] = data[1, 13]
data[0, 14] = data[1, 14]
ov = data[:, 12:15].T
ov = np.matmul(R_offset, ov)
data[:, 12:15] = ov.T
data[:, 15] = np.gradient(data[:, 12])
data[:, 16] = np.gradient(data[:, 13])
data[:, 17] = np.gradient(data[:, 14])
#print(data.dtype)
self.data = data
self.dim = 18
self.rotms = rotms
self.qs = qs
estimate_linear_states(self, point)
Estimate position, velocity and acceleration.
NOTE: For the Leap Motion device, only the position is directly obtained from the device. The velocity and acceleration are derived by differentiating the the position.
NOTE: This method is only used in the preprocessing procedure. This method modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
point |
string |
The point on the hand, either "tip" or "center". |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def estimate_linear_states(self, point):
"""Estimate position, velocity and acceleration.
**NOTE**: For the Leap Motion device, only the position is directly
obtained from the device. The velocity and acceleration are derived
by differentiating the the position.
**NOTE**: This method is only used in the preprocessing procedure.
This method modifies the signal.
Args:
point (string): The point on the hand, either "tip" or "center".
Returns:
None: No return value.
"""
data = self.data
# print(data.dtype)
if point == "tip":
# Use the tip of the index finger.
data[:, 0:3] = self.tip
self.trajectory = self.tip.copy()
elif point == "center":
# Use the center of the palm.
data[:, 0:3] = self.center
self.trajectory = self.center.copy()
else:
raise ValueError("Unknown point type: %s" % str(point))
data[:, 3] = np.gradient(data[:, 0])
data[:, 4] = np.gradient(data[:, 1])
data[:, 5] = np.gradient(data[:, 2])
data[:, 6] = np.gradient(data[:, 3])
data[:, 7] = np.gradient(data[:, 4])
data[:, 8] = np.gradient(data[:, 5])
# print(data.dtype)
self.data = data
filter(self, sample_freq, cut_freq)
Low-pass filtering on the signal.
NOTE: It is assumed that a hand can not move in very high frequency, so the high frequency components of the signal is filtered. This method uses NumPy FFT and IFFT.
NOTE: This method is only used in the preprocessing procedure. This method modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sample_freq |
float |
sample frequency of this signal. |
required |
cut_freq |
float |
low pass filtering cutoff frequency. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def filter(self, sample_freq, cut_freq):
"""Low-pass filtering on the signal.
**NOTE**: It is assumed that a hand can not move in very high frequency,
so the high frequency components of the signal is filtered. This method
uses NumPy FFT and IFFT.
**NOTE**: This method is only used in the preprocessing procedure.
This method modifies the signal.
Args:
sample_freq (float): sample frequency of this signal.
cut_freq (float): low pass filtering cutoff frequency.
Returns:
None: No return value.
"""
l = self.length
data = self.data
cut_l = int(cut_freq * l / sample_freq)
dft_co = np.fft.fft(data, l, axis=0)
for i in range(cut_l, l - cut_l):
dft_co[i] = 0 + 0j
ifft_c = np.fft.ifft(dft_co, l, axis=0)
# NOTE: This must be converted to "np.float32"!
ifft = ifft_c.real.astype(np.float32)
#print(ifft.dtype)
self.data = ifft
fix_missing_samples(self)
Fix missing data samples by linear interpolation.
NOTE: This is only for internal usage. The missing samples are mainly caused by the motion of the hand which are outside the field of the view of the sensor. This procedure assumes that the first sample is always valid! Since it uses linear interpolation, it will not work well with too many missing samples.
NOTE: This method is only used in the preprocessing procedure. This method modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
None |
None |
No argument. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def fix_missing_samples(self):
"""Fix missing data samples by linear interpolation.
**NOTE**: This is only for internal usage. The missing samples are
mainly caused by the motion of the hand which are outside the field of
the view of the sensor. This procedure assumes that the first sample is
always valid! Since it uses linear interpolation, it will not work well
with too many missing samples.
**NOTE**: This method is only used in the preprocessing procedure.
This method modifies the signal.
Args:
None (None): No argument.
Returns:
None: No return value.
"""
l = self.length
i = 0
while i < l - 1:
# Find the start index of a missing segment.
while i < l - 1:
if self.valids[i] == 0:
break
else:
i += 1
# Find the end index of a missing segment.
j = i
while j < l - 1:
if self.valids[j] == 1:
break
else:
j += 1
# If missing points found between i and j, fix them.
# If no missing point found, just skip.
if i < j:
# print(i, j)
start_ts = self.ts[i - 1]
end_ts = self.ts[j]
start_tip = self.tip[i - 1]
end_tip = self.tip[j]
length_tip = (end_tip - start_tip)
start_center = self.center[i - 1]
end_center = self.center[j]
length_center = (end_center - start_center)
start_joints = self.joints[i - 1]
end_joints = self.joints[j]
length_joints = (end_joints - start_joints)
for k in range(i, j):
k_ts = self.ts[k]
rate = (k_ts - start_ts) * 1.0 / (end_ts - start_ts)
self.tip[k] = start_tip + length_tip * rate
self.center[k] = start_center + length_center * rate
self.joints[k] = start_joints + length_joints * rate
self.valids[k] = 1
# Find another missing sement in the next iteration
i = j
load_from_buffer_raw(self, array)
Load data from a NumPy ndarray.
NOTE: This is only for internal usage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
array |
ndarray |
The buffer. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def load_from_buffer_raw(self, array):
"""Load data from a NumPy ndarray.
**NOTE**: This is only for internal usage.
Args:
array (ndarray): The buffer.
Returns:
None: No return value.
"""
l = array.shape[0]
m = array.shape[1]
assert m == 2 + 3 + 3 + 75 + 2, "Wrong raw file format: m = %d" % m
self.length = l
self.ts = array[:, 1].flatten()
self.tsc = array[:, 0].flatten()
offset_tip = 2
offset_center = 2 + 3
offset_joints = 2 + 3 + 3
self.tip = array[:, offset_tip : offset_tip + 3]
self.center = array[:, offset_center : offset_center + 3]
# Load joint positions
self.joints = np.zeros((l, 5, 5, 3), np.float32)
for j in range(5):
for k in range(5):
index = offset_joints + j * 5 * 3 + k * 3
self.joints[:, j, k, :] = array[:, index : index + 3]
# Load confidences and valid flags
self.confs = array[:, -2]
self.valids = array[:, -1]
load_from_buffer_raw_internal(self, array)
Load data from a NumPy ndarray in the "raw_internal" format.
NOTE: This is only for internal usage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
array |
ndarray |
The buffer. |
required |
Returns:
Type | Description |
---|---|
None |
No Return value. |
Source code in code_fmkit/fmsignal.py
def load_from_buffer_raw_internal(self, array):
"""Load data from a NumPy ndarray in the "raw_internal" format.
**NOTE**: This is only for internal usage.
Args:
array (ndarray): The buffer.
Returns:
None: No Return value.
"""
l = array.shape[0]
m = array.shape[1]
# --------- process timestamp ------
if m == 133 or m == 93:
# NOTE: This is the old format, with only one timestamp.
offset_base = 1
ts = array[:, 0].flatten()
tsc = np.zeros(l, np.float32)
elif m == 134:
# CATUION: the current format, with two timestamps.
offset_base = 2
ts = array[:, 1].flatten()
tsc = array[:, 0].flatten()
# Use an offset to reduce the size of tsc so that it can fit into
# the float32 type.
tsc -= 1514764800
else:
raise ValueError("Unknown data file format!: m = %d" % m)
for i in range(l):
# fix timestamp wraping over maximum of uint32
if i > 0 and ts[i] < ts[i - 1]:
ts[i] += 4294967295.0
# Fix timestamp offset and convert timestamp to millisecond
# CAUTION: timestamp must start from 0! Other methods such as filtering
# depend on this assumption!
ts0 = ts[0]
ts -= ts0
ts /= 1000
# --------- process point coordinate and joint coordinate ------
offset_tip = offset_base
offset_center = offset_base + 6
offset_joints = offset_base + 6 + 9
data_tip = np.zeros((l, 3), np.float32)
# NOTE: Axes mapping: yzx -> xyz
data_tip[:, 0] = array[:, offset_tip + 2]
data_tip[:, 1] = array[:, offset_tip + 0]
data_tip[:, 2] = array[:, offset_tip + 1]
data_center = np.zeros((l, 3), np.float32)
# NOTE: Axes mapping: yzx -> xyz
data_center[:, 0] = array[:, offset_center + 2]
data_center[:, 1] = array[:, offset_center + 0]
data_center[:, 2] = array[:, offset_center + 1]
data_joints = np.zeros((l, 5, 5, 3), np.float32)
# Load joint positions
for j in range(5):
for k in range(5):
index = j * 5 * 3 + k * 3
# NOTE: Axes mapping: yzx -> xyz
data_joints[:, j, k, 0] = array[:, offset_joints + index + 2]
data_joints[:, j, k, 1] = array[:, offset_joints + index + 0]
data_joints[:, j, k, 2] = array[:, offset_joints + index + 1]
# Load confidences and valid flags
confs = array[:, -2]
valids = array[:, -1]
self.length = l
self.ts = ts.astype(np.float32)
self.tsc = tsc.astype(np.float32)
self.tip = data_tip
self.center = data_center
self.joints = data_joints
self.confs = confs.astype(np.float32)
self.valids = valids.astype(np.float32)
load_from_file(self, fn, mode)
General interface to load the raw signal from a file.
NOTE: This is only for internal usage. If an FMSignalLeap object is needed, use the class method "construct_from_file()" instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without the ".csv" or ".npy" extension). |
required |
mode |
string |
The file format, either "raw_csv", or "raw_npy". |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the "mode" is unknown. |
FileNotFoundError |
if the file does not exist. |
Source code in code_fmkit/fmsignal.py
def load_from_file(self, fn, mode):
"""General interface to load the raw signal from a file.
**NOTE**: This is only for internal usage. If an FMSignalLeap object is
needed, use the class method "construct_from_file()" instead.
Args:
fn (string): The file name (without the ".csv" or ".npy" extension).
mode (string): The file format, either "raw_csv", or "raw_npy".
Returns:
None: No return value.
Raises:
ValueError: if the "mode" is unknown.
FileNotFoundError: if the file does not exist.
"""
if mode == "raw_internal":
fn += ".txt"
# NOTE: The "dtype" needs to be np.float64 for the raw timestamps!
array = np.loadtxt(fn, np.float64, delimiter=",")
self.load_from_buffer_raw_internal(array)
elif mode == "raw_csv":
fn += ".csv"
array = np.loadtxt(fn, np.float32, delimiter=",")
self.load_from_buffer_raw(array)
elif mode == "raw_npy":
fn += ".npy"
array = np.load(fn)
assert array.dtype == np.float32
self.load_from_buffer_raw(array)
else:
raise ValueError("Unknown file mode %s!" % mode)
pose_normalize(self, point, start, end, p_yaw=0, p_pitch=0, p_roll=0)
Normalize the position and orientation of the signal.
NOTE: The new x-axis is the average of the pointing direction, the new z-axis is the vertical up direction of the leap motion sensor (assuming the sensor is placed on a horizontal surface), and the new y-axis is determined by the new x-axis and y-axis.
NOTE: The three angles in the argument are offsets that applied during the normalization in radians. This can be used in orientation purtubation for data augmentation or correcting the pointing direction.
NOTE: This method is only used in the preprocessing procedure. This method modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
point |
string |
The point on the hand, either "tip" or "center". |
required |
start |
int |
The start time index of the signal for normalization. |
required |
end |
int |
The end time index of the signal for normalization. |
required |
p_yaw |
float |
The pertubation yaw angle in radian. |
0 |
p_pitch |
float |
The pertubation pitch angle in radian. |
0 |
p_roll |
float |
The pertubation roll angle in radian. |
0 |
Returns:
Type | Description |
---|---|
None |
No return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the "point" is wrong or the "start" or "end" is bad. |
Source code in code_fmkit/fmsignal.py
def pose_normalize(self, point, start, end, p_yaw=0, p_pitch=0, p_roll=0):
"""Normalize the position and orientation of the signal.
**NOTE**: The new x-axis is the average of the pointing direction,
the new z-axis is the vertical up direction of the leap motion sensor
(assuming the sensor is placed on a horizontal surface), and the new
y-axis is determined by the new x-axis and y-axis.
**NOTE**: The three angles in the argument are offsets that applied
during the normalization in radians. This can be used in orientation
purtubation for data augmentation or correcting the pointing direction.
**NOTE**: This method is only used in the preprocessing procedure.
This method modifies the signal.
Args:
point (string): The point on the hand, either "tip" or "center".
start (int): The start time index of the signal for normalization.
end (int): The end time index of the signal for normalization.
p_yaw (float): The pertubation yaw angle in radian.
p_pitch (float): The pertubation pitch angle in radian.
p_roll (float): The pertubation roll angle in radian.
Returns:
None: No return value.
Raises:
ValueError: If the "point" is wrong or the "start" or "end" is bad.
"""
l = self.length
if start < 0 and end >= l and end <= start:
raise ValueError("Bad start and end indices: %d, %d." \
% (start, end))
# Find the average position.
if point == "tip":
# Use the tip of the index finger.
pos = self.tip[start:end]
elif point == "center":
# Use the center of the palm.
pos = self.center[start:end]
else:
raise ValueError("Unknown point type: %s." % str(point))
pos_mean = np.mean(pos, axis=0)
# Position normalization.
self.translate(-pos_mean)
# Find the average orientation.
data_joints = self.joints
if point == "tip":
# Use the direction of the middle segment of the index finger.
p0 = data_joints[start:end, 1, 1, :]
p1 = data_joints[start:end, 1, 2, :]
elif point == "center":
# Use the direction of the last segment of the middle finger.
p0 = data_joints[start:end, 2, 0, :]
p1 = data_joints[start:end, 2, 1, :]
else:
raise ValueError("Unknown point type: %s" % str(point))
vx = np.mean(p1 - p0, axis=0)
vz_t = np.asarray((0.0, 0.0, 1.0), np.float32)
vy = np.cross(vz_t, vx)
vz = np.cross(vx, vy)
vx = vx / np.linalg.norm(vx)
vy = vy / np.linalg.norm(vy)
vz = vz / np.linalg.norm(vz)
vx = vx.reshape((3, 1))
vy = vy.reshape((3, 1))
vz = vz.reshape((3, 1))
# Orientation normalization.
# NOTE: "p_yaw", "p_pitch", "p_roll" are orientation pertubation. They
# are all in radian.
R_g2l = np.concatenate((vx, vy, vz), axis=1)
R_l2g = R_g2l.transpose()
R_offset = euler_zyx_to_rotation_matrix(p_yaw, p_pitch, p_roll)
R_l2g = np.matmul(R_offset, R_l2g)
self.rotate(R_l2g)
prepare_trim_by_velocity(self, point, threshold)
Determine the indicies for the start and end of the finger motion.
NOTE: The trimming process is split into two parts to accommodate other preprocessing steps. This method is the first part, which returns the start and end indices without actually throwing away the data samples when the hand is not moving.
NOTE: This method is only used in the preprocessing procedure. This method DOES NOT modify the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
point |
string |
The point on the hand, either "tip" or "center". |
required |
threshold |
float |
Threshold to detect the hand motion. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
NOTE: The threshold is relative. Typically some value between 0.2 to 0.5 would be good enough.
Source code in code_fmkit/fmsignal.py
def prepare_trim_by_velocity(self, point, threshold):
"""Determine the indicies for the start and end of the finger motion.
**NOTE**: The trimming process is split into two parts to accommodate
other preprocessing steps. This method is the first part, which returns
the start and end indices without actually throwing away the data
samples when the hand is not moving.
**NOTE**: This method is only used in the preprocessing procedure.
This method **DOES NOT** modify the signal.
Args:
point (string): The point on the hand, either "tip" or "center".
threshold (float): Threshold to detect the hand motion.
Returns:
None: No return value.
**NOTE**: The threshold is relative. Typically some value between 0.2
to 0.5 would be good enough.
"""
if point == "tip":
# Use the tip of the index finger.
pos = self.tip
elif point == "center":
# Use the center of the palm.
pos = self.center
else:
raise ValueError("Unknown point type: %s" % str(point))
vel = pos.copy()
l = self.length
# The "start" and "end" are determined by the relative velocity. It is
# calculated here by taking gradient of the position and then
# normalizing it.
vel[:, 0] = np.gradient(pos[:, 0])
vel[:, 1] = np.gradient(pos[:, 1])
vel[:, 2] = np.gradient(pos[:, 2])
v = np.linalg.norm(vel, axis=1)
v_std = np.std(v)
# NOTE: We cannot subtract the mean of velocity here.
vn = np.divide(v, v_std)
# Determine the start
start = 0
for i in range(l):
if abs(vn[i]) > threshold:
start = i - 5
break
start = max(start, 0)
end = 0
for i in range(l - 1, 0, -1):
if abs(vn[i]) > threshold:
end = i + 5
break
end = min(end, l)
# print(self.id_label, self.seq, l, start, l - end)
return (start, end)
preprocess(self, point, threshold=0.2, cut_freq=10, re_freq=50, p_yaw=0, p_pitch=0, p_roll=0)
Preprocess the signal.
NOTE: This method follows all the preprocessing steps. See the "Data Processing" document for details.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
point |
string |
The point on the hand, either "tip" or "center". |
required |
threshold |
float |
Threshold to trim the signal without hand motion. |
0.2 |
cut_freq |
float |
Low pass filtering cutoff frequency. |
10 |
re_freq |
float |
Resampling frequency. |
50 |
p_yaw |
float |
The pertubation yaw angle in radian. |
0 |
p_pitch |
float |
The pertubation pitch angle in radian. |
0 |
p_roll |
float |
The pertubation roll angle in radian. |
0 |
Returns:
Type | Description |
---|---|
FMSignal |
The preprocessed signal. |
NOTE: The default argument values are typically good enough.
Source code in code_fmkit/fmsignal.py
def preprocess(self, point, threshold=0.2, cut_freq=10, re_freq=50,
p_yaw=0, p_pitch=0, p_roll=0):
"""Preprocess the signal.
**NOTE**: This method follows all the preprocessing steps. See the
"Data Processing" document for details.
Args:
point (string): The point on the hand, either "tip" or "center".
threshold (float): Threshold to trim the signal without hand motion.
cut_freq (float): Low pass filtering cutoff frequency.
re_freq (float): Resampling frequency.
p_yaw (float): The pertubation yaw angle in radian.
p_pitch (float): The pertubation pitch angle in radian.
p_roll (float): The pertubation roll angle in radian.
Returns:
FMSignal: The preprocessed signal.
**NOTE**: The default argument values are typically good enough.
"""
self.fix_missing_samples()
self.resample(re_freq)
start, end = self.prepare_trim_by_velocity(point, threshold)
self.pose_normalize(point, start, end, p_yaw, p_pitch, p_roll)
# For the Leap Motion device, position is directly observable.
dim = 18
data = np.zeros((self.length, dim), dtype=np.float32)
self.dim = dim
self.data = data
self.estimate_linear_states(point)
self.estimate_angular_states(p_yaw, p_pitch, p_roll)
self.filter(re_freq, cut_freq)
self.trim(start, end)
assert self.ts.dtype == np.float32
assert self.data.dtype == np.float32
signal = FMSignal(self.length, self.dim, self.ts, self.data,
self.user, self.cid, self.seq)
return signal
resample(self, re_freq)
Resample the signal at a specified frequency.
NOTE: This method is only used in the preprocessing procedure. This method modifies the signal. This method uses linear interpolation for resampling.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
re_freq |
float |
Resampling frequency. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def resample(self, re_freq):
"""Resample the signal at a specified frequency.
**NOTE**: This method is only used in the preprocessing procedure.
This method modifies the signal. This method uses linear interpolation
for resampling.
Args:
re_freq (float): Resampling frequency.
Returns:
None: No return value.
"""
ts = self.ts
l = self.length
duration = ts[l - 1] - ts[0]
step = 1000.0 / re_freq
l_new = int(duration / 1000.0 * re_freq)
ts_re = np.arange(0, step * l_new, step, dtype=np.float32)
a_tip_re = np.zeros((l_new, 3), dtype=np.float32)
for j in range(3):
a_tip_re[:, j] = np.interp(ts_re, ts, self.tip[:, j])
a_center_re = np.zeros((l_new, 3), dtype=np.float32)
for j in range(3):
a_center_re[:, j] = np.interp(ts_re, ts, self.center[:, j])
data_joints_re = np.zeros((l_new, 5, 5, 3), np.float32)
for j in range(5):
for k in range(5):
for v in range(3):
data_joints_re[:, j, k, v] = np.interp(
ts_re, ts, self.joints[:, j, k, v]
)
confs_resample = np.interp(ts_re, ts, self.confs).astype(np.float32)
valids_resample = np.ones(l_new, dtype=np.float32)
# NOTE: Make sure they all have "dtype == np.float32"!
# print(ts_re.dtype)
# print(a_tip_re.dtype)
# print(a_center_re.dtype)
# print(data_joints_re.dtype)
# print(confs_resample.dtype)
# print(valids_resample.dtype)
self.length = l_new
self.ts = ts_re
self.tip = a_tip_re
self.center = a_center_re
self.joints = data_joints_re
self.confs = confs_resample
self.valids = valids_resample
rotate(self, R)
Rotate the signal coorindate reference frame.
NOTE: This method is only used in the preprocessing procedure. This method modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
R |
ndarray |
The 3-by-3 rotation matrix. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def rotate(self, R):
"""Rotate the signal coorindate reference frame.
**NOTE**: This method is only used in the preprocessing procedure.
This method modifies the signal.
Args:
R (ndarray): The 3-by-3 rotation matrix.
Returns:
None: No return value.
"""
pv = self.tip[:, 0:3].T
pv = np.matmul(R, pv)
self.tip[:, 0:3] = pv.T
pv = self.center[:, 0:3].T
pv = np.matmul(R, pv)
self.center[:, 0:3] = pv.T
for j in range(5):
for k in range(5):
pv = self.joints[:, j, k].T
pv = np.matmul(R, pv)
self.joints[:, j, k] = pv.T
save_to_buffer_raw(self)
Save the raw signal to a NumPy ndarray.
NOTE: This is only for internal usage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
None |
None |
No argument. |
required |
Returns:
Type | Description |
---|---|
ndarray |
The buffer. |
Source code in code_fmkit/fmsignal.py
def save_to_buffer_raw(self):
"""Save the raw signal to a NumPy ndarray.
**NOTE**: This is only for internal usage.
Args:
None (None): No argument.
Returns:
ndarray: The buffer.
"""
m = 2 + 3 + 3 + 75 + 2
array = np.zeros((self.length, m), np.float32)
array[:, 0] = self.tsc
array[:, 1] = self.ts
offset_tip = 2
offset_center = 2 + 3
offset_joints = 2 + 3 + 3
array[:, offset_tip : offset_tip + 3] = self.tip
array[:, offset_center : offset_center + 3] = self.center
for j in range(5):
for k in range(5):
index = offset_joints + j * 5 * 3 + k * 3
array[:, index : index + 3] = self.joints[:, j, k, :]
array[:, -2] = self.confs
array[:, -1] = self.valids
return array
save_to_file(self, fn, mode)
General interface to save the raw signal to a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without the ".csv" or ".npy" extension). |
required |
mode |
string |
The file format, either "raw_csv", or "raw_npy". |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def save_to_file(self, fn, mode):
"""General interface to save the raw signal to a file.
Args:
fn (string): The file name (without the ".csv" or ".npy" extension).
mode (string): The file format, either "raw_csv", or "raw_npy".
Returns:
None: No return value.
"""
if mode == "raw_csv":
array = self.save_to_buffer_raw()
fn += ".csv"
np.savetxt(fn, array, fmt="%.6f", delimiter=", ")
elif mode == "raw_npy":
array = self.save_to_buffer_raw()
# NOTE: NumPy library add the ".npy" file extension for us!
np.save(fn, array)
else:
raise ValueError("Unknown file mode %s!" % mode)
translate(self, t)
Translate the signal coorindate reference frame.
NOTE: This method is only used in the preprocessing procedure. This method modifies the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
t |
ndarray |
The 3D translation vector. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def translate(self, t):
"""Translate the signal coorindate reference frame.
**NOTE**: This method is only used in the preprocessing procedure.
This method modifies the signal.
Args:
t (ndarray): The 3D translation vector.
Returns:
None: No return value.
"""
self.tip += t
self.center += t
for j in range(5):
for k in range(5):
self.joints[:, j, k] += t
trim(self, start, end)
Trim the start and the end where the hand does not move.
NOTE: The trimming process is split into two parts to accommodate other steps of preprocessing. This method is the second part, which throws away the data samples given the start and end indices.
NOTE: This method is only used in the preprocessing procedure. This method modifies the signal.
Source code in code_fmkit/fmsignal.py
def trim(self, start, end):
"""Trim the start and the end where the hand does not move.
**NOTE**: The trimming process is split into two parts to accommodate
other steps of preprocessing. This method is the second part, which
throws away the data samples given the start and end indices.
**NOTE**: This method is only used in the preprocessing procedure.
This method modifies the signal.
"""
l_new = end - start
# NOTE: These "copy()" force the "ts" and "data" to have proper C-like
# array in memory, which is crucial for "dtw_c()"!!!
self.data = self.data[start:end, :].copy()
self.ts = self.ts[start:end].copy()
ts0 = self.ts[0]
self.ts -= ts0
self.length = l_new
if self.qs is not None:
self.qs = self.qs[start:end]
if self.rotms is not None:
self.rotms = self.rotms[start:end]
if self.trajectory is not None:
self.trajectory = self.trajectory[start:end]
self.tip = self.tip[start:end]
self.center = self.center[start:end]
self.joints = self.joints[start:end]
self.confs = self.confs[start:end]
self.valids = self.valids[start:end]
FMSignalTemplate
FMSignal signal template constructed from a collection of signals.
In most cases, template is identical to a signal (preprocessed, sensor agnostic) with a few additional attributes. Note that this class is derived from the class "FMSignal".
Attributes:
Name | Type | Description |
---|---|---|
variance |
ndarray |
The variance of the signal (length * 18 matrix). |
signals_aligned |
ndarray |
The signals for constructing the template. |
NOTE: The variance is calculates at sample level, so it has the same length and dimension as the data, i.e., a length * 18 matrix.
NOTE: The "signals_aligned" is only valid if it is constructed from a collection of signals, i.e., by calling "construct_from_signals()". It is not valid if the template is loaded from a file.
__init__(self, length=0, dim=0, ts=None, data=None, variance=None, signals_aligned=None, user='', cid='', seq=0)
special
Constructor.
See the attributes of FMSignal for the meaning of the arguments.
Source code in code_fmkit/fmsignal.py
def __init__(self, length=0, dim=0, ts=None, data=None,
variance=None, signals_aligned=None, user="", cid="", seq=0):
"""Constructor.
See the attributes of FMSignal for the meaning of the arguments.
"""
FMSignal.__init__(self, length, dim, ts, data, user, cid, seq)
self.variance = variance
self.signals_aligned = signals_aligned
all_close_to(self, template)
Check whether this template is almost identical to another template.
NOTE: The criteria of "identical" is defined by "np.allclose()". The two templates must have the same type and length.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
template |
FMSignalTemplate |
The other template to compare. |
required |
Returns:
Type | Description |
---|---|
bool |
True if they are almost identical; False otherwise. |
Source code in code_fmkit/fmsignal.py
def all_close_to(self, template):
"""Check whether this template is almost identical to another template.
**NOTE**: The criteria of "identical" is defined by "np.allclose()".
The two templates must have the same type and length.
Args:
template (FMSignalTemplate): The other template to compare.
Returns:
bool: True if they are almost identical; False otherwise.
"""
if not isinstance(template, FMSignal):
return False
if self.length != template.length:
return False
# NOTE: The CSV format only stores six digits after the decimal point.
# Hence, "atol" can not be smaller than 1e-6.
r1 = np.allclose(self.ts, template.ts, atol=1e-6)
r2 = np.allclose(self.data, template.data, atol=1e-6)
r3 = np.allclose(self.variance, template.variance, atol=1e-6)
return r1 and r2 and r3
construct_from_file(fn, mode, user='', cid='', seq=0)
classmethod
Factory method to build the signal by loading a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without extension). |
required |
mode |
string |
The file format, either "raw_csv", or "raw_npy". |
required |
user |
string |
The user who creates this signal. |
'' |
cid |
string |
The unique id indicating the content of the signal. |
'' |
seq |
int |
The sequence id in a set when loaded from a dataset. |
0 |
Returns:
Type | Description |
---|---|
FMSignalTemplate |
The constructed signal object. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the "mode" is wrong. |
FileNotFoundError |
If the file does not exist. |
Source code in code_fmkit/fmsignal.py
@classmethod
def construct_from_file(cls, fn, mode, user="", cid="", seq=0):
"""Factory method to build the signal by loading a file.
Args:
fn (string): The file name (without extension).
mode (string): The file format, either "raw_csv", or "raw_npy".
user (string): The user who creates this signal.
cid (string): The unique id indicating the content of the signal.
seq (int): The sequence id in a set when loaded from a dataset.
Returns:
FMSignalTemplate: The constructed signal object.
Raises:
ValueError: If the "mode" is wrong.
FileNotFoundError: If the file does not exist.
"""
template = cls()
template.load_from_file(fn, mode)
template.user = user
template.cid = cid
template.seq = seq
return template
construct_from_signals(signals, template_index, window=50, penalty=0)
classmethod
Construct the template by aligning and average a set of signals.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
signals |
list |
The collection of signals (FMSignal objects). |
required |
template_index |
int |
The index indicating the alignment. |
required |
window |
int |
Alignment window, see "dtw()". |
50 |
penalty |
int |
Element-wise misalignment penalty, see "dtw()". |
0 |
NOTE: The "signals" should be a collection of signals
Source code in code_fmkit/fmsignal.py
@classmethod
def construct_from_signals(cls, signals, template_index,
window=50, penalty=0):
"""Construct the template by aligning and average a set of signals.
Args:
signals (list): The collection of signals (FMSignal objects).
template_index (int): The index indicating the alignment.
window (int): Alignment window, see "dtw()".
penalty (int): Element-wise misalignment penalty, see "dtw()".
**NOTE**: The "signals" should be a collection of signals
"""
signal_t = signals[template_index]
signals_aligned = [signal_t]
k = len(signals)
# construct signal template
length = signal_t.length
dim = signal_t.dim
ts = signal_t.ts.copy()
data = signal_t.data.copy()
variance = np.zeros(signal_t.data.shape, signal_t.data.dtype)
for signal in signals:
if signal == signal_t:
continue
signal_aligned = signal.align_to(signal_t, window, penalty)
signals_aligned.append(signal_aligned)
data += signal_aligned.data
data /= k
for signal_aligned in signals_aligned:
variance += np.square(signal_aligned.data - data)
variance /= k
template = cls(length, dim, ts, data, variance, signals_aligned,
signal_t.user, signal_t.cid, signal_t.seq)
return template
load_from_file(self, fn, mode)
General interface to load the template from a file.
NOTE: This is only for internal usage. If an FMSignalLeap object is needed, use the class method "construct_from_file()" instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without the ".csv" or ".npy" extension). |
required |
mode |
string |
The file format, either "raw_csv", or "raw_npy". |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the "mode" is unknown. |
FileNotFoundError |
if the file does not exist. |
Source code in code_fmkit/fmsignal.py
def load_from_file(self, fn, mode):
"""General interface to load the template from a file.
**NOTE**: This is only for internal usage. If an FMSignalLeap object is
needed, use the class method "construct_from_file()" instead.
Args:
fn (string): The file name (without the ".csv" or ".npy" extension).
mode (string): The file format, either "raw_csv", or "raw_npy".
Returns:
None: No return value.
Raises:
ValueError: if the "mode" is unknown.
FileNotFoundError: if the file does not exist.
"""
if mode == "csv":
fn += ".csv"
array = np.loadtxt(fn, dtype=np.float32, delimiter=",")
assert array.shape[1] == 37
l = array.shape[0]
d = 18
ts = array[:, 0:1]
data_and_variance = array[:, 1:]
data = data_and_variance[:, :d]
variance = data_and_variance[:, d:]
elif mode == "npy":
fn += ".npy"
array = np.load(fn)
assert array.dtype == np.float32
assert array.shape[1] == 37
l = array.shape[0]
d = 18
ts = array[:, 0:1]
data = array[:, 1:(d + 1)]
variance = array[:, (d + 1):]
else:
raise ValueError("Unknown file mode %s!" % mode)
assert d == 18
self.length = l
self.dim = d
self.ts = ts
self.data = data
self.variance = variance
save_to_file(self, fn, mode)
General interface to save the template to a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
string |
The file name (without the ".csv" or ".npy" extension). |
required |
mode |
string |
The file format, either "raw_csv", or "raw_npy". |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
Source code in code_fmkit/fmsignal.py
def save_to_file(self, fn, mode):
"""General interface to save the template to a file.
Args:
fn (string): The file name (without the ".csv" or ".npy" extension).
mode (string): The file format, either "raw_csv", or "raw_npy".
Returns:
None: No return value.
"""
l = self.length
array_tup = (self.ts.reshape((l, 1)), self.data, self.variance)
array = np.concatenate(array_tup, axis=1)
if mode == "csv":
fn += ".csv"
np.savetxt(fn, array, fmt="%.6f", delimiter=", ")
elif mode == "npy":
assert array.dtype == np.float32
# NOTE: NumPy library add the ".npy" file extension for us!
np.save(fn, array)
else:
raise ValueError("Unknown file mode %s!" % mode)
update(self, new_signal, factor)
Update the template with a new signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
new_signal |
FMSignal |
The new signal S used to update the template. |
required |
factor |
float |
The update factor, i.e., T_{new} = (1-factor)T_{old} + Sfactor. |
required |
Returns:
Type | Description |
---|---|
None |
No return value. |
NOTE: The "new_signal" must be already aligned to the template.
Source code in code_fmkit/fmsignal.py
def update(self, new_signal, factor):
"""Update the template with a new signal.
Args:
new_signal (FMSignal): The new signal S used to update the template.
factor (float): The update factor,
i.e., T_{new} = (1-factor)*T_{old} + S*factor.
Returns:
None: No return value.
**NOTE**: The "new_signal" must be already aligned to the template.
"""
# update the signal template
self.data = self.data * (1 - factor) + new_signal.data * factor
dtw(series_1, series_2, window=0, penalty=0)
Dynamic Time Warping (DTW) on two multidimensional time series.
SEE: Dynamic Time Warping
NOTE: This is the python implementation. It iteratively accesses each element of the time series array, which could be very slow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
series_1 |
ndarray |
The first array, n-by-d numpy ndarray. |
required |
series_2 |
ndarray |
The second array, m-by-d numpy ndarray. |
required |
window |
int |
The window constraint, by default no constraint. |
0 |
penalty |
float |
The misalign penalty, by default zero. |
0 |
Returns:
Type | Description |
---|---|
tuple |
A tuple containing the following.
|
Exceptions:
Type | Description |
---|---|
ValueError |
If the input series have incompatible dimensions. |
Source code in code_fmkit/fmsignal.py
def dtw(series_1, series_2, window=0, penalty=0):
"""Dynamic Time Warping (DTW) on two multidimensional time series.
**SEE**:
[Dynamic Time Warping](https://en.wikipedia.org/wiki/Dynamic_time_warping)
**NOTE**: This is the python implementation. It iteratively accesses each
element of the time series array, which could be very slow.
Args:
series_1 (ndarray): The first array, n-by-d numpy ndarray.
series_2 (ndarray): The second array, m-by-d numpy ndarray.
window (int): The window constraint, by default no constraint.
penalty (float): The misalign penalty, by default zero.
Returns:
tuple: A tuple containing the following.
int: The final DTW distance.
ndarray: The (n + 1)-by-(m + 1) distance matrix.
ndarray: The (n + 1)-by-(m + 1) direction matrix (warp path).
ndarray: The alignment starting indices from 2 to 1.
ndarray: The alignment ending indices from 2 to 1.
ndarray: The alignment starting indices from 1 to 2.
ndarray: The alignment ending indices from 1 to 2.
ndarray: The generated time series by aligning 2 to 1.
Raises:
ValueError: If the input series have incompatible dimensions.
"""
if not isinstance(series_1, np.ndarray) \
or not isinstance(series_2, np.ndarray) \
or len(series_1.shape) != 2 or len(series_2.shape) != 2 \
or series_1.dtype != np.float32 or series_2.dtype != np.float32:
raise ValueError('Input series must be l-by-d NumPy ndarrays!')
n = series_1.shape[0]
m = series_2.shape[0]
d1 = series_1.shape[1]
d2 = series_2.shape[1]
if d1 != d2:
raise ValueError('Input series must have the same dimension!')
if series_1.dtype != np.float32 or series_2.dtype != np.float32:
raise ValueError('Input series must have "dtype == np.float32"!')
# NOTE: By default, the window is set to a sufficiently large value (m * 2)
# to essentially remove the window constraint.
if window <= 0:
window = m * 2
# These are the distance matrix and the direction table.
dist_matrix = np.zeros((n + 1, m + 1), np.float32)
direction = np.zeros((n + 1, m + 1), np.int32)
# These are the index mapping to align series_2 to series_1.
a2to1_start = np.zeros(n, np.int32)
a2to1_end = np.zeros(n, np.int32)
# These are the index mapping to align series_1 to series_2.
a1to2_start = np.zeros(m, np.int32)
a1to2_end = np.zeros(m, np.int32)
# Initialization.
dist_matrix.fill(1e6)
dist_matrix[0, 0] = 0
direction[0, 0] = 0
#dist_matrix[:, 0] = 0
#dist_matrix[0, :] = 0
# find the warping path
for i in range(1, n + 1):
jj = int(float(m) / n * i)
start = jj - window if jj - window > 1 else 1
end = jj + window if jj + window < m + 1 else m + 1
for j in range(start, end):
# CAUTION: series_1[0] and series_2[0] mapps to dists[1][1],
# and i, j here are indexing dists instead of the series,
# i.e., dists[i][j] is comparing series_1[i - 1] and series_2[j - 1]
cost = np.linalg.norm(series_1[i - 1] - series_2[j - 1])
min_dist = dist_matrix[i - 1, j - 1]
direction[i, j] = 1 # 1 stands for diagonal
if dist_matrix[i - 1, j] + penalty < min_dist:
min_dist = dist_matrix[i - 1, j] + penalty
direction[i, j] = 2 # 2 stands for the i direction
if dist_matrix[i][j - 1] + penalty < min_dist:
min_dist = dist_matrix[i][j - 1] + penalty
direction[i, j] = 4 # 4 stands for the j direction
dist_matrix[i][j] = cost + min_dist
# trace back the warping path to find element-wise mapping
# print('warping path done')
a2to1_start[n - 1] = m - 1
a2to1_end[n - 1] = m - 1
a1to2_start[m - 1] = n - 1
a1to2_end[m - 1] = n - 1
i = n
j = m
while True:
if direction[i, j] == 2: # the i direction
i -= 1
a2to1_start[i - 1] = j - 1
a2to1_end[i - 1] = j - 1
a1to2_start[j - 1] = i - 1
elif direction[i, j] == 4: # the j direction
j -= 1
a1to2_start[j - 1] = i - 1
a1to2_end[j - 1] = i - 1
a2to1_start[i - 1] = j - 1
elif direction[i, j] == 1: # the diagonal direction
i -= 1
j -= 1
if i == 0 and j == 0:
break
a2to1_start[i - 1] = j - 1
a2to1_end[i - 1] = j - 1
a1to2_start[j - 1] = i - 1
a1to2_end[j - 1] = i - 1
else: # direction[i][j] == 0, the corner
break
series_2to1 = np.zeros(series_1.shape, series_1.dtype)
for i, jj, kk in zip(range(n), a2to1_start, a2to1_end):
if jj == kk:
series_2to1[i] = series_2[jj]
else:
series_2to1[i] = np.mean(series_2[jj : kk + 1, :], axis=0)
return dist_matrix[n, m], dist_matrix[1:, 1:], direction, \
a2to1_start, a2to1_end, a1to2_start, a1to2_end, series_2to1
dtw_c(series_1, series_2, window=0, penalty=0)
Python wrapper of the Dynamic Time Warping (DTW) implemented in C.
SEE: Dynamic Time Warping
SEE: The "fmkit_cutils" package in this project has the implementation.
NOTE: This is a Python wrapper of the C implementation, which is relative fast compared to the pure Python implementation.
NOTE: The input series must have "dtype == np.float32".
Parameters:
Name | Type | Description | Default |
---|---|---|---|
series_1 |
ndarray |
The first array, n-by-d numpy ndarray. |
required |
series_2 |
ndarray |
The second array, m-by-d numpy ndarray. |
required |
window |
int |
The window constraint, by default no constraint. |
0 |
penalty |
float |
The misalign penalty, by default zero. |
0 |
Returns:
Type | Description |
---|---|
tuple |
A tuple containing the following.
|
Exceptions:
Type | Description |
---|---|
ValueError |
If the input series have incompatible dimensions. |
Source code in code_fmkit/fmsignal.py
def dtw_c(series_1, series_2, window=0, penalty=0):
"""Python wrapper of the Dynamic Time Warping (DTW) implemented in C.
**SEE**:
[Dynamic Time Warping](https://en.wikipedia.org/wiki/Dynamic_time_warping)
**SEE**: The "fmkit_cutils" package in this project has the implementation.
**NOTE**: This is a Python wrapper of the C implementation, which is
relative fast compared to the pure Python implementation.
**NOTE**: The input series must have "dtype == np.float32".
Args:
series_1 (ndarray): The first array, n-by-d numpy ndarray.
series_2 (ndarray): The second array, m-by-d numpy ndarray.
window (int, optional): The window constraint, by default no constraint.
penalty (float, optional): The misalign penalty, by default zero.
Returns:
tuple: A tuple containing the following.
int: The final DTW distance.
ndarray: The (n + 1)-by-(m + 1) distance matrix.
ndarray: The (n + 1)-by-(m + 1) direction matrix (warp path).
ndarray: The alignment starting indices from 2 to 1.
ndarray: The alignment ending indices from 2 to 1.
ndarray: The alignment starting indices from 1 to 2.
ndarray: The alignment ending indices from 1 to 2.
ndarray: The generated time series by aligning 2 to 1.
Raises:
ValueError: If the input series have incompatible dimensions.
"""
if not isinstance(series_1, np.ndarray) \
or not isinstance(series_2, np.ndarray) \
or len(series_1.shape) != 2 or len(series_2.shape) != 2:
raise ValueError('Input series must be l-by-d NumPy ndarrays!')
n = series_1.shape[0]
m = series_2.shape[0]
d1 = series_1.shape[1]
d2 = series_2.shape[1]
if d1 != d2:
raise ValueError('Input series must have the same dimension!')
if series_1.dtype != np.float32 or series_2.dtype != np.float32:
raise ValueError('Input series must have "dtype == np.float32"!')
if window <= 0:
window = m * 2
# print('begin dtw_c()')
# NOTE: It may be problematic to manage the memory and Python object
# life-cycle in C code, and hence, we allocate space here in the wrapper.
dist_matrix = np.zeros((n + 1, m + 1), np.float32)
direction = np.zeros((n + 1, m + 1), np.int32)
# These are the index mapping to align series_2 to series_1.
a2to1_start = np.zeros(n, np.int32)
a2to1_end = np.zeros(n, np.int32)
# These are the index mapping to align series_1 to series_2.
a1to2_start = np.zeros(m, np.int32)
a1to2_end = np.zeros(m, np.int32)
series_2to1 = np.zeros(series_1.shape, np.float32)
# Initialization.
dist_matrix.fill(1e6)
dist = fmkit_utilities.dtw_c(
series_1,
series_2,
series_1.shape[1],
n,
m,
window,
penalty,
dist_matrix,
direction,
a2to1_start,
a2to1_end,
a1to2_start,
a1to2_end,
series_2to1,
)
# print('end dtw_c()')
return dist, dist_matrix[1:, 1:], direction, \
a2to1_start, a2to1_end, a1to2_start, a1to2_end, series_2to1
normalize_warping_path_a2to1(a2to1_start, a2to1_end, n=100)
Normalize the warping path from l2-by-l1 to n-by-n.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
a2to1_start |
ndarray |
The alignment starting indices from 2 to 1. |
required |
a2to1_end |
ndarray |
The alignment ending indices from 2 to 1. |
required |
n |
int |
The size of the warping path after normalization. |
100 |
Returns:
Type | Description |
---|---|
(ndarray) |
The "a2to1_start" after normalization. (ndarray): The "a2to1_end" after normalization. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the input arrays have wrong type or value. |
Source code in code_fmkit/fmsignal.py
def normalize_warping_path_a2to1(a2to1_start, a2to1_end, n=100):
"""Normalize the warping path from l2-by-l1 to n-by-n.
Args:
a2to1_start (ndarray): The alignment starting indices from 2 to 1.
a2to1_end (ndarray): The alignment ending indices from 2 to 1.
n (int): The size of the warping path after normalization.
Returns:
(ndarray): The "a2to1_start" after normalization.
(ndarray): The "a2to1_end" after normalization.
Raises:
ValueError: If the input arrays have wrong type or value.
"""
if not isinstance(a2to1_start, np.ndarray) \
or not isinstance(a2to1_end, np.ndarray) \
or len(a2to1_start.shape) != 1 or len(a2to1_end.shape) != 1 \
or a2to1_start.dtype != np.int or a2to1_end.dtype != np.int:
raise ValueError('Input series must be two 1D NumPy arrays!')
assert a2to1_start.shape[0] == a2to1_end.shape[0]
assert np.all((a2to1_end - a2to1_start) >= 0)
assert n > 0
l1 = a2to1_start.shape[0]
l2 = a2to1_end[-1]
xp = np.arange(l1)
x_n = np.linspace(0, l1 - 1, n)
a2to1_start_n = np.interp(x_n, xp, a2to1_start)
a2to1_start_n = a2to1_start_n / l2 * (n - 1)
a2to1_end_n = np.interp(x_n, xp, a2to1_end)
a2to1_end_n = a2to1_end_n / l2 * (n - 1)
return a2to1_start_n.astype(np.int32), a2to1_end_n.astype(np.int32)
warping_path_to_xy_sequences(a2to1_start, a2to1_end)
Convert the warping path from alignment indices to xy coordinates.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
a2to1_start |
ndarray |
The alignment starting indices from 2 to 1. |
required |
a2to1_end |
ndarray |
The alignment ending indices from 2 to 1. |
required |
Returns:
Type | Description |
---|---|
(list) |
The x coordinates. (list): The y coordinates. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the input arrays have wrong type or value. |
Source code in code_fmkit/fmsignal.py
def warping_path_to_xy_sequences(a2to1_start, a2to1_end):
"""Convert the warping path from alignment indices to xy coordinates.
Args:
a2to1_start (ndarray): The alignment starting indices from 2 to 1.
a2to1_end (ndarray): The alignment ending indices from 2 to 1.
Returns:
(list): The x coordinates.
(list): The y coordinates.
Raises:
ValueError: If the input arrays have wrong type or value.
"""
if not isinstance(a2to1_start, np.ndarray) \
or not isinstance(a2to1_end, np.ndarray) \
or len(a2to1_start.shape) != 1 or len(a2to1_end.shape) != 1 \
or a2to1_start.dtype != np.int or a2to1_end.dtype != np.int:
raise ValueError('Input series must be two 1D NumPy arrays!')
assert a2to1_start.shape[0] == a2to1_end.shape[0]
assert np.all((a2to1_end - a2to1_start) >= 0)
xs = []
ys = []
for i, (start, end) in enumerate(zip(a2to1_start, a2to1_end)):
for j in range(start, end + 1):
xs.append(i)
ys.append(j)
#print(i, j, start, end)
return xs, ys