rapids/src/data/streams/empatica_zip/container.py

182 lines
7.8 KiB
Python

from zipfile import ZipFile
import warnings
from pathlib import Path
import pandas as pd
import numpy as np
from pandas.core import indexing
import yaml
import csv
from collections import OrderedDict
from io import BytesIO, StringIO
import sys, os
from cr_features.hrv import get_HRV_features, get_patched_ibi_with_bvp
from cr_features.helper_functions import empatica1d_to_array, empatica2d_to_array
def processAcceleration(x, y, z):
x = float(x)
y = float(y)
z = float(z)
return {'x': x, 'y': y, 'z': z}
def readFile(file, dtype):
dict = OrderedDict()
# file is an in-memory buffer
with file as csvfile:
if dtype in ('EMPATICA_ELECTRODERMAL_ACTIVITY', 'EMPATICA_TEMPERATURE', 'EMPATICA_HEARTRATE', 'EMPATICA_BLOOD_VOLUME_PULSE'):
reader = csv.reader(csvfile, delimiter='\n')
elif dtype == 'EMPATICA_ACCELEROMETER':
reader = csv.reader(csvfile, delimiter=',')
i = 0
for row in reader:
if i == 0:
timestamp = float(row[0])
elif i == 1:
hertz = float(row[0])
else:
if i == 2:
pass
else:
timestamp = timestamp + 1.0 / hertz
if dtype in ('EMPATICA_ELECTRODERMAL_ACTIVITY', 'EMPATICA_TEMPERATURE', 'EMPATICA_HEARTRATE', 'EMPATICA_BLOOD_VOLUME_PULSE'):
dict[timestamp] = row[0]
elif dtype == 'EMPATICA_ACCELEROMETER':
dict[timestamp] = processAcceleration(row[0], row[1], row[2])
i += 1
return dict
def extract_empatica_data(data, sensor):
sensor_data_file = BytesIO(data).getvalue().decode('utf-8')
sensor_data_file = StringIO(sensor_data_file)
column = sensor.replace("EMPATICA_", "").lower()
# read sensor data
if sensor in ('EMPATICA_ELECTRODERMAL_ACTIVITY', 'EMPATICA_TEMPERATURE', 'EMPATICA_HEARTRATE', 'EMPATICA_BLOOD_VOLUME_PULSE'):
ddict = readFile(sensor_data_file, sensor)
df = pd.DataFrame.from_dict(ddict, orient='index', columns=[column])
df[column] = df[column].astype(float)
df.index.name = 'timestamp'
if df.empty:
return df
elif sensor == 'EMPATICA_ACCELEROMETER':
ddict = readFile(sensor_data_file, sensor)
df = pd.DataFrame.from_dict(ddict, orient='index', columns=['x', 'y', 'z'])
df['x'] = df['x'].astype(float)
df['y'] = df['y'].astype(float)
df['z'] = df['z'].astype(float)
df.index.name = 'timestamp'
if df.empty:
return df
elif sensor == 'EMPATICA_INTER_BEAT_INTERVAL':
df = pd.read_csv(sensor_data_file, names=['timings', column], header=None)
df['timestamp'] = df['timings']
if df.empty:
df = df.set_index('timestamp')
return df
timestampstart = float(df['timestamp'][0])
df['timestamp'] = (df['timestamp'][1:len(df)]).astype(float) + timestampstart
df = df.drop([0])
df[column] = df[column].astype(float)
df = df.set_index('timestamp')
else:
raise ValueError(
"sensor has an invalid name: {}".format(sensor))
# format timestamps
df.index *= 1000
df.index = df.index.astype(int)
return(df)
def pull_data(data_configuration, device, sensor, container, columns_to_download):
sensor_csv = container + '.csv'
warning = True
participant_data = pd.DataFrame(columns=columns_to_download.values())
participant_data.set_index('timestamp', inplace=True)
with open('config.yaml', 'r') as stream:
config = yaml.load(stream, Loader=yaml.FullLoader)
cr_ibi_provider = config['EMPATICA_INTER_BEAT_INTERVAL']['PROVIDERS']['CR']
available_zipfiles = list((Path(data_configuration["FOLDER"]) / Path(device)).rglob("*.zip"))
if len(available_zipfiles) == 0:
warnings.warn("There were no zip files in: {}. If you were expecting data for this participant the [EMPATICA][DEVICE_IDS] key in their participant file is missing the pid".format((Path(data_configuration["FOLDER"]) / Path(device))))
for zipfile in available_zipfiles:
print("Extracting {} data from {} for {}".format(sensor, zipfile, device))
with ZipFile(zipfile, 'r') as zipFile:
listOfFileNames = zipFile.namelist()
for fileName in listOfFileNames:
if fileName == sensor_csv:
if sensor == "EMPATICA_INTER_BEAT_INTERVAL" and cr_ibi_provider.get('PATCH_WITH_BVP', False):
participant_data = \
pd.concat([participant_data, patch_ibi_with_bvp(zipFile.read('IBI.csv'), zipFile.read('BVP.csv'))], axis=0)
#print("patch with ibi")
else:
participant_data = pd.concat([participant_data, extract_empatica_data(zipFile.read(fileName), sensor)], axis=0)
#print("no patching")
warning = False
if warning:
warnings.warn("We could not find a zipped file for {} in {} (we tried to find {})".format(sensor, zipFile, sensor_csv))
participant_data.sort_index(inplace=True, ascending=True)
participant_data.reset_index(inplace=True)
participant_data.drop_duplicates(subset='timestamp', keep='first',inplace=True)
participant_data["device_id"] = device
return(participant_data)
def patch_ibi_with_bvp(ibi_data, bvp_data):
ibi_data_file = BytesIO(ibi_data).getvalue().decode('utf-8')
ibi_data_file = StringIO(ibi_data_file)
# Begin with the cr-features part
try:
ibi_data, ibi_start_timestamp = empatica2d_to_array(ibi_data_file)
except (IndexError, KeyError) as e:
# Checks whether IBI.csv is empty
# It may raise a KeyError if df is empty here: startTimeStamp = df.time[0]
df_test = pd.read_csv(ibi_data_file, names=['timings', 'inter_beat_interval'], header=None)
if df_test.empty:
df_test['timestamp'] = df_test['timings']
df_test = df_test.set_index('timestamp')
return df_test
else:
raise IndexError("Something went wrong with indices. Error that was previously caught:\n", repr(e))
bvp_data_file = BytesIO(bvp_data).getvalue().decode('utf-8')
bvp_data_file = StringIO(bvp_data_file)
bvp_data, bvp_start_timestamp, sample_rate = empatica1d_to_array(bvp_data_file)
hrv_time_and_freq_features, sample, bvp_rr, bvp_timings, peak_indx = \
get_HRV_features(bvp_data, ma=False,
detrend=False, m_deternd=False, low_pass=False, winsorize=True,
winsorize_value=25, hampel_fiter=False, median_filter=False,
mod_z_score_filter=True, sampling=64, feature_names=['meanHr'])
ibi_timings, ibi_rr = get_patched_ibi_with_bvp(ibi_data[0], ibi_data[1], bvp_timings, bvp_rr)
df = \
pd.DataFrame(np.array([ibi_timings, ibi_rr]).transpose(), columns=['timestamp', 'inter_beat_interval'])
df.loc[-1] = [ibi_start_timestamp, 'IBI'] # adding a row
df.index = df.index + 1 # shifting index
df = df.sort_index() # sorting by index
# Repeated as in extract_empatica_data for IBI
df['timings'] = df['timestamp']
timestampstart = float(df['timestamp'][0])
df['timestamp'] = (df['timestamp'][1:len(df)]).astype(float) + timestampstart
df = df.drop([0])
df['inter_beat_interval'] = df['inter_beat_interval'].astype(float)
df = df.set_index('timestamp')
# format timestamps
df.index *= 1000
df.index = df.index.astype(int)
return(df)
# print(pull_data({'FOLDER': 'data/external/empatica'}, "e01", "EMPATICA_accelerometer", {'TIMESTAMP': 'timestamp', 'DEVICE_ID': 'device_id', 'DOUBLE_VALUES_0': 'x', 'DOUBLE_VALUES_1': 'y', 'DOUBLE_VALUES_2': 'z'}))