implement extract_empatica_data script

add support for all data types

Fix name comparison of zipped files
feature/plugin_sentimental
Juseong Kim 2020-12-30 23:17:30 +09:00 committed by JulioV
parent 4b9857562b
commit 5f5f19866f
4 changed files with 119 additions and 57 deletions

View File

@ -294,12 +294,12 @@ for provider in config["EMPATICA_ACCELEROMETER"]["PROVIDERS"].keys():
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_accelerometer_features/empatica_accelerometer_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_accelerometer.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
# files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_joined.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_with_datetime.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/interim/{pid}/empatica_accelerometer_features/empatica_accelerometer_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
# files_to_compute.extend(expand("data/processed/features/{pid}/empatica_accelerometer.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
for provider in config["EMPATICA_HEARTRATE"]["PROVIDERS"].keys():
if config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["COMPUTE"]:
@ -307,12 +307,12 @@ for provider in config["EMPATICA_HEARTRATE"]["PROVIDERS"].keys():
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_heartrate_features/empatica_heartrate_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_heartrate.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
# files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_joined.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_with_datetime.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/interim/{pid}/empatica_heartrate_features/empatica_heartrate_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
# files_to_compute.extend(expand("data/processed/features/{pid}/empatica_heartrate.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
for provider in config["EMPATICA_TEMPERATURE"]["PROVIDERS"].keys():
if config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["COMPUTE"]:
@ -320,12 +320,12 @@ for provider in config["EMPATICA_TEMPERATURE"]["PROVIDERS"].keys():
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_temperature_features/empatica_temperature_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_temperature.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
# files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_joined.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_with_datetime.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/interim/{pid}/empatica_temperature_features/empatica_temperature_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
# files_to_compute.extend(expand("data/processed/features/{pid}/empatica_temperature.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
for provider in config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"].keys():
if config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["COMPUTE"]:
@ -333,12 +333,12 @@ for provider in config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"].keys():
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_electrodermal_activity_features/empatica_electrodermal_activity_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_electrodermal_activity.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
# files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_joined.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_with_datetime.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/interim/{pid}/empatica_electrodermal_activity_features/empatica_electrodermal_activity_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
# files_to_compute.extend(expand("data/processed/features/{pid}/empatica_electrodermal_activity.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
for provider in config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"].keys():
if config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["COMPUTE"]:
@ -346,12 +346,12 @@ for provider in config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"].keys():
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_blood_volume_pulse_features/empatica_blood_volume_pulse_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_blood_volume_pulse.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
# files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_joined.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_with_datetime.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/interim/{pid}/empatica_blood_volume_pulse_features/empatica_blood_volume_pulse_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
# files_to_compute.extend(expand("data/processed/features/{pid}/empatica_blood_volume_pulse.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
for provider in config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"].keys():
if config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["COMPUTE"]:
@ -359,12 +359,12 @@ for provider in config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"].keys():
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_inter_beat_interval_features/empatica_inter_beat_interval_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_inter_beat_interval.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
# files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_joined.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_with_datetime.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/interim/{pid}/empatica_inter_beat_interval_features/empatica_inter_beat_interval_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
# files_to_compute.extend(expand("data/processed/features/{pid}/empatica_inter_beat_interval.csv", pid=config["PIDS"]))
# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
for provider in config["EMPATICA_TAGS"]["PROVIDERS"].keys():
if config["EMPATICA_TAGS"]["PROVIDERS"][provider]["COMPUTE"]:

View File

@ -451,7 +451,7 @@ EMPATICA_TEMPERATURE:
SRC_LANGUAGE: "python"
EMPATICA_ELECTRODERMAL_ACTIVITY:
TABLE: temp
TABLE: eda
PROVIDERS:
DBDP:
COMPUTE: True
@ -481,7 +481,7 @@ EMPATICA_TAGS:
TABLE: tags
PROVIDERS:
DBDP:
COMPUTE: True
COMPUTE: False
FEATURES: []
SRC_FOLDER: "dbdp" # inside src/features/empatica_heartrate
SRC_LANGUAGE: "python"

View File

@ -2,20 +2,83 @@ import pandas as pd
from pandas.core import indexing
import yaml
import csv
from collections import OrderedDict
def processAcceleration(x, y, z):
x = float(x)
y = float(y)
z = float(z)
return {'x': x, 'y': y, 'z': z}
def readFile(file, dtype):
dict = OrderedDict()
with open(file, 'rt') as csvfile:
if dtype in ('electrodermal_activity', 'temperature', 'heartrate', 'blood_volume_pulse'):
reader = csv.reader(csvfile, delimiter='\n')
elif dtype == 'accelerometer':
reader = csv.reader(csvfile, delimiter=',')
i = 0
for row in reader:
if i == 0:
timestamp = float(row[0])
elif i == 1:
hertz = float(row[0])
else:
if i == 2:
pass
else:
timestamp = timestamp + 1.0 / hertz
if dtype in ('electrodermal_activity', 'temperature', 'heartrate', 'blood_volume_pulse'):
dict[timestamp] = row[0]
elif dtype == 'accelerometer':
dict[timestamp] = processAcceleration(row[0], row[1], row[2])
i += 1
return dict
def extract_empatica_data(sensor_data_file, output_file, start_date, end_date, timezone, sensor):
print(sensor_data_file)
print(output_file)
print(start_date)
print(end_date)
print(timezone)
print(sensor)
data = pd.read_csv(sensor_data_file)
print(data)
# read sensor data
if sensor in ('electrodermal_activity', 'temperature', 'heartrate', 'blood_volume_pulse'):
ddict = readFile(sensor_data_file, sensor)
df = pd.DataFrame.from_dict(ddict, orient='index', columns=[sensor])
df[sensor] = df[sensor].astype(float)
df.index.name = 'timestamp'
# extract
print(output_file)
data.to_csv(output_file, index = False)
elif sensor == 'accelerometer':
ddict = readFile(sensor_data_file, sensor)
df = pd.DataFrame.from_dict(ddict, orient='index', columns=['x', 'y', 'z'])
df['x'] = df['x'].astype(float)
df['y'] = df['y'].astype(float)
df['z'] = df['z'].astype(float)
df.index.name = 'timestamp'
elif sensor == 'inter_beat_interval':
df = pd.read_csv(sensor_data_file, names=['timestamp', sensor], header=None)
timestampstart = float(df['timestamp'][0])
df['timestamp'] = (df['timestamp'][1:len(df)]).astype(float) + timestampstart
df = df.drop([0])
df[sensor] = df[sensor].astype(float)
df = df.set_index('timestamp')
else:
raise ValueError(
"sensor can only be one of ['electrodermal_activity','temperature','heartrate','blood_volume_pulse','accelerometer','inter_beat_interval'].")
# filter based on given start and end date
start_date_utc = pd.Timestamp(start_date, tz=timezone).timestamp()
end_date_utc = pd.Timestamp(end_date, tz=timezone).timestamp()
df = df[start_date_utc:end_date_utc]
# format timestamps
df.index *= 1000
df.index = df.index.astype(int)
# output csv file
df.to_csv(output_file)
sensor_data_file = snakemake.input[0]
@ -29,4 +92,3 @@ timezone = snakemake.params["data_configuration"]["TIMEZONE"]["VALUE"]
sensor = snakemake.params["sensor"]
extract_empatica_data(sensor_data_file, output_file, start_date, end_date, timezone, sensor)

View File

@ -1,19 +1,19 @@
from zipfile import ZipFile
import warnings
sensor_short_name = {"accelerometer":"acc",
"temperature":"temp",
sensor_short_name = {"accelerometer":"ACC",
"temperature":"TEMP",
"tags":"tags",
"heartrate":"hr",
"inter_beat_interval":"ibi",
"blood_volume_pulse":"bvp",
"electrodermal_activity":"eda"}
"heartrate":"HR",
"inter_beat_interval":"IBI",
"blood_volume_pulse":"BVP",
"electrodermal_activity":"EDA"}
sensor_csv = sensor_short_name[snakemake.params["sensor"]] + '.csv'
warning = True
with ZipFile(snakemake.input[0], 'r') as zipFile:
listOfFileNames = zipFile.namelist()
for fileName in listOfFileNames:
if fileName.endswith(sensor_csv):
if fileName == sensor_csv:
with open(snakemake.output[0], 'wb') as outputFile:
outputFile.write(zipFile.read(fileName))
warning = False