Patching IBI with BVP - completed.

sociality-task
Primoz 2022-05-25 19:39:47 +00:00
parent bb62497ba6
commit 5532043b1f
4 changed files with 54 additions and 28 deletions

View File

@ -899,7 +899,6 @@ rule empatica_blood_volume_pulse_r_features:
rule empatica_inter_beat_interval_python_features:
input:
sensor_data = "data/raw/{pid}/empatica_inter_beat_interval_with_datetime.csv",
bvp_sensor_data = "data/raw/{pid}/empatica_blood_volume_pulse_with_datetime.csv",
time_segments_labels = "data/interim/time_segments/{pid}_time_segments_labels.csv"
params:
provider = lambda wildcards: config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][wildcards.provider_key.upper()],

View File

@ -2,13 +2,16 @@ from zipfile import ZipFile
import warnings
from pathlib import Path
import pandas as pd
import numpy as np
from pandas.core import indexing
import yaml
import csv
from collections import OrderedDict
from io import BytesIO, StringIO
import sys, os
from cr_features.hrv import get_HRV_features
from cr_features.hrv import get_HRV_features, get_patched_ibi_with_bvp
from cr_features.helper_functions import empatica1d_to_array, empatica2d_to_array
def processAcceleration(x, y, z):
x = float(x)
@ -88,6 +91,10 @@ def pull_data(data_configuration, device, sensor, container, columns_to_download
participant_data = pd.DataFrame(columns=columns_to_download.values())
participant_data.set_index('timestamp', inplace=True)
with open('config.yaml', 'r') as stream:
config = yaml.load(stream, Loader=yaml.FullLoader)
cr_ibi_provider = config['EMPATICA_INTER_BEAT_INTERVAL']['PROVIDERS']['CR']
available_zipfiles = list((Path(data_configuration["FOLDER"]) / Path(device)).rglob("*.zip"))
if len(available_zipfiles) == 0:
warnings.warn("There were no zip files in: {}. If you were expecting data for this participant the [EMPATICA][DEVICE_IDS] key in their participant file is missing the pid".format((Path(data_configuration["FOLDER"]) / Path(device))))
@ -96,17 +103,15 @@ def pull_data(data_configuration, device, sensor, container, columns_to_download
print("Extracting {} data from {} for {}".format(sensor, zipfile, device))
with ZipFile(zipfile, 'r') as zipFile:
listOfFileNames = zipFile.namelist()
if sensor == "EMPATICA_INTER_BEAT_INTERVAL":
extracted_bvp_data = extract_empatica_data(zipFile.read('BVP.csv'), "EMPATICA_BLOOD_VOLUME_PULSE")
hrv_time_and_freq_features, sample, bvp_rr, bvp_timings, peak_indx = \
get_HRV_features(extracted_bvp_data['blood_volume_pulse'].to_numpy(), ma=False, detrend=False, m_deternd=False,
low_pass=False, winsorize=True, winsorize_value=25,
hampel_fiter=False, median_filter=False, mod_z_score_filter=True,
sampling=64, feature_names=['meanHr'])
print(bvp_rr, bvp_timings)
for fileName in listOfFileNames:
if fileName == sensor_csv:
participant_data = pd.concat([participant_data, extract_empatica_data(zipFile.read(fileName), sensor)], axis=0)
if sensor == "EMPATICA_INTER_BEAT_INTERVAL" and cr_ibi_provider.get('PATCH_WITH_BVP', False):
participant_data = \
pd.concat([participant_data, patch_ibi_with_bvp(zipFile.read('IBI.csv'), zipFile.read('BVP.csv'))], axis=0)
#print("patch with ibi")
else:
participant_data = pd.concat([participant_data, extract_empatica_data(zipFile.read(fileName), sensor)], axis=0)
#print("no patching")
warning = False
if warning:
warnings.warn("We could not find a zipped file for {} in {} (we tried to find {})".format(sensor, zipFile, sensor_csv))
@ -117,4 +122,42 @@ def pull_data(data_configuration, device, sensor, container, columns_to_download
participant_data["device_id"] = device
return(participant_data)
def patch_ibi_with_bvp(ibi_data, bvp_data):
ibi_data_file = BytesIO(ibi_data).getvalue().decode('utf-8')
ibi_data_file = StringIO(ibi_data_file)
ibi_data, ibi_start_timestamp = empatica2d_to_array(ibi_data_file)
bvp_data_file = BytesIO(bvp_data).getvalue().decode('utf-8')
bvp_data_file = StringIO(bvp_data_file)
bvp_data, bvp_start_timestamp, sample_rate = empatica1d_to_array(bvp_data_file)
hrv_time_and_freq_features, sample, bvp_rr, bvp_timings, peak_indx = \
get_HRV_features(bvp_data, ma=False,
detrend=False, m_deternd=False, low_pass=False, winsorize=True,
winsorize_value=25, hampel_fiter=False, median_filter=False,
mod_z_score_filter=True, sampling=64, feature_names=['meanHr'])
ibi_timings, ibi_rr = get_patched_ibi_with_bvp(ibi_data[0], ibi_data[1], bvp_timings, bvp_rr, min_length=10)
df = \
pd.DataFrame(np.array([ibi_timings, ibi_rr]).transpose(), columns=['timestamp', 'inter_beat_interval'])
df.loc[-1] = [ibi_start_timestamp, 'IBI'] # adding a row
df.index = df.index + 1 # shifting index
df = df.sort_index() # sorting by index
# Repeated as in extract_empatica_data for IBI
df['timings'] = df['timestamp']
timestampstart = float(df['timestamp'][0])
df['timestamp'] = (df['timestamp'][1:len(df)]).astype(float) + timestampstart
df = df.drop([0])
df['inter_beat_interval'] = df['inter_beat_interval'].astype(float)
df = df.set_index('timestamp')
# format timestamps
df.index *= 1000
df.index = df.index.astype(int)
return(df)
# print(pull_data({'FOLDER': 'data/external/empatica'}, "e01", "EMPATICA_accelerometer", {'TIMESTAMP': 'timestamp', 'DEVICE_ID': 'device_id', 'DOUBLE_VALUES_0': 'x', 'DOUBLE_VALUES_1': 'y', 'DOUBLE_VALUES_2': 'z'}))

View File

@ -48,21 +48,9 @@ def extract_ibi_features_from_intraday_data(ibi_intraday_data, features, window_
return ibi_intraday_features
def patch_IBI_with_BVP(bvp_intraday_data):
# get features method is used because
hrv_time_and_freq_features, sample, rr, timings, peak_indx = \
get_HRV_features(bvp_intraday_data['blood_volume_pulse'].to_numpy(), hampel_fiter=False, median_filter=False, mod_z_score_filter=True, sampling=64, feature_names=['meanHr'])
def cr_features(sensor_data_files, time_segment, provider, filter_data_by_segment, *args, **kwargs):
print(sensor_data_files)
ibi_intraday_data = pd.read_csv(sensor_data_files["sensor_data"])
if provider["PATCH_WITH_BVP"]:
bvp_intraday_data = pd.read_csv(sensor_data_files["bvp_sensor_data"])
patch_IBI_with_BVP(bvp_intraday_data)
# sys.exit()
requested_intraday_features = provider["FEATURES"]
calc_windows = kwargs.get('calc_windows', False)

View File

@ -168,8 +168,4 @@ def run_provider_cleaning_script(provider, provider_key, sensor_key, sensor_data
cleaning_function = getattr(cleaning_module, provider_key.lower() + "_cleaning")
sensor_features = cleaning_function(sensor_data_files, provider)
return sensor_features
def empatica_patch_IBI_with_BVP(bvp_data):
pass
return sensor_features