From 4b9857562bd850a071f61659a714212e8bffd2a3 Mon Sep 17 00:00:00 2001 From: JulioV Date: Tue, 15 Dec 2020 18:19:11 -0500 Subject: [PATCH] Add support for zip input files --- Snakefile | 42 ++++++++++++++++++++---- config.yaml | 3 +- rules/common.smk | 17 ++++++++++ rules/preprocessing.smk | 27 ++++++++++++--- src/data/empatica/join_empatica_data.R | 17 ++++++++++ src/data/empatica/unzip_empatica_data.py | 21 ++++++++++++ 6 files changed, 114 insertions(+), 13 deletions(-) create mode 100644 src/data/empatica/join_empatica_data.R create mode 100644 src/data/empatica/unzip_empatica_data.py diff --git a/Snakefile b/Snakefile index bf44d71d..33d3ad54 100644 --- a/Snakefile +++ b/Snakefile @@ -290,7 +290,11 @@ for provider in config["FITBIT_STEPS_INTRADAY"]["PROVIDERS"].keys(): for provider in config["EMPATICA_ACCELEROMETER"]["PROVIDERS"].keys(): if config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_raw.csv", pid=config["PIDS"])) + for pid in config["PIDS"]: + suffixes = get_zip_suffixes(pid) + files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_raw_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_joined.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_with_datetime.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/interim/{pid}/empatica_accelerometer_features/empatica_accelerometer_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) files_to_compute.extend(expand("data/processed/features/{pid}/empatica_accelerometer.csv", pid=config["PIDS"])) @@ -299,7 +303,11 @@ for provider in config["EMPATICA_ACCELEROMETER"]["PROVIDERS"].keys(): for provider in config["EMPATICA_HEARTRATE"]["PROVIDERS"].keys(): if config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_raw.csv", pid=config["PIDS"])) + for pid in config["PIDS"]: + suffixes = get_zip_suffixes(pid) + files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_raw_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_joined.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_with_datetime.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/interim/{pid}/empatica_heartrate_features/empatica_heartrate_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) files_to_compute.extend(expand("data/processed/features/{pid}/empatica_heartrate.csv", pid=config["PIDS"])) @@ -308,7 +316,11 @@ for provider in config["EMPATICA_HEARTRATE"]["PROVIDERS"].keys(): for provider in config["EMPATICA_TEMPERATURE"]["PROVIDERS"].keys(): if config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_raw.csv", pid=config["PIDS"])) + for pid in config["PIDS"]: + suffixes = get_zip_suffixes(pid) + files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_raw_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_joined.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_with_datetime.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/interim/{pid}/empatica_temperature_features/empatica_temperature_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) files_to_compute.extend(expand("data/processed/features/{pid}/empatica_temperature.csv", pid=config["PIDS"])) @@ -317,7 +329,11 @@ for provider in config["EMPATICA_TEMPERATURE"]["PROVIDERS"].keys(): for provider in config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"].keys(): if config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_raw.csv", pid=config["PIDS"])) + for pid in config["PIDS"]: + suffixes = get_zip_suffixes(pid) + files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_raw_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_joined.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_with_datetime.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/interim/{pid}/empatica_electrodermal_activity_features/empatica_electrodermal_activity_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) files_to_compute.extend(expand("data/processed/features/{pid}/empatica_electrodermal_activity.csv", pid=config["PIDS"])) @@ -326,7 +342,11 @@ for provider in config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"].keys(): for provider in config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"].keys(): if config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_raw.csv", pid=config["PIDS"])) + for pid in config["PIDS"]: + suffixes = get_zip_suffixes(pid) + files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_raw_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_joined.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_with_datetime.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/interim/{pid}/empatica_blood_volume_pulse_features/empatica_blood_volume_pulse_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) files_to_compute.extend(expand("data/processed/features/{pid}/empatica_blood_volume_pulse.csv", pid=config["PIDS"])) @@ -335,7 +355,11 @@ for provider in config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"].keys(): for provider in config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"].keys(): if config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_raw.csv", pid=config["PIDS"])) + for pid in config["PIDS"]: + suffixes = get_zip_suffixes(pid) + files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_raw_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_joined.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_with_datetime.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/interim/{pid}/empatica_inter_beat_interval_features/empatica_inter_beat_interval_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) files_to_compute.extend(expand("data/processed/features/{pid}/empatica_inter_beat_interval.csv", pid=config["PIDS"])) @@ -344,7 +368,11 @@ for provider in config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"].keys(): for provider in config["EMPATICA_TAGS"]["PROVIDERS"].keys(): if config["EMPATICA_TAGS"]["PROVIDERS"][provider]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_raw.csv", pid=config["PIDS"])) + for pid in config["PIDS"]: + suffixes = get_zip_suffixes(pid) + files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_unzipped_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_raw_{suffix}.csv", pid=pid, suffix=suffixes)) + files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_joined.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_with_datetime.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/interim/{pid}/empatica_tags_features/empatica_tags_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_TAGS"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) files_to_compute.extend(expand("data/processed/features/{pid}/empatica_tags.csv", pid=config["PIDS"])) diff --git a/config.yaml b/config.yaml index 55317e92..736e7d9a 100644 --- a/config.yaml +++ b/config.yaml @@ -414,7 +414,8 @@ FITBIT_STEPS_INTRADAY: EMPATICA_DATA_CONFIGURATION: SOURCE: - TYPE: FILE + TYPE: ZIP_FILE + FOLDER: data/external/empatica TIMEZONE: TYPE: SINGLE # Fitbit devices don't support time zones so we read this data in the timezone indicated by VALUE VALUE: *timezone diff --git a/rules/common.smk b/rules/common.smk index 53c05122..e6bfa030 100644 --- a/rules/common.smk +++ b/rules/common.smk @@ -30,3 +30,20 @@ def get_phone_sensor_names(): phone_sensor_names.append(config_key) return phone_sensor_names +from pathlib import Path +import re + +def get_zip_suffixes(pid): + zipfiles = list(Path("data/external/empatica").rglob(pid+"*.zip")) + suffixes = [] + pattern = re.compile("{}(.*)".format(pid)) + for zipfile in zipfiles: + name = zipfile.stem + results = pattern.search(name) + suffixes.append(results.group(1)) + return suffixes + +def get_all_raw_empatica_sensor_files(wildcards): + suffixes = get_zip_suffixes(wildcards.pid) + files = ["data/raw/{}/empatica_{}_raw_{}.csv".format(wildcards.pid, wildcards.sensor, suffix) for suffix in suffixes] + return(files) diff --git a/rules/preprocessing.smk b/rules/preprocessing.smk index 33d0fe66..d62edb37 100644 --- a/rules/preprocessing.smk +++ b/rules/preprocessing.smk @@ -243,26 +243,43 @@ rule fitbit_readable_datetime: script: "../src/data/readable_datetime.R" -def empatica_input(wildcards): - return expand("data/external/empatica/{{pid}}/{filename}.csv", filename=config["EMPATICA_" + str(wildcards.sensor).upper()]["TABLE"]) +from pathlib import Path +rule unzip_empatica_data: + input: + input_file = Path(config["EMPATICA_DATA_CONFIGURATION"]["SOURCE"]["FOLDER"]) / Path("{pid}{suffix}.zip"), + participant_file = "data/external/participant_files/{pid}.yaml" + params: + sensor = "{sensor}" + output: + sensor_output = "data/raw/{pid}/empatica_{sensor}_unzipped_{suffix}.csv" + script: + "../src/data/empatica/unzip_empatica_data.py" rule extract_empatica_data: input: - input_file = empatica_input, + input_file = "data/raw/{pid}/empatica_{sensor}_unzipped_{suffix}.csv", participant_file = "data/external/participant_files/{pid}.yaml" params: data_configuration = config["EMPATICA_DATA_CONFIGURATION"], sensor = "{sensor}", table = lambda wildcards: config["EMPATICA_" + str(wildcards.sensor).upper()]["TABLE"], output: - sensor_output = "data/raw/{pid}/empatica_{sensor}_raw.csv" + sensor_output = "data/raw/{pid}/empatica_{sensor}_raw_{suffix}.csv" script: "../src/data/empatica/extract_empatica_data.py" +rule join_empatica_data: + input: + input_files = get_all_raw_empatica_sensor_files, + output: + sensor_output = "data/raw/{pid}/empatica_{sensor}_joined.csv" + script: + "../src/data/empatica/join_empatica_data.R" + rule empatica_readable_datetime: input: - sensor_input = "data/raw/{pid}/empatica_{sensor}_raw.csv", + sensor_input = "data/raw/{pid}/empatica_{sensor}_joined.csv", time_segments = "data/interim/time_segments/{pid}_time_segments.csv" params: timezones = config["PHONE_DATA_CONFIGURATION"]["TIMEZONE"]["TYPE"], diff --git a/src/data/empatica/join_empatica_data.R b/src/data/empatica/join_empatica_data.R new file mode 100644 index 00000000..480ae58d --- /dev/null +++ b/src/data/empatica/join_empatica_data.R @@ -0,0 +1,17 @@ +source("renv/activate.R") + +library("tidyr") +library("dplyr", warn.conflicts = F) + +empatica_files <- snakemake@input[["input_files"]] +empatica_data <- setNames(data.frame(matrix(ncol = 1, nrow = 0)), c("timestamp")) + + +for(file in empatica_files){ + data <- read.csv(file) + if(! ("timestamp" %in% colnames(data))) + stop(paste("This file does not have a timestamp column, something might have gone wrong while unzipping it:", file)) + empatica_data <- merge(empatica_data, data, all = TRUE) +} + +write.csv(empatica_data, snakemake@output[[1]], row.names = FALSE) \ No newline at end of file diff --git a/src/data/empatica/unzip_empatica_data.py b/src/data/empatica/unzip_empatica_data.py new file mode 100644 index 00000000..11e64bfb --- /dev/null +++ b/src/data/empatica/unzip_empatica_data.py @@ -0,0 +1,21 @@ +from zipfile import ZipFile +import warnings +sensor_short_name = {"accelerometer":"acc", + "temperature":"temp", + "tags":"tags", + "heartrate":"hr", + "inter_beat_interval":"ibi", + "blood_volume_pulse":"bvp", + "electrodermal_activity":"eda"} + +sensor_csv = sensor_short_name[snakemake.params["sensor"]] + '.csv' +warning = True +with ZipFile(snakemake.input[0], 'r') as zipFile: + listOfFileNames = zipFile.namelist() + for fileName in listOfFileNames: + if fileName.endswith(sensor_csv): + with open(snakemake.output[0], 'wb') as outputFile: + outputFile.write(zipFile.read(fileName)) + warning = False +if(warning): + warnings.warn("We could not find a zipped file for {} in {} (we tried to find {})".format(snakemake.params["sensor"], snakemake.input[0], sensor_csv))