"""Load Empatica sensor CSV files stored inside zip archives into pandas data frames."""
from zipfile import ZipFile
import warnings
from pathlib import Path
import pandas as pd
import numpy as np
from pandas.core import indexing
import yaml
import csv
from collections import OrderedDict
from io import BytesIO, StringIO
import sys, os

from cr_features.hrv import get_HRV_features, get_patched_ibi_with_bvp
from cr_features.helper_functions import empatica1d_to_array, empatica2d_to_array

# Sensors whose CSV rows carry a single value (read from the first field).
_SINGLE_VALUE_SENSORS = (
    'EMPATICA_ELECTRODERMAL_ACTIVITY',
    'EMPATICA_TEMPERATURE',
    'EMPATICA_HEARTRATE',
    'EMPATICA_BLOOD_VOLUME_PULSE',
)
_ACCELEROMETER_SENSOR = 'EMPATICA_ACCELEROMETER'
_IBI_SENSOR = 'EMPATICA_INTER_BEAT_INTERVAL'


def processAcceleration(x, y, z):
    """Return the three axis readings as a dict of floats keyed 'x', 'y', 'z'."""
    return {'x': float(x), 'y': float(y), 'z': float(z)}


def readFile(file, dtype):
    """Parse a fixed-rate sensor CSV into an ordered {timestamp: sample} mapping.

    The first row supplies the start timestamp and the second row the sampling
    frequency (both taken from the first field). Every following row is one
    sample; the first sample keeps the start timestamp and each later one is
    stamped ``1 / frequency`` after its predecessor.

    Args:
        file: Open text buffer; it is used as a context manager and therefore
            closed when parsing finishes.
        dtype: Sensor name. Single-value sensors map each timestamp to the raw
            string in the first field; the accelerometer maps it to the dict
            built by ``processAcceleration``.

    Returns:
        OrderedDict keyed by timestamp, in file order.

    Raises:
        ValueError: If ``dtype`` is not a sensor this parser supports.
    """
    is_accelerometer = dtype == _ACCELEROMETER_SENSOR
    if not is_accelerometer and dtype not in _SINGLE_VALUE_SENSORS:
        raise ValueError("sensor has an invalid name: {}".format(dtype))

    samples = OrderedDict()
    with file as csvfile:
        # The default ',' delimiter is used for every sensor. A newline never
        # acts as a field separator in csv.reader (the line ending is consumed
        # first), and newer csv implementations reportedly reject it outright,
        # so single-value rows are simply read as one-field records.
        reader = csv.reader(csvfile, delimiter=',')
        for i, row in enumerate(reader):
            if i == 0:
                timestamp = float(row[0])
            elif i == 1:
                hertz = float(row[0])
            else:
                # Row 2 is the first sample and keeps the start timestamp.
                if i > 2:
                    timestamp = timestamp + 1.0 / hertz
                if is_accelerometer:
                    samples[timestamp] = processAcceleration(row[0], row[1], row[2])
                else:
                    samples[timestamp] = row[0]
    return samples


def _index_to_milliseconds(df):
    """Scale the index of *df* by 1000 and cast it to 64-bit integers."""
    # An explicit int64 avoids overflow where the platform default int is
    # 32 bits wide.
    df.index = (df.index * 1000).astype('int64')
    return df


def extract_empatica_data(data, sensor):
    """Decode one sensor CSV (raw bytes) into a frame indexed by 'timestamp'.

    Args:
        data: UTF-8 encoded bytes of the CSV file.
        sensor: Sensor name; the value column is named after it with the
            ``EMPATICA_`` prefix removed and lower-cased (the accelerometer
            uses 'x', 'y', 'z' instead).

    Returns:
        DataFrame whose index is named 'timestamp'. The index values are the
        parsed timestamps multiplied by 1000 and truncated to integers, except
        for an empty inter-beat-interval file, which yields an empty frame
        returned as-is.

    Raises:
        ValueError: If ``sensor`` is not a recognised sensor name.
    """
    sensor_data_file = StringIO(bytes(data).decode('utf-8'))
    column = sensor.replace("EMPATICA_", "").lower()

    if sensor in _SINGLE_VALUE_SENSORS or sensor == _ACCELEROMETER_SENSOR:
        value_columns = ['x', 'y', 'z'] if sensor == _ACCELEROMETER_SENSOR else [column]
        samples = readFile(sensor_data_file, sensor)
        df = pd.DataFrame.from_dict(samples, orient='index', columns=value_columns)
        for name in value_columns:
            df[name] = df[name].astype(float)
        df.index.name = 'timestamp'
    elif sensor == _IBI_SENSOR:
        df = pd.read_csv(sensor_data_file, names=['timings', column], header=None)
        df['timestamp'] = df['timings']
        if df.empty:
            return df.set_index('timestamp')
        # Row 0 holds the start timestamp; the remaining 'timings' values are
        # offsets that get added to it.
        timestampstart = float(df['timestamp'].iloc[0])
        df['timestamp'] = df['timestamp'].iloc[1:].astype(float) + timestampstart
        df = df.drop([0])
        df[column] = df[column].astype(float)
        df = df.set_index('timestamp')
    else:
        raise ValueError("sensor has an invalid name: {}".format(sensor))

    # format timestamps
    return _index_to_milliseconds(df)


def _ibi_patching_enabled():
    """Return whether config.yaml sets PATCH_WITH_BVP for the CR IBI provider."""
    with open('config.yaml', 'r') as stream:
        # NOTE(review): FullLoader is kept as in the original; this assumes
        # config.yaml is trusted project configuration.
        config = yaml.load(stream, Loader=yaml.FullLoader)
    cr_ibi_provider = config['EMPATICA_INTER_BEAT_INTERVAL']['PROVIDERS']['CR']
    return bool(cr_ibi_provider.get('PATCH_WITH_BVP', False))


def pull_data(data_configuration, device, sensor, container, columns_to_download):
    """Collect one sensor's data for a device from every zip archive found.

    All ``*.zip`` files below ``data_configuration["FOLDER"] / device`` are
    searched (recursively) for a member named ``container + '.csv'``.

    Args:
        data_configuration: Mapping with a "FOLDER" entry (root data folder).
        device: Device id; used as sub-folder name and written to 'device_id'.
        sensor: Sensor name passed on to the CSV parsers.
        container: Archive member name without the '.csv' suffix.
        columns_to_download: Mapping whose values are the output column names;
            they must include 'timestamp'.

    Returns:
        DataFrame sorted by timestamp, with duplicate timestamps removed
        (first occurrence kept) and a 'device_id' column.
    """
    sensor_csv = container + '.csv'
    participant_data = pd.DataFrame(columns=columns_to_download.values())
    participant_data.set_index('timestamp', inplace=True)

    # The configuration is only consulted for the sensor that needs it, so
    # other sensors neither re-parse nor depend on config.yaml.
    patch_with_bvp = sensor == _IBI_SENSOR and _ibi_patching_enabled()

    device_folder = Path(data_configuration["FOLDER"]) / Path(device)
    available_zipfiles = list(device_folder.rglob("*.zip"))
    if not available_zipfiles:
        warnings.warn("There were no zip files in: {}. If you were expecting data for this participant the [EMPATICA][DEVICE_IDS] key in their participant file is missing the pid".format(device_folder))

    frames = [participant_data]
    for zipfile in available_zipfiles:
        print("Extracting {} data from {} for {}".format(sensor, zipfile, device))
        with ZipFile(zipfile, 'r') as zipFile:
            # Checked per archive, so every archive lacking the file is
            # reported, not only those seen before the first hit.
            if sensor_csv not in zipFile.namelist():
                warnings.warn("We could not find a zipped file for {} in {} (we tried to find {})".format(sensor, zipFile, sensor_csv))
                continue
            if patch_with_bvp:
                frames.append(patch_ibi_with_bvp(zipFile.read('IBI.csv'), zipFile.read('BVP.csv')))
            else:
                frames.append(extract_empatica_data(zipFile.read(sensor_csv), sensor))

    # A single concatenation avoids re-copying the accumulated frame per archive.
    participant_data = pd.concat(frames, axis=0)
    participant_data.sort_index(inplace=True, ascending=True)
    participant_data.reset_index(inplace=True)
    participant_data.drop_duplicates(subset='timestamp', keep='first', inplace=True)
    participant_data["device_id"] = device
    return participant_data


def patch_ibi_with_bvp(ibi_data, bvp_data):
    """Build an inter-beat-interval frame patched with BVP-derived intervals.

    Args:
        ibi_data: UTF-8 encoded bytes of IBI.csv.
        bvp_data: UTF-8 encoded bytes of BVP.csv.

    Returns:
        DataFrame with columns 'inter_beat_interval' and 'timings', indexed by
        'timestamp' (start timestamp plus timing, times 1000, as integers).
        When IBI.csv is empty, an empty frame with columns 'timings' and
        'inter_beat_interval' is returned instead.

    Raises:
        IndexError: If parsing IBI.csv fails with an IndexError although the
            file is not empty.
    """
    ibi_text = bytes(ibi_data).decode('utf-8')

    # Begin with the cr-features part
    try:
        ibi_data, ibi_start_timestamp = empatica2d_to_array(StringIO(ibi_text))
    except IndexError as e:
        # Checks whether IBI.csv is empty. A fresh buffer is read here: the one
        # handed to the helper may already be consumed, which would make every
        # file look empty and hide genuine parsing errors.
        df_test = pd.read_csv(StringIO(ibi_text), names=['timings', 'inter_beat_interval'], header=None)
        if df_test.empty:
            df_test['timestamp'] = df_test['timings']
            return df_test.set_index('timestamp')
        raise IndexError("Something went wrong with indices. Error that was previously caught:\n", repr(e)) from e

    bvp_data_file = StringIO(bytes(bvp_data).decode('utf-8'))
    bvp_data, bvp_start_timestamp, sample_rate = empatica1d_to_array(bvp_data_file)

    # NOTE(review): sampling is fixed at 64 while sample_rate read from BVP.csv
    # goes unused -- confirm this is intended.
    hrv_time_and_freq_features, sample, bvp_rr, bvp_timings, peak_indx = \
        get_HRV_features(bvp_data, ma=False,
                         detrend=False, m_deternd=False, low_pass=False, winsorize=True,
                         winsorize_value=25, hampel_fiter=False, median_filter=False,
                         mod_z_score_filter=True, sampling=64, feature_names=['meanHr'])

    ibi_timings, ibi_rr = get_patched_ibi_with_bvp(ibi_data[0], ibi_data[1], bvp_timings, bvp_rr)

    # Same result as the IBI branch of extract_empatica_data: each timing is
    # shifted by the recording's start timestamp to form the index.
    timings = np.asarray(ibi_timings, dtype=float)
    df = pd.DataFrame({
        'inter_beat_interval': np.asarray(ibi_rr, dtype=float),
        'timings': timings,
    })
    df.index = pd.Index(timings + float(ibi_start_timestamp), name='timestamp')

    # format timestamps
    return _index_to_milliseconds(df)

# print(pull_data({'FOLDER': 'data/external/empatica'}, "e01", "EMPATICA_accelerometer", {'TIMESTAMP': 'timestamp', 'DEVICE_ID': 'device_id', 'DOUBLE_VALUES_0': 'x', 'DOUBLE_VALUES_1': 'y', 'DOUBLE_VALUES_2': 'z'}))