rapids/src/features/empatica_inter_beat_interval/cr/main.py

86 lines
3.9 KiB
Python

import pandas as pd
import numpy as np
from cr_features.helper_functions import convert_ibi_to2d_time, hrv_features, hrv_freq_features
from cr_features.hrv import extract_hrv_features_2d_wrapper
import math
import sys
pd.set_option('display.max_rows', 1000)
pd.set_option('display.max_columns', None)
def get_sample_rate(data):
try:
timestamps_diff = data['timestamp'].diff().dropna().mean()
except:
raise Exception("Error occured while trying to get the mean sample rate from the data.")
return int(1000/timestamps_diff)
def extract_ibi_features_from_intraday_data(ibi_intraday_data, features, window_length, time_segment, filter_data_by_segment):
ibi_intraday_features = pd.DataFrame(columns=["local_segment"] + features)
if not ibi_intraday_data.empty:
sample_rate = get_sample_rate(ibi_intraday_data)
ibi_intraday_data = filter_data_by_segment(ibi_intraday_data, time_segment)
if not ibi_intraday_data.empty:
ibi_intraday_features = pd.DataFrame()
# np.set_printoptions(threshold=sys.maxsize)
# print(ibi_intraday_data.groupby('local_segment').apply(lambda x: math.ceil(x['timings'].iloc[-1])))
# nekaj = ibi_intraday_data.groupby('local_segment').apply(lambda x: \
# convert_ibi_to2d_time(x[['timings', 'inter_beat_interval']], window_length)[1])
# sys.exit()
# apply methods from calculate features module
if window_length is None:
ibi_intraday_features = \
ibi_intraday_data.groupby('local_segment').apply(\
lambda x:
extract_hrv_features_2d_wrapper(
signal_2D = \
convert_ibi_to2d_time(x[['timings', 'inter_beat_interval']], math.ceil(x['timings'].iloc[-1]))[0],
ibi_timings = \
convert_ibi_to2d_time(x[['timings', 'inter_beat_interval']], math.ceil(x['timings'].iloc[-1]))[1],
sampling=None, hampel_fiter=False, median_filter=False, mod_z_score_filter=True, feature_names=features))
else:
ibi_intraday_features = \
ibi_intraday_data.groupby('local_segment').apply(\
lambda x:
extract_hrv_features_2d_wrapper(
signal_2D = convert_ibi_to2d_time(x[['timings', 'inter_beat_interval']], window_length)[0],
ibi_timings = convert_ibi_to2d_time(x[['timings', 'inter_beat_interval']], window_length)[1],
sampling=None, hampel_fiter=False, median_filter=False, mod_z_score_filter=True, feature_names=features))
ibi_intraday_features.reset_index(inplace=True)
return ibi_intraday_features
def cr_features(sensor_data_files, time_segment, provider, filter_data_by_segment, *args, **kwargs):
ibi_intraday_data = pd.read_csv(sensor_data_files["sensor_data"])
requested_intraday_features = provider["FEATURES"]
calc_windows = kwargs.get('calc_windows', False)
if provider["WINDOWS"]["COMPUTE"] and calc_windows:
requested_window_length = provider["WINDOWS"]["WINDOW_LENGTH"]
else:
requested_window_length = None
# name of the features this function can compute
base_intraday_features_names = hrv_features + hrv_freq_features
# the subset of requested features this function can compute
intraday_features_to_compute = list(set(requested_intraday_features) & set(base_intraday_features_names))
# extract features from intraday data
ibi_intraday_features = extract_ibi_features_from_intraday_data(ibi_intraday_data, intraday_features_to_compute,
requested_window_length, time_segment, filter_data_by_segment)
return ibi_intraday_features