From 5f5f19866f8cebc4efaf7247866a3ebfa1b817ec Mon Sep 17 00:00:00 2001 From: Juseong Kim Date: Wed, 30 Dec 2020 23:17:30 +0900 Subject: [PATCH] implement extract_empatica_data script add support for all data types Fix name comparison of zipped files --- Snakefile | 72 +++++++++--------- config.yaml | 4 +- src/data/empatica/extract_empatica_data.py | 86 +++++++++++++++++++--- src/data/empatica/unzip_empatica_data.py | 14 ++-- 4 files changed, 119 insertions(+), 57 deletions(-) diff --git a/Snakefile b/Snakefile index 33d3ad54..475f21c4 100644 --- a/Snakefile +++ b/Snakefile @@ -294,12 +294,12 @@ for provider in config["EMPATICA_ACCELEROMETER"]["PROVIDERS"].keys(): suffixes = get_zip_suffixes(pid) files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_raw_{suffix}.csv", pid=pid, suffix=suffixes)) - files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_joined.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_with_datetime.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/interim/{pid}/empatica_accelerometer_features/empatica_accelerometer_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) - files_to_compute.extend(expand("data/processed/features/{pid}/empatica_accelerometer.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) - files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") + # files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_joined.csv", pid=config["PIDS"])) + # files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_with_datetime.csv", pid=config["PIDS"])) + # files_to_compute.extend(expand("data/interim/{pid}/empatica_accelerometer_features/empatica_accelerometer_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) + # files_to_compute.extend(expand("data/processed/features/{pid}/empatica_accelerometer.csv", pid=config["PIDS"])) + # files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) + # files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") for provider in config["EMPATICA_HEARTRATE"]["PROVIDERS"].keys(): if config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["COMPUTE"]: @@ -307,12 +307,12 @@ for provider in config["EMPATICA_HEARTRATE"]["PROVIDERS"].keys(): suffixes = get_zip_suffixes(pid) files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_raw_{suffix}.csv", pid=pid, suffix=suffixes)) - files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_joined.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_with_datetime.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/interim/{pid}/empatica_heartrate_features/empatica_heartrate_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) - files_to_compute.extend(expand("data/processed/features/{pid}/empatica_heartrate.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) - files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") +# files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_joined.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_with_datetime.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/interim/{pid}/empatica_heartrate_features/empatica_heartrate_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) +# files_to_compute.extend(expand("data/processed/features/{pid}/empatica_heartrate.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) +# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") for provider in config["EMPATICA_TEMPERATURE"]["PROVIDERS"].keys(): if config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["COMPUTE"]: @@ -320,12 +320,12 @@ for provider in config["EMPATICA_TEMPERATURE"]["PROVIDERS"].keys(): suffixes = get_zip_suffixes(pid) files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_raw_{suffix}.csv", pid=pid, suffix=suffixes)) - files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_joined.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_with_datetime.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/interim/{pid}/empatica_temperature_features/empatica_temperature_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) - files_to_compute.extend(expand("data/processed/features/{pid}/empatica_temperature.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) - files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") +# files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_joined.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_with_datetime.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/interim/{pid}/empatica_temperature_features/empatica_temperature_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) +# files_to_compute.extend(expand("data/processed/features/{pid}/empatica_temperature.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) +# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") for provider in config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"].keys(): if config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["COMPUTE"]: @@ -333,12 +333,12 @@ for provider in config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"].keys(): suffixes = get_zip_suffixes(pid) files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_raw_{suffix}.csv", pid=pid, suffix=suffixes)) - files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_joined.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_with_datetime.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/interim/{pid}/empatica_electrodermal_activity_features/empatica_electrodermal_activity_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) - files_to_compute.extend(expand("data/processed/features/{pid}/empatica_electrodermal_activity.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) - files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") +# files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_joined.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_with_datetime.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/interim/{pid}/empatica_electrodermal_activity_features/empatica_electrodermal_activity_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) +# files_to_compute.extend(expand("data/processed/features/{pid}/empatica_electrodermal_activity.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) +# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") for provider in config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"].keys(): if config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["COMPUTE"]: @@ -346,12 +346,12 @@ for provider in config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"].keys(): suffixes = get_zip_suffixes(pid) files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_raw_{suffix}.csv", pid=pid, suffix=suffixes)) - files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_joined.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_with_datetime.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/interim/{pid}/empatica_blood_volume_pulse_features/empatica_blood_volume_pulse_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) - files_to_compute.extend(expand("data/processed/features/{pid}/empatica_blood_volume_pulse.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) - files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") +# files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_joined.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_with_datetime.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/interim/{pid}/empatica_blood_volume_pulse_features/empatica_blood_volume_pulse_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) +# files_to_compute.extend(expand("data/processed/features/{pid}/empatica_blood_volume_pulse.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) +# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") for provider in config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"].keys(): if config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["COMPUTE"]: @@ -359,12 +359,12 @@ for provider in config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"].keys(): suffixes = get_zip_suffixes(pid) files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_raw_{suffix}.csv", pid=pid, suffix=suffixes)) - files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_joined.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_with_datetime.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/interim/{pid}/empatica_inter_beat_interval_features/empatica_inter_beat_interval_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) - files_to_compute.extend(expand("data/processed/features/{pid}/empatica_inter_beat_interval.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) - files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") +# files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_joined.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_with_datetime.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/interim/{pid}/empatica_inter_beat_interval_features/empatica_inter_beat_interval_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) +# files_to_compute.extend(expand("data/processed/features/{pid}/empatica_inter_beat_interval.csv", pid=config["PIDS"])) +# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) +# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") for provider in config["EMPATICA_TAGS"]["PROVIDERS"].keys(): if config["EMPATICA_TAGS"]["PROVIDERS"][provider]["COMPUTE"]: diff --git a/config.yaml b/config.yaml index 736e7d9a..4fd977c2 100644 --- a/config.yaml +++ b/config.yaml @@ -451,7 +451,7 @@ EMPATICA_TEMPERATURE: SRC_LANGUAGE: "python" EMPATICA_ELECTRODERMAL_ACTIVITY: - TABLE: temp + TABLE: eda PROVIDERS: DBDP: COMPUTE: True @@ -481,7 +481,7 @@ EMPATICA_TAGS: TABLE: tags PROVIDERS: DBDP: - COMPUTE: True + COMPUTE: False FEATURES: [] SRC_FOLDER: "dbdp" # inside src/features/empatica_heartrate SRC_LANGUAGE: "python" diff --git a/src/data/empatica/extract_empatica_data.py b/src/data/empatica/extract_empatica_data.py index 31baf4ed..1798c1da 100644 --- a/src/data/empatica/extract_empatica_data.py +++ b/src/data/empatica/extract_empatica_data.py @@ -2,20 +2,83 @@ import pandas as pd from pandas.core import indexing import yaml +import csv +from collections import OrderedDict + + +def processAcceleration(x, y, z): + x = float(x) + y = float(y) + z = float(z) + return {'x': x, 'y': y, 'z': z} + + +def readFile(file, dtype): + dict = OrderedDict() + + with open(file, 'rt') as csvfile: + if dtype in ('electrodermal_activity', 'temperature', 'heartrate', 'blood_volume_pulse'): + reader = csv.reader(csvfile, delimiter='\n') + elif dtype == 'accelerometer': + reader = csv.reader(csvfile, delimiter=',') + i = 0 + for row in reader: + if i == 0: + timestamp = float(row[0]) + elif i == 1: + hertz = float(row[0]) + else: + if i == 2: + pass + else: + timestamp = timestamp + 1.0 / hertz + if dtype in ('electrodermal_activity', 'temperature', 'heartrate', 'blood_volume_pulse'): + dict[timestamp] = row[0] + elif dtype == 'accelerometer': + dict[timestamp] = processAcceleration(row[0], row[1], row[2]) + i += 1 + return dict + def extract_empatica_data(sensor_data_file, output_file, start_date, end_date, timezone, sensor): - print(sensor_data_file) - print(output_file) - print(start_date) - print(end_date) - print(timezone) - print(sensor) - data = pd.read_csv(sensor_data_file) - print(data) + # read sensor data + if sensor in ('electrodermal_activity', 'temperature', 'heartrate', 'blood_volume_pulse'): + ddict = readFile(sensor_data_file, sensor) + df = pd.DataFrame.from_dict(ddict, orient='index', columns=[sensor]) + df[sensor] = df[sensor].astype(float) + df.index.name = 'timestamp' - # extract - print(output_file) - data.to_csv(output_file, index = False) + elif sensor == 'accelerometer': + ddict = readFile(sensor_data_file, sensor) + df = pd.DataFrame.from_dict(ddict, orient='index', columns=['x', 'y', 'z']) + df['x'] = df['x'].astype(float) + df['y'] = df['y'].astype(float) + df['z'] = df['z'].astype(float) + df.index.name = 'timestamp' + + elif sensor == 'inter_beat_interval': + df = pd.read_csv(sensor_data_file, names=['timestamp', sensor], header=None) + timestampstart = float(df['timestamp'][0]) + df['timestamp'] = (df['timestamp'][1:len(df)]).astype(float) + timestampstart + df = df.drop([0]) + df[sensor] = df[sensor].astype(float) + df = df.set_index('timestamp') + + else: + raise ValueError( + "sensor can only be one of ['electrodermal_activity','temperature','heartrate','blood_volume_pulse','accelerometer','inter_beat_interval'].") + + # filter based on given start and end date + start_date_utc = pd.Timestamp(start_date, tz=timezone).timestamp() + end_date_utc = pd.Timestamp(end_date, tz=timezone).timestamp() + df = df[start_date_utc:end_date_utc] + + # format timestamps + df.index *= 1000 + df.index = df.index.astype(int) + + # output csv file + df.to_csv(output_file) sensor_data_file = snakemake.input[0] @@ -29,4 +92,3 @@ timezone = snakemake.params["data_configuration"]["TIMEZONE"]["VALUE"] sensor = snakemake.params["sensor"] extract_empatica_data(sensor_data_file, output_file, start_date, end_date, timezone, sensor) - diff --git a/src/data/empatica/unzip_empatica_data.py b/src/data/empatica/unzip_empatica_data.py index 11e64bfb..b4379caf 100644 --- a/src/data/empatica/unzip_empatica_data.py +++ b/src/data/empatica/unzip_empatica_data.py @@ -1,19 +1,19 @@ from zipfile import ZipFile import warnings -sensor_short_name = {"accelerometer":"acc", - "temperature":"temp", +sensor_short_name = {"accelerometer":"ACC", + "temperature":"TEMP", "tags":"tags", - "heartrate":"hr", - "inter_beat_interval":"ibi", - "blood_volume_pulse":"bvp", - "electrodermal_activity":"eda"} + "heartrate":"HR", + "inter_beat_interval":"IBI", + "blood_volume_pulse":"BVP", + "electrodermal_activity":"EDA"} sensor_csv = sensor_short_name[snakemake.params["sensor"]] + '.csv' warning = True with ZipFile(snakemake.input[0], 'r') as zipFile: listOfFileNames = zipFile.namelist() for fileName in listOfFileNames: - if fileName.endswith(sensor_csv): + if fileName == sensor_csv: with open(snakemake.output[0], 'wb') as outputFile: outputFile.write(zipFile.read(fileName)) warning = False