2021-03-07 05:16:59 +01:00
from zipfile import ZipFile
import warnings
from pathlib import Path
2020-12-15 02:30:34 +01:00
import pandas as pd
2022-05-25 21:39:47 +02:00
import numpy as np
2020-12-15 02:30:34 +01:00
from pandas . core import indexing
import yaml
2020-12-30 15:17:30 +01:00
import csv
from collections import OrderedDict
2021-03-07 05:16:59 +01:00
from io import BytesIO , StringIO
2022-05-25 21:39:47 +02:00
import sys , os
2020-12-30 15:17:30 +01:00
2022-05-25 21:39:47 +02:00
from cr_features . hrv import get_HRV_features , get_patched_ibi_with_bvp
from cr_features . helper_functions import empatica1d_to_array , empatica2d_to_array
2022-05-24 13:07:18 +02:00
2020-12-30 15:17:30 +01:00
def processAcceleration(x, y, z):
    """Bundle one accelerometer reading into a dict of floats.

    Each argument is passed through ``float()`` (so numeric strings are
    accepted) and stored under the key ``'x'``, ``'y'`` or ``'z'``.
    """
    return {'x': float(x), 'y': float(y), 'z': float(z)}
def readFile(file, dtype):
    """Parse a raw Empatica CSV buffer into an ordered ``{timestamp: value}`` map.

    The first non-blank row supplies the start timestamp and the second the
    sampling rate in hertz (first field of each). Every following row is one
    sample: the first sample is stamped with the start timestamp and each
    later one is ``1.0 / hertz`` after its predecessor.

    Parameters
    ----------
    file : context-manager text buffer (e.g. ``io.StringIO``)
        It is used in a ``with`` statement and therefore closed on return.
    dtype : str
        ``'EMPATICA_ACCELEROMETER'`` for three comma-separated fields per row,
        or one of the single-column sensors listed below.

    Returns
    -------
    collections.OrderedDict
        For single-column sensors the values are the raw line strings; for
        the accelerometer they are the dicts built by ``processAcceleration``.

    Raises
    ------
    ValueError
        If ``dtype`` is not a supported sensor name, or a header field is not
        numeric.
    """
    single_column_types = (
        'EMPATICA_ELECTRODERMAL_ACTIVITY',
        'EMPATICA_TEMPERATURE',
        'EMPATICA_HEARTRATE',
        'EMPATICA_BLOOD_VOLUME_PULSE',
    )
    is_accelerometer = dtype == 'EMPATICA_ACCELEROMETER'
    samples = OrderedDict()

    # file is an in-memory buffer
    with file as csvfile:
        if dtype in single_column_types:
            # One value per line: read the lines directly instead of asking
            # csv.reader for a newline delimiter, which never split anything
            # and which newer csv dialect validation refuses.
            rows = ([line.rstrip('\r\n')] for line in csvfile if line.strip())
        elif is_accelerometer:
            # csv.reader yields [] for blank lines; skip them.
            rows = (row for row in csv.reader(csvfile, delimiter=',') if row)
        else:
            raise ValueError("dtype has an invalid name: {}".format(dtype))

        for i, row in enumerate(rows):
            if i == 0:
                timestamp = float(row[0])
            elif i == 1:
                hertz = float(row[0])
            else:
                # The first sample (i == 2) keeps the start timestamp.
                if i > 2:
                    timestamp = timestamp + 1.0 / hertz
                if is_accelerometer:
                    samples[timestamp] = processAcceleration(row[0], row[1], row[2])
                else:
                    samples[timestamp] = row[0]
    return samples
2020-12-15 02:30:34 +01:00
2021-03-07 05:16:59 +01:00
def extract_empatica_data(data, sensor):
    """Turn the raw bytes of one Empatica sensor CSV into a DataFrame.

    Parameters
    ----------
    data : bytes
        UTF-8 encoded content of the sensor's CSV file.
    sensor : str
        Sensor name. The value column is named after it with the
        ``EMPATICA_`` prefix removed and lower-cased; the accelerometer gets
        ``x``, ``y`` and ``z`` columns instead.

    Returns
    -------
    pandas.DataFrame
        Float value column(s) indexed by ``timestamp``. The parsed timestamps
        are multiplied by 1000 and cast to 64-bit integers (presumably
        seconds to milliseconds; confirm against the data source). The
        inter-beat-interval frame also keeps the raw ``timings`` column. An
        empty inter-beat-interval file yields an empty frame, returned before
        the timestamp conversion.

    Raises
    ------
    ValueError
        If ``sensor`` is not a recognised name.
    """
    single_column_sensors = (
        'EMPATICA_ELECTRODERMAL_ACTIVITY',
        'EMPATICA_TEMPERATURE',
        'EMPATICA_HEARTRATE',
        'EMPATICA_BLOOD_VOLUME_PULSE',
    )
    sensor_data_file = StringIO(data.decode('utf-8'))
    column = sensor.replace("EMPATICA_", "").lower()

    # read sensor data
    if sensor in single_column_sensors or sensor == 'EMPATICA_ACCELEROMETER':
        if sensor == 'EMPATICA_ACCELEROMETER':
            value_columns = ['x', 'y', 'z']
        else:
            value_columns = [column]
        ddict = readFile(sensor_data_file, sensor)
        df = pd.DataFrame.from_dict(ddict, orient='index', columns=value_columns)
        for name in value_columns:
            df[name] = df[name].astype(float)
        df.index.name = 'timestamp'

    elif sensor == 'EMPATICA_INTER_BEAT_INTERVAL':
        df = pd.read_csv(sensor_data_file, names=['timings', column], header=None)
        df['timestamp'] = df['timings']
        if df.empty:
            return df.set_index('timestamp')

        # Row 0 carries the recording start time; the remaining rows hold
        # offsets relative to it.
        timestampstart = float(df['timestamp'].iloc[0])
        # Index alignment leaves NaN in row 0, which is dropped right after.
        df['timestamp'] = df['timestamp'].iloc[1:].astype(float) + timestampstart
        df = df.drop([0])
        df[column] = df[column].astype(float)
        df = df.set_index('timestamp')

    else:
        raise ValueError(
            "sensor has an invalid name: {}".format(sensor))

    # format timestamps
    df.index *= 1000
    # Explicit 64-bit: the scaled epoch values do not fit in 32 bits, and the
    # width of plain ``int`` depends on the platform/NumPy version.
    df.index = df.index.astype('int64')
    return df
2021-03-09 22:42:02 +01:00
def pull_data(data_configuration, device, sensor, container, columns_to_download):
    """Collect one sensor's data for a device from all of its Empatica zip files.

    Every ``*.zip`` found recursively under ``data_configuration["FOLDER"] /
    device`` is opened; if it contains ``container + '.csv'`` that file is
    parsed and appended. Inter-beat-interval data is routed through
    ``patch_ibi_with_bvp`` when ``config.yaml`` sets
    ``[EMPATICA_INTER_BEAT_INTERVAL][PROVIDERS][CR][PATCH_WITH_BVP]`` to a
    truthy value, and through ``extract_empatica_data`` otherwise.

    Parameters
    ----------
    data_configuration : mapping
        Must provide the ``"FOLDER"`` key (root folder of the Empatica data).
    device : str
        Device folder name; also written to the ``device_id`` column.
    sensor : str
        Sensor name understood by ``extract_empatica_data``.
    container : str
        Base name (without ``.csv``) of the sensor file inside each archive.
    columns_to_download : mapping
        Its values are the initial column names; ``'timestamp'`` must be one
        of them because it is used as the index.

    Returns
    -------
    pandas.DataFrame
        Rows sorted by timestamp, with ``timestamp`` as a regular column,
        duplicate timestamps removed (first kept) and a ``device_id`` column.

    Warns
    -----
    UserWarning
        When the device folder holds no zip files, and once for every archive
        that does not contain the sensor file.
    """
    sensor_csv = container + '.csv'
    participant_data = pd.DataFrame(columns=columns_to_download.values())
    participant_data.set_index('timestamp', inplace=True)

    # NOTE(review): FullLoader can build arbitrary Python objects; config.yaml
    # is assumed to be a trusted project file. Consider yaml.safe_load.
    with open('config.yaml', 'r') as stream:
        config = yaml.load(stream, Loader=yaml.FullLoader)
    cr_ibi_provider = config['EMPATICA_INTER_BEAT_INTERVAL']['PROVIDERS']['CR']
    patch_ibi = (sensor == "EMPATICA_INTER_BEAT_INTERVAL"
                 and cr_ibi_provider.get('PATCH_WITH_BVP', False))

    device_folder = Path(data_configuration["FOLDER"]) / Path(device)
    available_zipfiles = list(device_folder.rglob("*.zip"))
    if not available_zipfiles:
        warnings.warn("There were no zip files in: {}. If you were expecting data for this participant the [EMPATICA][DEVICE_IDS] key in their participant file is missing the pid".format(device_folder))

    for zipfile in available_zipfiles:
        print("Extracting {} data from {} for {}".format(sensor, zipfile, device))
        with ZipFile(zipfile, 'r') as zipFile:
            # Decide per archive, so a hit in one zip does not silence the
            # warning for a later zip that lacks the file.
            if sensor_csv not in zipFile.namelist():
                warnings.warn("We could not find a zipped file for {} in {} (we tried to find {})".format(sensor, zipFile, sensor_csv))
                continue
            if patch_ibi:
                # Patching needs both files; a missing BVP.csv raises KeyError.
                sensor_frame = patch_ibi_with_bvp(zipFile.read('IBI.csv'), zipFile.read('BVP.csv'))
            else:
                sensor_frame = extract_empatica_data(zipFile.read(sensor_csv), sensor)
            participant_data = pd.concat([participant_data, sensor_frame], axis=0)

    participant_data.sort_index(inplace=True, ascending=True)
    participant_data.reset_index(inplace=True)
    participant_data.drop_duplicates(subset='timestamp', keep='first', inplace=True)
    participant_data["device_id"] = device
    return participant_data
2022-05-25 21:39:47 +02:00
def patch_ibi_with_bvp(ibi_data, bvp_data):
    """Build an inter-beat-interval frame from IBI.csv patched with BVP.csv.

    The IBI series is combined with RR intervals that ``get_HRV_features``
    derives from the BVP signal (via ``get_patched_ibi_with_bvp``), then
    shaped like the inter-beat-interval output of ``extract_empatica_data``.

    Parameters
    ----------
    ibi_data : bytes
        UTF-8 encoded content of ``IBI.csv``.
    bvp_data : bytes
        UTF-8 encoded content of ``BVP.csv``.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``timestamp`` (value times 1000, as 64-bit integers) with
        ``inter_beat_interval`` and ``timings`` columns. If the IBI content
        parses to an empty table, an empty frame is returned instead.

    Raises
    ------
    IndexError
        If ``empatica2d_to_array`` fails with ``IndexError`` on IBI content
        that is not empty.
    """
    ibi_text = ibi_data.decode('utf-8')

    # Begin with the cr-features part
    try:
        ibi_array, ibi_start_timestamp = empatica2d_to_array(StringIO(ibi_text))
    except IndexError as e:
        # Checks whether IBI.csv is empty. Parse from a fresh buffer: the one
        # handed to the helper may already be read to its end, which would
        # make every file look empty and hide genuine index errors.
        df_test = pd.read_csv(StringIO(ibi_text), names=['timings', 'inter_beat_interval'], header=None)
        if df_test.empty:
            df_test['timestamp'] = df_test['timings']
            return df_test.set_index('timestamp')
        raise IndexError("Something went wrong with indices. Error that was previously caught:\n", repr(e)) from e

    bvp_data_file = StringIO(bvp_data.decode('utf-8'))
    # Only the signal is used; the start timestamp and sample rate are dropped.
    # NOTE(review): sampling is fixed at 64 below rather than taken from the
    # rate read here; confirm that is intended.
    bvp_signal, _, _ = empatica1d_to_array(bvp_data_file)

    _, _, bvp_rr, bvp_timings, _ = \
        get_HRV_features(bvp_signal, ma=False,
                         detrend=False, m_deternd=False, low_pass=False, winsorize=True,
                         winsorize_value=25, hampel_fiter=False, median_filter=False,
                         mod_z_score_filter=True, sampling=64, feature_names=['meanHr'])

    ibi_timings, ibi_rr = get_patched_ibi_with_bvp(ibi_array[0], ibi_array[1], bvp_timings, bvp_rr)

    df = \
        pd.DataFrame(np.array([ibi_timings, ibi_rr]).transpose(), columns=['timestamp', 'inter_beat_interval'])
    # Re-create the header row (start timestamp, 'IBI') as row 0 so the frame
    # has the same layout as a freshly read IBI.csv.
    df.loc[-1] = [ibi_start_timestamp, 'IBI']  # adding a row
    df.index = df.index + 1  # shifting index
    df = df.sort_index()  # sorting by index

    # Repeated as in extract_empatica_data for IBI
    df['timings'] = df['timestamp']
    timestampstart = float(df['timestamp'][0])
    # Index alignment leaves NaN in row 0, which is dropped right after.
    df['timestamp'] = (df['timestamp'][1:len(df)]).astype(float) + timestampstart
    df = df.drop([0])
    df['inter_beat_interval'] = df['inter_beat_interval'].astype(float)
    df = df.set_index('timestamp')

    # format timestamps
    df.index *= 1000
    # Explicit 64-bit: the scaled epoch values do not fit in 32 bits, and the
    # width of plain ``int`` depends on the platform/NumPy version.
    df.index = df.index.astype('int64')
    return df
2021-03-07 05:16:59 +01:00
# print(pull_data({'FOLDER': 'data/external/empatica'}, "e01", "EMPATICA_accelerometer", {'TIMESTAMP': 'timestamp', 'DEVICE_ID': 'device_id', 'DOUBLE_VALUES_0': 'x', 'DOUBLE_VALUES_1': 'y', 'DOUBLE_VALUES_2': 'z'}))