From 5532043b1f4e4f71ddab9e140c503d44712f07b6 Mon Sep 17 00:00:00 2001
From: Primoz
Date: Wed, 25 May 2022 19:39:47 +0000
Subject: [PATCH] Patching IBI with BVP - completed.

---

Notes:
    Summary (read from the diff below): IBI patching moves out of the
    feature step and into data loading.
    - rules/features.smk: the IBI python-features rule no longer takes the
      bvp_sensor_data input.
    - cr/main.py: patch_IBI_with_BVP and the provider["PATCH_WITH_BVP"]
      branch in cr_features are removed, together with a debug print.
    - utils.py: the empty empatica_patch_IBI_with_BVP stub is removed.
    - container.py: pull_data() now calls the new patch_ibi_with_bvp() when
      the sensor is EMPATICA_INTER_BEAT_INTERVAL and the CR provider entry
      in config.yaml has PATCH_WITH_BVP set; otherwise it keeps calling
      extract_empatica_data() as before.

    patch_ibi_with_bvp(ibi_data, bvp_data) takes the raw bytes of IBI.csv and
    BVP.csv, decodes them as UTF-8, parses them with empatica2d_to_array /
    empatica1d_to_array, runs get_HRV_features on the BVP signal, feeds the
    resulting timings/rr into get_patched_ibi_with_bvp, and returns a
    DataFrame indexed by 'timestamp' (start timestamp + timing, multiplied
    by 1000 and cast to int) with columns 'inter_beat_interval' and
    'timings'.

    Review notes:
    - pull_data() opens 'config.yaml' by relative path and indexes
      ['EMPATICA_INTER_BEAT_INTERVAL']['PROVIDERS']['CR'] before the zip
      loop, so this happens for every sensor, not only IBI. A config without
      that key path would fail the whole pull. Consider reading it only in
      the IBI branch and using .get() throughout.
    - The config is read with yaml.load(..., Loader=yaml.FullLoader);
      yaml.safe_load is presumably sufficient for a plain config file.
    - The patched branch reads 'IBI.csv' and 'BVP.csv' by literal name
      rather than fileName. ZipFile.read raises KeyError when the archive
      has no BVP.csv; there is no fallback to the unpatched path.
    - In patch_ibi_with_bvp, sample_rate returned by empatica1d_to_array is
      unused and get_HRV_features is called with sampling=64. Presumably
      the parsed rate should be passed instead - TODO confirm.
    - bvp_start_timestamp, hrv_time_and_freq_features, sample and peak_indx
      are unpacked but never used in patch_ibi_with_bvp.
    - BytesIO(x).getvalue().decode('utf-8') on bytes is the same as
      x.decode('utf-8').
    - 'import sys, os' is added, but no line added by this patch uses sys
      or os.
    - The returned frame keeps the extra 'timings' column. The in-code
      comment says this repeats extract_empatica_data, which is not visible
      in this patch - verify the two outputs really have the same columns.
    - m_deternd / hampel_fiter are keyword names passed to get_HRV_features;
      presumably they match the cr_features signature - verify.
    - After this patch both container.py and utils.py end without a
      trailing newline.
    - NOTE(review): line breaks and indentation in this copy were restored
      by hand from a whitespace-collapsed source. Blank-line placement
      inside hunks follows the hunk line counts; run `git apply --check`
      against the target tree before relying on it.

 rules/features.smk                            |  1 -
 src/data/streams/empatica_zip/container.py    | 63 ++++++++++++++++---
 .../empatica_inter_beat_interval/cr/main.py   | 12 ----
 src/features/utils/utils.py                   |  6 +-
 4 files changed, 54 insertions(+), 28 deletions(-)

diff --git a/rules/features.smk b/rules/features.smk
index 76b2e94c..1b6e0ad8 100644
--- a/rules/features.smk
+++ b/rules/features.smk
@@ -899,7 +899,6 @@ rule empatica_blood_volume_pulse_r_features:
 rule empatica_inter_beat_interval_python_features:
     input:
         sensor_data = "data/raw/{pid}/empatica_inter_beat_interval_with_datetime.csv",
-        bvp_sensor_data = "data/raw/{pid}/empatica_blood_volume_pulse_with_datetime.csv",
         time_segments_labels = "data/interim/time_segments/{pid}_time_segments_labels.csv"
     params:
         provider = lambda wildcards: config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][wildcards.provider_key.upper()],
diff --git a/src/data/streams/empatica_zip/container.py b/src/data/streams/empatica_zip/container.py
index 4edb8369..80224893 100644
--- a/src/data/streams/empatica_zip/container.py
+++ b/src/data/streams/empatica_zip/container.py
@@ -2,13 +2,16 @@ from zipfile import ZipFile
 import warnings
 from pathlib import Path
 import pandas as pd
+import numpy as np
 from pandas.core import indexing
 import yaml
 import csv
 from collections import OrderedDict
 from io import BytesIO, StringIO
+import sys, os
 
-from cr_features.hrv import get_HRV_features
+from cr_features.hrv import get_HRV_features, get_patched_ibi_with_bvp
+from cr_features.helper_functions import empatica1d_to_array, empatica2d_to_array
 
 def processAcceleration(x, y, z):
     x = float(x)
@@ -88,6 +91,10 @@ def pull_data(data_configuration, device, sensor, container, columns_to_download
     participant_data = pd.DataFrame(columns=columns_to_download.values())
     participant_data.set_index('timestamp', inplace=True)
 
+    with open('config.yaml', 'r') as stream:
+        config = yaml.load(stream, Loader=yaml.FullLoader)
+    cr_ibi_provider = config['EMPATICA_INTER_BEAT_INTERVAL']['PROVIDERS']['CR']
+
     available_zipfiles = list((Path(data_configuration["FOLDER"]) / Path(device)).rglob("*.zip"))
     if len(available_zipfiles) == 0:
         warnings.warn("There were no zip files in: {}. If you were expecting data for this participant the [EMPATICA][DEVICE_IDS] key in their participant file is missing the pid".format((Path(data_configuration["FOLDER"]) / Path(device))))
@@ -96,17 +103,15 @@ def pull_data(data_configuration, device, sensor, container, columns_to_download
         print("Extracting {} data from {} for {}".format(sensor, zipfile, device))
         with ZipFile(zipfile, 'r') as zipFile:
             listOfFileNames = zipFile.namelist()
-            if sensor == "EMPATICA_INTER_BEAT_INTERVAL":
-                extracted_bvp_data = extract_empatica_data(zipFile.read('BVP.csv'), "EMPATICA_BLOOD_VOLUME_PULSE")
-                hrv_time_and_freq_features, sample, bvp_rr, bvp_timings, peak_indx = \
-                    get_HRV_features(extracted_bvp_data['blood_volume_pulse'].to_numpy(), ma=False, detrend=False, m_deternd=False,
-                                     low_pass=False, winsorize=True, winsorize_value=25,
-                                     hampel_fiter=False, median_filter=False, mod_z_score_filter=True,
-                                     sampling=64, feature_names=['meanHr'])
-                print(bvp_rr, bvp_timings)
             for fileName in listOfFileNames:
                 if fileName == sensor_csv:
-                    participant_data = pd.concat([participant_data, extract_empatica_data(zipFile.read(fileName), sensor)], axis=0)
+                    if sensor == "EMPATICA_INTER_BEAT_INTERVAL" and cr_ibi_provider.get('PATCH_WITH_BVP', False):
+                        participant_data = \
+                            pd.concat([participant_data, patch_ibi_with_bvp(zipFile.read('IBI.csv'), zipFile.read('BVP.csv'))], axis=0)
+                        #print("patch with ibi")
+                    else:
+                        participant_data = pd.concat([participant_data, extract_empatica_data(zipFile.read(fileName), sensor)], axis=0)
+                        #print("no patching")
                     warning = False
             if warning:
                 warnings.warn("We could not find a zipped file for {} in {} (we tried to find {})".format(sensor, zipFile, sensor_csv))
@@ -117,4 +122,42 @@ def pull_data(data_configuration, device, sensor, container, columns_to_download
     participant_data["device_id"] = device
     return(participant_data)
 
+def patch_ibi_with_bvp(ibi_data, bvp_data):
+    ibi_data_file = BytesIO(ibi_data).getvalue().decode('utf-8')
+    ibi_data_file = StringIO(ibi_data_file)
+
+    ibi_data, ibi_start_timestamp = empatica2d_to_array(ibi_data_file)
+
+    bvp_data_file = BytesIO(bvp_data).getvalue().decode('utf-8')
+    bvp_data_file = StringIO(bvp_data_file)
+
+    bvp_data, bvp_start_timestamp, sample_rate = empatica1d_to_array(bvp_data_file)
+
+    hrv_time_and_freq_features, sample, bvp_rr, bvp_timings, peak_indx = \
+        get_HRV_features(bvp_data, ma=False,
+        detrend=False, m_deternd=False, low_pass=False, winsorize=True,
+        winsorize_value=25, hampel_fiter=False, median_filter=False,
+        mod_z_score_filter=True, sampling=64, feature_names=['meanHr'])
+
+    ibi_timings, ibi_rr = get_patched_ibi_with_bvp(ibi_data[0], ibi_data[1], bvp_timings, bvp_rr, min_length=10)
+
+    df = \
+        pd.DataFrame(np.array([ibi_timings, ibi_rr]).transpose(), columns=['timestamp', 'inter_beat_interval'])
+    df.loc[-1] = [ibi_start_timestamp, 'IBI']  # adding a row
+    df.index = df.index + 1  # shifting index
+    df = df.sort_index()  # sorting by index
+
+    # Repeated as in extract_empatica_data for IBI
+    df['timings'] = df['timestamp']
+    timestampstart = float(df['timestamp'][0])
+    df['timestamp'] = (df['timestamp'][1:len(df)]).astype(float) + timestampstart
+    df = df.drop([0])
+    df['inter_beat_interval'] = df['inter_beat_interval'].astype(float)
+    df = df.set_index('timestamp')
+
+    # format timestamps
+    df.index *= 1000
+    df.index = df.index.astype(int)
+    return(df)
+
 # print(pull_data({'FOLDER': 'data/external/empatica'}, "e01", "EMPATICA_accelerometer", {'TIMESTAMP': 'timestamp', 'DEVICE_ID': 'device_id', 'DOUBLE_VALUES_0': 'x', 'DOUBLE_VALUES_1': 'y', 'DOUBLE_VALUES_2': 'z'}))
\ No newline at end of file
diff --git a/src/features/empatica_inter_beat_interval/cr/main.py b/src/features/empatica_inter_beat_interval/cr/main.py
index 607e5638..8b4cab5d 100644
--- a/src/features/empatica_inter_beat_interval/cr/main.py
+++ b/src/features/empatica_inter_beat_interval/cr/main.py
@@ -48,21 +48,9 @@ def extract_ibi_features_from_intraday_data(ibi_intraday_data, features, window_
     return ibi_intraday_features
 
 
-def patch_IBI_with_BVP(bvp_intraday_data):
-    # get features method is used because
-    hrv_time_and_freq_features, sample, rr, timings, peak_indx = \
-        get_HRV_features(bvp_intraday_data['blood_volume_pulse'].to_numpy(), hampel_fiter=False, median_filter=False, mod_z_score_filter=True, sampling=64, feature_names=['meanHr'])
-
-
 def cr_features(sensor_data_files, time_segment, provider, filter_data_by_segment, *args, **kwargs):
-    print(sensor_data_files)
     ibi_intraday_data = pd.read_csv(sensor_data_files["sensor_data"])
 
-    if provider["PATCH_WITH_BVP"]:
-        bvp_intraday_data = pd.read_csv(sensor_data_files["bvp_sensor_data"])
-        patch_IBI_with_BVP(bvp_intraday_data)
-        # sys.exit()
-
     requested_intraday_features = provider["FEATURES"]
 
     calc_windows = kwargs.get('calc_windows', False)
diff --git a/src/features/utils/utils.py b/src/features/utils/utils.py
index a7bf9fe3..7303ac86 100644
--- a/src/features/utils/utils.py
+++ b/src/features/utils/utils.py
@@ -168,8 +168,4 @@ def run_provider_cleaning_script(provider, provider_key, sensor_key, sensor_data
     cleaning_function = getattr(cleaning_module, provider_key.lower() + "_cleaning")
     sensor_features = cleaning_function(sensor_data_files, provider)
 
-    return sensor_features
-
-
-def empatica_patch_IBI_with_BVP(bvp_data):
-    pass
+    return sensor_features
\ No newline at end of file