Add support for zip input files

feature/plugin_sentimental
JulioV 2020-12-15 18:19:11 -05:00
parent 8c726f5d4f
commit 4b9857562b
6 changed files with 114 additions and 13 deletions

View File

@ -290,7 +290,11 @@ for provider in config["FITBIT_STEPS_INTRADAY"]["PROVIDERS"].keys():
for provider in config["EMPATICA_ACCELEROMETER"]["PROVIDERS"].keys():
if config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_raw.csv", pid=config["PIDS"]))
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_accelerometer_features/empatica_accelerometer_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_accelerometer.csv", pid=config["PIDS"]))
@ -299,7 +303,11 @@ for provider in config["EMPATICA_ACCELEROMETER"]["PROVIDERS"].keys():
for provider in config["EMPATICA_HEARTRATE"]["PROVIDERS"].keys():
if config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_raw.csv", pid=config["PIDS"]))
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_heartrate_features/empatica_heartrate_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_heartrate.csv", pid=config["PIDS"]))
@ -308,7 +316,11 @@ for provider in config["EMPATICA_HEARTRATE"]["PROVIDERS"].keys():
for provider in config["EMPATICA_TEMPERATURE"]["PROVIDERS"].keys():
if config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_raw.csv", pid=config["PIDS"]))
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_temperature_features/empatica_temperature_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_temperature.csv", pid=config["PIDS"]))
@ -317,7 +329,11 @@ for provider in config["EMPATICA_TEMPERATURE"]["PROVIDERS"].keys():
for provider in config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"].keys():
if config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_raw.csv", pid=config["PIDS"]))
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_electrodermal_activity_features/empatica_electrodermal_activity_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_electrodermal_activity.csv", pid=config["PIDS"]))
@ -326,7 +342,11 @@ for provider in config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"].keys():
for provider in config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"].keys():
if config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_raw.csv", pid=config["PIDS"]))
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_blood_volume_pulse_features/empatica_blood_volume_pulse_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_blood_volume_pulse.csv", pid=config["PIDS"]))
@ -335,7 +355,11 @@ for provider in config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"].keys():
for provider in config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"].keys():
if config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_raw.csv", pid=config["PIDS"]))
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_inter_beat_interval_features/empatica_inter_beat_interval_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_inter_beat_interval.csv", pid=config["PIDS"]))
@ -344,7 +368,11 @@ for provider in config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"].keys():
for provider in config["EMPATICA_TAGS"]["PROVIDERS"].keys():
if config["EMPATICA_TAGS"]["PROVIDERS"][provider]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_raw.csv", pid=config["PIDS"]))
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_tags_features/empatica_tags_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_TAGS"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_tags.csv", pid=config["PIDS"]))

View File

@ -414,7 +414,8 @@ FITBIT_STEPS_INTRADAY:
EMPATICA_DATA_CONFIGURATION:
SOURCE:
TYPE: FILE
TYPE: ZIP_FILE
FOLDER: data/external/empatica
TIMEZONE:
TYPE: SINGLE # Fitbit devices don't support time zones so we read this data in the timezone indicated by VALUE
VALUE: *timezone

View File

@ -30,3 +30,20 @@ def get_phone_sensor_names():
phone_sensor_names.append(config_key)
return phone_sensor_names
from pathlib import Path
import re
def get_zip_suffixes(pid):
zipfiles = list(Path("data/external/empatica").rglob(pid+"*.zip"))
suffixes = []
pattern = re.compile("{}(.*)".format(pid))
for zipfile in zipfiles:
name = zipfile.stem
results = pattern.search(name)
suffixes.append(results.group(1))
return suffixes
def get_all_raw_empatica_sensor_files(wildcards):
suffixes = get_zip_suffixes(wildcards.pid)
files = ["data/raw/{}/empatica_{}_raw_{}.csv".format(wildcards.pid, wildcards.sensor, suffix) for suffix in suffixes]
return(files)

View File

@ -243,26 +243,43 @@ rule fitbit_readable_datetime:
script:
"../src/data/readable_datetime.R"
def empatica_input(wildcards):
return expand("data/external/empatica/{{pid}}/{filename}.csv", filename=config["EMPATICA_" + str(wildcards.sensor).upper()]["TABLE"])
from pathlib import Path
rule unzip_empatica_data:
input:
input_file = Path(config["EMPATICA_DATA_CONFIGURATION"]["SOURCE"]["FOLDER"]) / Path("{pid}{suffix}.zip"),
participant_file = "data/external/participant_files/{pid}.yaml"
params:
sensor = "{sensor}"
output:
sensor_output = "data/raw/{pid}/empatica_{sensor}_unzipped_{suffix}.csv"
script:
"../src/data/empatica/unzip_empatica_data.py"
rule extract_empatica_data:
input:
input_file = empatica_input,
input_file = "data/raw/{pid}/empatica_{sensor}_unzipped_{suffix}.csv",
participant_file = "data/external/participant_files/{pid}.yaml"
params:
data_configuration = config["EMPATICA_DATA_CONFIGURATION"],
sensor = "{sensor}",
table = lambda wildcards: config["EMPATICA_" + str(wildcards.sensor).upper()]["TABLE"],
output:
sensor_output = "data/raw/{pid}/empatica_{sensor}_raw.csv"
sensor_output = "data/raw/{pid}/empatica_{sensor}_raw_{suffix}.csv"
script:
"../src/data/empatica/extract_empatica_data.py"
rule join_empatica_data:
input:
input_files = get_all_raw_empatica_sensor_files,
output:
sensor_output = "data/raw/{pid}/empatica_{sensor}_joined.csv"
script:
"../src/data/empatica/join_empatica_data.R"
rule empatica_readable_datetime:
input:
sensor_input = "data/raw/{pid}/empatica_{sensor}_raw.csv",
sensor_input = "data/raw/{pid}/empatica_{sensor}_joined.csv",
time_segments = "data/interim/time_segments/{pid}_time_segments.csv"
params:
timezones = config["PHONE_DATA_CONFIGURATION"]["TIMEZONE"]["TYPE"],

View File

@ -0,0 +1,17 @@
source("renv/activate.R")
library("tidyr")
library("dplyr", warn.conflicts = F)
empatica_files <- snakemake@input[["input_files"]]
empatica_data <- setNames(data.frame(matrix(ncol = 1, nrow = 0)), c("timestamp"))
for(file in empatica_files){
data <- read.csv(file)
if(! ("timestamp" %in% colnames(data)))
stop(paste("This file does not have a timestamp column, something might have gone wrong while unzipping it:", file))
empatica_data <- merge(empatica_data, data, all = TRUE)
}
write.csv(empatica_data, snakemake@output[[1]], row.names = FALSE)

View File

@ -0,0 +1,21 @@
from zipfile import ZipFile
import warnings
sensor_short_name = {"accelerometer":"acc",
"temperature":"temp",
"tags":"tags",
"heartrate":"hr",
"inter_beat_interval":"ibi",
"blood_volume_pulse":"bvp",
"electrodermal_activity":"eda"}
sensor_csv = sensor_short_name[snakemake.params["sensor"]] + '.csv'
warning = True
with ZipFile(snakemake.input[0], 'r') as zipFile:
listOfFileNames = zipFile.namelist()
for fileName in listOfFileNames:
if fileName.endswith(sensor_csv):
with open(snakemake.output[0], 'wb') as outputFile:
outputFile.write(zipFile.read(fileName))
warning = False
if(warning):
warnings.warn("We could not find a zipped file for {} in {} (we tried to find {})".format(snakemake.params["sensor"], snakemake.input[0], sensor_csv))