Migrate empatica sensors to new data stream

pull/128/head
JulioV 2021-03-06 23:16:59 -05:00
parent 2eae84ff05
commit f65e3c8b1a
17 changed files with 371 additions and 211 deletions

.gitignore
View File

@ -111,4 +111,5 @@ sn_profile_*/
!sn_profile_rapids
settings.dcf
tests/fakedata_generation/
site/
credentials.yaml

View File

@ -296,11 +296,7 @@ for provider in config["FITBIT_STEPS_INTRADAY"]["PROVIDERS"].keys():
for provider in config["EMPATICA_ACCELEROMETER"]["PROVIDERS"].keys():
if config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["COMPUTE"]:
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_raw.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_accelerometer_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_accelerometer_features/empatica_accelerometer_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_accelerometer.csv", pid=config["PIDS"]))
@ -309,11 +305,7 @@ for provider in config["EMPATICA_ACCELEROMETER"]["PROVIDERS"].keys():
for provider in config["EMPATICA_HEARTRATE"]["PROVIDERS"].keys():
if config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["COMPUTE"]:
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_raw.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_heartrate_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_heartrate_features/empatica_heartrate_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_HEARTRATE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_heartrate.csv", pid=config["PIDS"]))
@ -323,11 +315,7 @@ for provider in config["EMPATICA_HEARTRATE"]["PROVIDERS"].keys():
for provider in config["EMPATICA_TEMPERATURE"]["PROVIDERS"].keys():
if config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["COMPUTE"]:
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_raw.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_temperature_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_temperature_features/empatica_temperature_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_TEMPERATURE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_temperature.csv", pid=config["PIDS"]))
@ -336,11 +324,7 @@ for provider in config["EMPATICA_TEMPERATURE"]["PROVIDERS"].keys():
for provider in config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"].keys():
if config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["COMPUTE"]:
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_raw.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_electrodermal_activity_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_electrodermal_activity_features/empatica_electrodermal_activity_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_electrodermal_activity.csv", pid=config["PIDS"]))
@ -349,11 +333,7 @@ for provider in config["EMPATICA_ELECTRODERMAL_ACTIVITY"]["PROVIDERS"].keys():
for provider in config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"].keys():
if config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["COMPUTE"]:
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_raw.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_blood_volume_pulse_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_blood_volume_pulse_features/empatica_blood_volume_pulse_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_blood_volume_pulse.csv", pid=config["PIDS"]))
@ -362,11 +342,7 @@ for provider in config["EMPATICA_BLOOD_VOLUME_PULSE"]["PROVIDERS"].keys():
for provider in config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"].keys():
if config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["COMPUTE"]:
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_raw.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_inter_beat_interval_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_inter_beat_interval_features/empatica_inter_beat_interval_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_inter_beat_interval.csv", pid=config["PIDS"]))
@ -376,11 +352,7 @@ for provider in config["EMPATICA_INTER_BEAT_INTERVAL"]["PROVIDERS"].keys():
if isinstance(config["EMPATICA_TAGS"]["PROVIDERS"], dict):
for provider in config["EMPATICA_TAGS"]["PROVIDERS"].keys():
if config["EMPATICA_TAGS"]["PROVIDERS"][provider]["COMPUTE"]:
for pid in config["PIDS"]:
suffixes = get_zip_suffixes(pid)
files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_unzipped_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_raw_{suffix}.csv", pid=pid, suffix=suffixes))
files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_joined.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_raw.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/empatica_tags_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/empatica_tags_features/empatica_tags_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["EMPATICA_TAGS"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/empatica_tags.csv", pid=config["PIDS"]))

View File

@ -466,13 +466,12 @@ FITBIT_STEPS_INTRADAY:
# EMPATICA #
########################################################################################################################
EMPATICA_DATA_CONFIGURATION:
SOURCE:
TYPE: ZIP_FILE
EMPATICA_DATA_STREAMS:
USE: empatica_zipfiles
# AVAILABLE:
empatica_zipfiles:
FOLDER: data/external/empatica
TIMEZONE:
TYPE: SINGLE # Empatica devices don't support time zones so we read this data in the timezone indicated by VALUE
VALUE: *timezone
# Sensors ------

View File

@ -1,10 +1,8 @@
# Welcome to RAPIDS documentation
Reproducible Analysis Pipeline for Data Streams (RAPIDS) allows you to process smartphone and wearable data to [extract](features/feature-introduction.md) and [create](features/add-new-features.md) **behavioral features** (a.k.a. digital biomarkers), [visualize](visualizations/data-quality-visualizations.md) mobile sensor data and [structure](workflow-examples/analysis.md) your analysis into reproducible workflows.
Reproducible Analysis Pipeline for Data Streams (RAPIDS) allows you to process smartphone and wearable data to [extract](features/feature-introduction.md) and [create](features/add-new-features.md) **behavioral features** (a.k.a. digital biomarkers), [visualize](visualizations/data-quality-visualizations.md) mobile sensor data, and [structure](workflow-examples/analysis.md) your analysis into reproducible workflows.
RAPIDS is open source, documented, modular, tested, and reproducible. At the moment we support smartphone data, and wearable data from Fitbit devices, and Empatica devices (these in collaboration with the [DBDP](https://dbdp.org/)).
Read the [introduction to data streams](../../datastreams/data-streams-introduction) for more information on what data streams we support, and this tutorial to [add support for new data streams](../../datastreams/add-new-data-streams) for smartphones or Fitbits (formats/containers).
RAPIDS is open source, documented, modular, tested, and reproducible. At the moment, we support data streams logged by smartphones, Fitbit wearables, and, in collaboration with the [DBDP](https://dbdp.org/), Empatica wearables. Read the [introduction to data streams](../../datastreams/data-streams-introduction) for more information on what specific data streams RAPIDS can process, and this tutorial if you want to [add support for new data streams](../../datastreams/add-new-data-streams).
!!! tip
:material-slack: Questions or feedback can be posted on the \#rapids channel in AWARE Framework\'s [slack](http://awareframework.com:3000/).
@ -19,19 +17,19 @@ Read the [introduction to data streams](../../datastreams/data-streams-introduct
## How does it work?
RAPIDS is formed by R and Python scripts orchestrated by [Snakemake](https://snakemake.readthedocs.io/en/stable/). We suggest you read Snakemake's docs but in short: every link in the analysis chain is atomic and has files as input and output. Behavioral features are processed per sensor and per participant.
RAPIDS is formed by R and Python scripts orchestrated by [Snakemake](https://snakemake.readthedocs.io/en/stable/). We suggest you read Snakemake's docs but in short: every link in the analysis chain is atomic and has files as input and output. Behavioral features are processed per sensor and participant.
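As a rough illustration of this file-in/file-out atomicity (a hypothetical rule, not one taken from RAPIDS):

```python
# Hypothetical Snakemake rule: every step declares files as input and
# output, so only steps whose inputs changed are re-executed.
rule example_features:
    input:
        "data/raw/{pid}/sensor_raw.csv"
    output:
        "data/processed/{pid}/sensor_features.csv"
    script:
        "scripts/compute_features.py"
```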
## What are the benefits of using RAPIDS?
1. **Consistent analysis**. Every participant sensor dataset is analyzed in the exact same way and isolated from each other.
2. **Efficient analysis**. Every analysis step is executed only once. Whenever your data or configuration changes only the affected files are updated.
1. **Consistent analysis**. Every participant sensor dataset is analyzed in the same way and isolated from each other.
2. **Efficient analysis**. Every analysis step is executed only once. Whenever your data or configuration changes, only the affected files are updated.
5. **Parallel execution**. Thanks to Snakemake, your analysis can be executed over multiple cores without changing your code.
6. **Code-free features**. Extract any of the behavioral features offered by RAPIDS without writing any code.
7. **Extensible code**. You can easily add your own behavioral features in R or Python, share them with the community, and keep authorship and citations.
8. **Timezone aware**. Your data is adjusted to the specified timezone (multiple timezones support *coming soon*).
9. **Flexible time segments**. You can extract behavioral features on time windows of any length (e.g. 5 minutes, 3 hours, 2 days), on every day or particular days (e.g. weekends, Mondays, the 1st of each month, etc.) or around events of interest (e.g. surveys or clinical relapses).
10. **Tested code**. We are constantly adding tests to make sure our behavioral features are correct.
11. **Reproducible code**. If you structure your analysis within RAPIDS, you can be sure your code will run in other computers as intended thanks to R and Python virtual environments. You can share your analysis code along your publications without any overhead.
7. **Extensible code**. You can easily add your own data streams or behavioral features in R or Python, share them with the community, and keep authorship and citations.
8. **Timezone aware**. Your data is adjusted to one or more time zones per participant.
9. **Flexible time segments**. You can extract behavioral features on time windows of any length (e.g., 5 minutes, 3 hours, 2 days), on every day or particular days (e.g., weekends, Mondays, the 1st of each month, etc.), or around events of interest (e.g., surveys or clinical relapses).
10. **Tested code**. We are continually adding tests to make sure our behavioral features are correct.
11. **Reproducible code**. If you structure your analysis within RAPIDS, you can be sure your code will run in other computers as intended, thanks to R and Python virtual environments. You can share your analysis code along with your publications without any overhead.
12. **Private**. All your data is processed locally.
## How is it organized?

View File

@ -591,43 +591,40 @@ Modify the following keys in your `config.yaml` depending on the [data stream](.
=== "Empatica"
The relevant `config.yaml` section looks like this by default:
Set `[USE]` to the Empatica data stream you want to use; see the table in [introduction to data streams](../../datastreams/data-streams-introduction). Configure any parameters as indicated below.
```yaml
SOURCE:
TYPE: ZIP_FILE
FOLDER: data/external/empatica
TIMEZONE:
TYPE: SINGLE # Empatica devices don't support time zones so we read this data in the timezone indicated by VALUE
VALUE: *timezone
EMPATICA_DATA_STREAMS:
USE: empatica_zipfiles
# AVAILABLE:
empatica_zipfiles:
FOLDER: data/external/empatica
```
**Parameters for `[EMPATICA_DATA_CONFIGURATION]`**
=== "empatica_zipfiles"
| Key | Description |
|---------------------|----------------------------------------------------------------------------------------------------------------------------|
| `[SOURCE] [TYPE]` | Only `ZIP_FILE` is supported (Empatica devices save sensor data in CSV files that are zipped together).|
| `[SOURCE] [FOLDER]` | The relative path to a folder containing one folder per participant. The name of a participant folder should match their pid in `config[PIDS]`, for example `p01`. Each participant folder can have one or more zip files with any name; in other words, the sensor data contained in those zip files belongs to a single participant. The zip files are [automatically](https://support.empatica.com/hc/en-us/articles/201608896-Data-export-and-formatting-from-E4-connect-) generated by Empatica and have a CSV file per sensor (`ACC`, `HR`, `TEMP`, `EDA`, `BVP`, `TAGS`). All CSV files of the same type contained in one or more zip files are uncompressed, parsed, sorted by timestamp, and joined together.|
| `[TIMEZONE] [TYPE]` | Only `SINGLE` is supported for now |
| `[TIMEZONE] [VALUE]` | `*timezone` points to the value defined before in [Timezone of your study](#timezone-of-your-study) |
| Key | Description |
|---------------------|----------------------------------------------------------------------------------------------------------------------------|
| `[FOLDER]` | The relative path to a folder containing one subfolder per participant. The name of a participant folder should match their pid in `config[PIDS]`, for example `p01`. Each participant folder can have one or more zip files with any name; in other words, the sensor data contained in those zip files belongs to a single participant. The zip files are [automatically](https://support.empatica.com/hc/en-us/articles/201608896-Data-export-and-formatting-from-E4-connect-) generated by Empatica and have a CSV file per sensor (`ACC`, `HR`, `TEMP`, `EDA`, `BVP`, `TAGS`). All CSV files of the same type contained in one or more zip files are uncompressed, parsed, sorted by timestamp, and joined together.|
??? example "Example of an EMPATICA FOLDER"
In the file tree below, we want to process the data of three participants: `p01`, `p02`, and `p03`. `p01` has two zip files, `p02` has only one zip file, and `p03` has three zip files. Each zip file has a CSV file per sensor; these files are joined together and processed by RAPIDS. These zip files are generated by Empatica.
```bash
data/ # this folder exists in the root RAPIDS folder
external/
empatica/
p01/
file1.zip
file2.zip
p02/
aaaa.zip
p03/
t1.zip
t2.zip
t3.zip
```
---

View File

@ -24,6 +24,8 @@ markdown_extensions:
- pymdownx.mark
- pymdownx.smartsymbols
- pymdownx.superfences
- pymdownx.snippets:
check_paths: True
- pymdownx.tabbed
- pymdownx.tasklist:
custom_checkbox: True

View File

@ -30,21 +30,6 @@ def get_phone_sensor_names():
phone_sensor_names.append(config_key)
return phone_sensor_names
def get_zip_suffixes(pid):
from pathlib import Path
zipfiles = list((Path("data/external/empatica/") / Path(pid)).rglob("*.zip"))
suffixes = []
for zipfile in zipfiles:
suffixes.append(zipfile.stem)
return suffixes
def get_all_raw_empatica_sensor_files(wildcards):
suffixes = get_zip_suffixes(wildcards.pid)
files = ["data/raw/{}/empatica_{}_raw_{}.csv".format(wildcards.pid, wildcards.sensor, suffix) for suffix in suffixes]
return(files)
def download_phone_data_input_with_mutation_scripts(wilcards):
import yaml
input = dict()
@ -76,4 +61,34 @@ def input_tzcodes_file(wilcards):
if not Path(config["TIMEZONE"]["MULTIPLE"]["TZCODES_FILE"]).exists():
raise ValueError("[TIMEZONE][MULTIPLE][TZCODES_FILE] should point to a CSV file, the file in the path you typed does not exist: " + config["TIMEZONE"]["MULTIPLE"]["TZCODES_FILE"])
return [config["TIMEZONE"]["MULTIPLE"]["TZCODES_FILE"]]
return []
def pull_empatica_data_input_with_mutation_scripts(wildcards):
import yaml
from pathlib import Path
input = dict()
empatica_stream = config["EMPATICA_DATA_STREAMS"]["USE"]
input["participant_file"] = "data/external/participant_files/{pid}.yaml"
input["rapids_schema_file"] = "src/data/streams/rapids_columns.yaml"
input["stream_format"] = "src/data/streams/" + empatica_stream + "/format.yaml"
if Path("src/data/streams/"+ empatica_stream + "/container.R").exists():
input["stream_container"] = "src/data/streams/"+ empatica_stream + "/container.R"
elif Path("src/data/streams/"+ empatica_stream + "/container.py").exists():
input["stream_container"] = "src/data/streams/"+ empatica_stream + "/container.py"
else:
raise ValueError("The container script for {stream} is missing: src/data/streams/{stream}/container.[py|R]".format(stream=empatica_stream))
schema = yaml.load(open(input.get("stream_format"), 'r'), Loader=yaml.FullLoader)
sensor = ("empatica_" + wilcards.sensor).upper()
if sensor not in schema:
raise ValueError("{sensor} is not defined in the schema {schema}".format(sensor=sensor, schema=input.get("stream_format")))
scripts = schema[sensor]["MUTATION_SCRIPTS"]
if isinstance(scripts, list):
for idx, script in enumerate(scripts):
if not script.lower().endswith((".py", ".r")):
raise ValueError("Mutate scripts can only be Python or R scripts (.py, .R).\n Instead we got {script} in \n [{sensor}] of {schema}".format(script=script, sensor=sensor, schema=input.get("stream_format")))
input["mutationscript"+str(idx)] = script
return input
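For illustration, the dict this input function returns for the `empatica_zipfiles` stream might look like the sketch below (assuming the stream's container is the `container.py` added in this commit; the mutation script entry is hypothetical):

```python
# Hypothetical return value for sensor "accelerometer" and the
# empatica_zipfiles stream defined in this commit.
expected_input = {
    "participant_file": "data/external/participant_files/{pid}.yaml",
    "rapids_schema_file": "src/data/streams/rapids_columns.yaml",
    "stream_format": "src/data/streams/empatica_zipfiles/format.yaml",
    "stream_container": "src/data/streams/empatica_zipfiles/container.py",
    # plus one "mutationscriptN" entry per item in MUTATION_SCRIPTS, e.g.:
    # "mutationscript0": "src/data/streams/empatica_zipfiles/my_mutation.py",
}
```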

View File

@ -266,50 +266,30 @@ rule fitbit_readable_datetime:
script:
"../src/data/readable_datetime.R"
from pathlib import Path
rule unzip_empatica_data:
input:
input_file = Path(config["EMPATICA_DATA_CONFIGURATION"]["SOURCE"]["FOLDER"]) / Path("{pid}") / Path("{suffix}.zip"),
participant_file = "data/external/participant_files/{pid}.yaml"
rule pull_empatica_data:
input: unpack(pull_empatica_data_input_with_mutation_scripts)
params:
sensor = "{sensor}"
data_configuration = config["EMPATICA_DATA_STREAMS"][config["EMPATICA_DATA_STREAMS"]["USE"]],
sensor = "empatica_" + "{sensor}",
pid = "{pid}"
output:
sensor_output = "data/raw/{pid}/empatica_{sensor}_unzipped_{suffix}.csv"
"data/raw/{pid}/empatica_{sensor}_raw.csv"
script:
"../src/data/empatica/unzip_empatica_data.py"
rule extract_empatica_data:
input:
input_file = "data/raw/{pid}/empatica_{sensor}_unzipped_{suffix}.csv",
participant_file = "data/external/participant_files/{pid}.yaml"
params:
data_configuration = config["EMPATICA_DATA_CONFIGURATION"],
sensor = "{sensor}",
table = lambda wildcards: config["EMPATICA_" + str(wildcards.sensor).upper()]["TABLE"],
output:
sensor_output = "data/raw/{pid}/empatica_{sensor}_raw_{suffix}.csv"
script:
"../src/data/empatica/extract_empatica_data.py"
rule join_empatica_data:
input:
input_files = get_all_raw_empatica_sensor_files,
output:
sensor_output = "data/raw/{pid}/empatica_{sensor}_joined.csv"
script:
"../src/data/empatica/join_empatica_data.R"
"../src/data/pull_empatica_data.R"
rule empatica_readable_datetime:
input:
sensor_input = "data/raw/{pid}/empatica_{sensor}_joined.csv",
time_segments = "data/interim/time_segments/{pid}_time_segments.csv"
sensor_input = "data/raw/{pid}/empatica_{sensor}_raw.csv",
time_segments = "data/interim/time_segments/{pid}_time_segments.csv",
pid_file = "data/external/participant_files/{pid}.yaml",
tzcodes_file = input_tzcodes_file,
params:
timezones = config["PHONE_DATA_CONFIGURATION"]["TIMEZONE"]["TYPE"],
fixed_timezone = config["PHONE_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"],
device_type = "empatica",
timezone_parameters = config["TIMEZONE"],
pid = "{pid}",
time_segments_type = config["TIME_SEGMENTS"]["TYPE"],
include_past_periodic_segments = config["TIME_SEGMENTS"]["INCLUDE_PAST_PERIODIC_SEGMENTS"]
output:
"data/raw/{pid}/empatica_{sensor}_with_datetime.csv"
script:
"../src/data/readable_datetime.R"
"../src/data/datetime/readable_datetime.R"

View File

@ -29,13 +29,13 @@ filter_tz_per_device <- function(device_id, tz_codes, default, IF_MISSING_TZCODE
}
assign_tz_code <- function(data, tz_codes){
data$local_timezone = NA_character_
for(i in 1:nrow(tz_codes)) {
start_timestamp <- tz_codes[[i, "timestamp"]]
end_timestamp <- tz_codes[[i, "end_timestamp"]]
time_zone <- trimws(tz_codes[[i, "tzcode"]], which="both")
data$local_timezone <- ifelse(start_timestamp <= data$timestamp & data$timestamp < end_timestamp, time_zone, data$local_timezone)
data$local_timezone <- if_else(start_timestamp <= data$timestamp & data$timestamp < end_timestamp, time_zone, data$local_timezone)
}
return(data %>% filter(!is.na(local_timezone)))
@ -65,7 +65,7 @@ validate_devies_exist_in_participant_file <- function(devices, device_type, pid,
}
# TODO include CSV timezone file in rule
multiple_time_zone_assignment <- function(data, timezone_parameters, device_type, pid, participant_file){
multiple_time_zone_assignment <- function(sensor_data, timezone_parameters, device_type, pid, participant_file){
tz_codes <- read.csv(timezone_parameters$MULTIPLE$TZCODES_FILE)
default <- timezone_parameters$MULTIPLE$DEFAULT_TZCODE
IF_MISSING_TZCODE <- timezone_parameters$MULTIPLE$IF_MISSING_TZCODE
@ -76,9 +76,7 @@ multiple_time_zone_assignment <- function(data, timezone_parameters, device_type
phone_ids <- participant_data$PHONE$DEVICE_IDS
fitbit_ids <- participant_data$FITBIT$DEVICE_IDS
if(device_type == "empatica")
data$device_id = pid
else if(device_type == "fitbit"){
if(device_type == "fitbit"){
if(!ALLOW_MULTIPLE_TZ_PER_DEVICE){
validate_single_tz_per_fitbit_device(tz_codes, INFER_FROM_SMARTPHONE_TZ)
} else if(INFER_FROM_SMARTPHONE_TZ){
@ -86,18 +84,22 @@ multiple_time_zone_assignment <- function(data, timezone_parameters, device_type
validate_devies_exist_in_participant_file(fitbit_ids, "FITBIT", pid, participant_file)
unified_device_id <- paste0("unified_device_id", pid)
data <- data %>% mutate(device_id = if_else(device_id %in% phone_ids, unified_device_id, device_id))
sensor_data <- sensor_data %>% mutate(device_id = if_else(device_id %in% phone_ids, unified_device_id, device_id))
tz_codes <- tz_codes %>% mutate(device_id = if_else(device_id %in% fitbit_ids, unified_device_id, device_id))
}
}
tz_intervals <- buils_tz_intervals(tz_codes)
data <- data %>%
group_by(device_id) %>%
nest() %>%
mutate(tz_codes_per_device = map(device_id, filter_tz_per_device, tz_intervals, default, IF_MISSING_TZCODE)) %>%
mutate(data = map2(data, tz_codes_per_device, assign_tz_code )) %>%
select(-tz_codes_per_device) %>%
unnest(cols = data)
return(data)
sensor_data <- sensor_data %>% mutate(local_timezone = NA_character_)
if(nrow(sensor_data) > 0){
sensor_data <- sensor_data %>%
group_by(device_id) %>%
nest() %>%
mutate(tz_codes_per_device = map(device_id, filter_tz_per_device, tz_intervals, default, IF_MISSING_TZCODE)) %>%
mutate(data = map2(data, tz_codes_per_device, assign_tz_code )) %>%
select(-tz_codes_per_device) %>%
unnest(cols = data)
}
return(sensor_data)
}
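The tzcode intervals used above are half-open: a row is assigned a time zone when `start_timestamp <= timestamp < end_timestamp`. A small Python sketch of the same semantics (hypothetical data, not RAPIDS code):

```python
import pandas as pd

# Hypothetical tzcode intervals; each row covers [timestamp, end_timestamp).
tz_codes = pd.DataFrame({
    "timestamp":     [0, 1000],
    "end_timestamp": [1000, 2000],
    "tzcode":        ["America/New_York", "Europe/Berlin"],
})

def assign_tz(ts):
    # Mirrors assign_tz_code: match rows whose half-open interval contains ts.
    match = tz_codes[(tz_codes.timestamp <= ts) & (ts < tz_codes.end_timestamp)]
    return match.tzcode.iloc[-1] if not match.empty else None

assert assign_tz(999) == "America/New_York"   # still inside the first interval
assert assign_tz(1000) == "Europe/Berlin"     # the boundary belongs to the next one
```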

View File

@ -51,6 +51,8 @@ validate_user_timezones <- function(timezone_parameters){
create_mising_temporal_column <- function(data, device_type){
if(device_type == "fitbit"){
# For Fitbit we infer the timestamp from Fitbit's local date time
if(nrow(data) == 0)
return(data %>% mutate(timestamp = NA_real_))
return(data %>%
group_by(local_timezone) %>%
nest() %>%
@ -60,6 +62,8 @@ create_mising_temporal_column <- function(data, device_type){
unnest(cols = everything()))
} else {
# For the rest of the devices we infer the local date time from the timestamp
if(nrow(data) == 0)
return(data %>% mutate(local_date_time = NA_character_))
return(data %>%
group_by(local_timezone) %>%
nest() %>%

View File

@ -1,17 +0,0 @@
source("renv/activate.R")
library("tidyr")
library("dplyr", warn.conflicts = F)
empatica_files <- snakemake@input[["input_files"]]
empatica_data <- setNames(data.frame(matrix(ncol = 1, nrow = 0)), c("timestamp"))
for(file in empatica_files){
data <- read.csv(file)
if(! ("timestamp" %in% colnames(data)))
stop(paste("This file does not have a timestamp column, something might have gone wrong while unzipping it:", file))
empatica_data <- merge(empatica_data, data, all = TRUE)
}
write.csv(empatica_data, snakemake@output[[1]], row.names = FALSE)

View File

@ -1,21 +0,0 @@
from zipfile import ZipFile
import warnings
sensor_short_name = {"accelerometer":"ACC",
"temperature":"TEMP",
"tags":"tags",
"heartrate":"HR",
"inter_beat_interval":"IBI",
"blood_volume_pulse":"BVP",
"electrodermal_activity":"EDA"}
sensor_csv = sensor_short_name[snakemake.params["sensor"]] + '.csv'
warning = True
with ZipFile(snakemake.input[0], 'r') as zipFile:
listOfFileNames = zipFile.namelist()
for fileName in listOfFileNames:
if fileName == sensor_csv:
with open(snakemake.output[0], 'wb') as outputFile:
outputFile.write(zipFile.read(fileName))
warning = False
if(warning):
warnings.warn("We could not find a zipped file for {} in {} (we tried to find {})".format(snakemake.params["sensor"], snakemake.input[0], sensor_csv))

View File

@ -0,0 +1,121 @@
source("renv/activate.R")
library(yaml)
library(dplyr)
library(readr)
# we use reticulate, but we only load it when we are going to use it, to minimize the cases where old RAPIDS deployments need to update their renv
mutate_data <- function(scripts, data){
for(script in scripts){
if(grepl("\\.(R)$", script)){
myEnv <- new.env()
source(script, local=myEnv)
attach(myEnv, name="sourced_scripts_rapids")
if(exists("main", myEnv)){
message(paste("Applying mutation script", script))
data <- main(data)
} else{
stop(paste0("The following mutation script does not have main function: ", script))
}
# rm(list = ls(envir = myEnv), envir = myEnv, inherits = FALSE)
detach("sourced_scripts_rapids")
} else{ # python
library(reticulate)
module <- gsub(pattern = "\\.py$", "", basename(script))
script_functions <- import_from_path(module, path = dirname(script))
if(py_has_attr(script_functions, "main")){
message(paste("Applying mutation script", script))
data <- script_functions$main(data)
} else{
stop(paste0("The following mutation script does not have a main function: ", script))
}
}
}
return(data)
}
rename_columns <- function(name_maps, data){
for(name in names(name_maps))
data <- data %>% rename(!!tolower(name) := name_maps[[name]])
return(data)
}
validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rapids_schema_file, stream_format){
columns <- names(schema[[sensor]][["COLUMN_MAPPINGS"]])
columns <- columns[(columns != "FLAG_AS_EXTRA")]
rapids_columns <- rapids_schema[[sensor]]
if(is.null(rapids_columns))
stop(paste(sensor, " columns are not listed in RAPIDS' column specification. If you are adding support for a new phone sensor, add any mandatory columns in ", rapids_schema_file))
if(length(setdiff(rapids_columns, columns)) > 0)
stop(paste(sensor," mappings are missing one or more mandatory columns. The missing column mappings are for ", paste(setdiff(rapids_columns, columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
if(length(setdiff(columns, rapids_columns)) > 0)
stop(paste(sensor," mappings have one or more columns than required, add them as FLAG_AS_EXTRA instead. The extra column mappings are for ", paste(setdiff(columns, rapids_columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
}
load_container_script <- function(stream_container){
language <- if_else(endsWith(tolower(stream_container), "py"), "python", "r")
if(language == "python"){
library(reticulate)
container <- import_from_path(gsub(pattern = "\\.py$", "", basename(stream_container)), path = dirname(stream_container))
if(!py_has_attr(container, "pull_data"))
stop(paste0("The following container.py script does not have a pull_data function: ", stream_container))
return(container$pull_data)
} else if(language == "r"){
source(stream_container)
if(!exists("pull_data"))
stop(paste0("The following container.R script does not have a pull_data function: ", stream_container))
return(pull_data)
}
}
pull_empatica_data_main <- function(){
participant_file <- snakemake@input[["participant_file"]]
stream_format <- snakemake@input[["stream_format"]]
rapids_schema_file <- snakemake@input[["rapids_schema_file"]]
stream_container <- snakemake@input[["stream_container"]]
data_configuration <- snakemake@params[["data_configuration"]]
pid <- snakemake@params[["pid"]]
table <- snakemake@params[["table"]]
sensor <- toupper(snakemake@params[["sensor"]])
output_data_file <- snakemake@output[[1]]
participant_data <- read_yaml(participant_file)
stream_schema <- read_yaml(stream_format)
rapids_schema <- read_yaml(rapids_schema_file)
devices <- participant_data$EMPATICA$DEVICE_IDS
if(length(devices) == 0)
devices <- c(pid)
validate_expected_columns_mapping(stream_schema, rapids_schema, sensor, rapids_schema_file, stream_format)
expected_columns <- tolower(names(stream_schema[[sensor]][["COLUMN_MAPPINGS"]]))
expected_columns <- expected_columns[(expected_columns != "flag_as_extra")]
participant_data <- setNames(data.frame(matrix(ncol = length(expected_columns), nrow = 0)), expected_columns)
pull_data_container <- load_container_script(stream_container)
for(idx in seq_along(devices)){ #TODO remove length
device <- devices[idx]
message(paste0("\nProcessing ", sensor, " for ", device))
columns_to_download <- stream_schema[[sensor]][["COLUMN_MAPPINGS"]]
columns_to_download <- columns_to_download[(columns_to_download != "FLAG_TO_MUTATE")]
data <- pull_data_container(data_configuration, device, sensor, columns_to_download)
columns_to_rename <- stream_schema[[sensor]][["COLUMN_MAPPINGS"]]
columns_to_rename <- (columns_to_rename[(columns_to_rename != "FLAG_TO_MUTATE" & names(columns_to_rename) != "FLAG_AS_EXTRA")])
renamed_data <- rename_columns(columns_to_rename, data)
mutation_scripts <- stream_schema[[sensor]][["MUTATION_SCRIPTS"]]
mutated_data <- mutate_data(mutation_scripts, renamed_data)
if(length(setdiff(expected_columns, colnames(mutated_data))) > 0)
stop(paste("The mutated data for", device, "is missing these columns expected by RAPIDS: [", paste(setdiff(expected_columns, colnames(mutated_data)), collapse=","),"]. One ore more mutation scripts in [", sensor,"][",toupper(device_os), "]","[MUTATION_SCRIPTS] are removing or not adding these columns"))
participant_data <- rbind(participant_data, mutated_data)
}
write_csv(participant_data, output_data_file)
}
pull_empatica_data_main()
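A script listed under `MUTATION_SCRIPTS` only needs to expose a `main(data)` function that returns the (possibly modified) data frame; `mutate_data` sources R scripts directly and loads Python scripts through reticulate. A minimal hypothetical Python mutation script:

```python
# Hypothetical mutation script (e.g. drop_missing_timestamps.py) that could
# be listed under MUTATION_SCRIPTS in a stream's format.yaml. mutate_data()
# calls main(data) and keeps whatever data frame it returns.
import pandas as pd

def main(data: pd.DataFrame) -> pd.DataFrame:
    # Example transformation: discard rows without a timestamp.
    return data.dropna(subset=["timestamp"])
```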

View File

@ -1,10 +1,12 @@
from zipfile import ZipFile
import warnings
from pathlib import Path
import pandas as pd
import yaml
import csv
from collections import OrderedDict
from io import BytesIO, StringIO
def processAcceleration(x, y, z):
x = float(x)
@ -15,8 +17,8 @@ def processAcceleration(x, y, z):
def readFile(file, dtype):
dict = OrderedDict()
with open(file, 'rt') as csvfile:
# file is an in-memory buffer
with file as csvfile:
if dtype in ('electrodermal_activity', 'temperature', 'heartrate', 'blood_volume_pulse'):
reader = csv.reader(csvfile, delimiter='\n')
elif dtype == 'accelerometer':
@ -40,7 +42,10 @@ def readFile(file, dtype):
return dict
def extract_empatica_data(sensor_data_file, output_file, start_date, end_date, timezone, sensor):
def extract_empatica_data(data, sensor):
sensor_data_file = BytesIO(data).getvalue().decode('utf-8')
sensor_data_file = StringIO(sensor_data_file)
# read sensor data
if sensor in ('electrodermal_activity', 'temperature', 'heartrate', 'blood_volume_pulse'):
ddict = readFile(sensor_data_file, sensor)
@ -68,27 +73,41 @@ def extract_empatica_data(sensor_data_file, output_file, start_date, end_date, t
raise ValueError(
"sensor can only be one of ['electrodermal_activity','temperature','heartrate','blood_volume_pulse','accelerometer','inter_beat_interval'].")
# filter based on given start and end date
start_date_utc = pd.Timestamp(start_date, tz=timezone).timestamp()
end_date_utc = pd.Timestamp(end_date, tz=timezone).timestamp()
df = df[start_date_utc:end_date_utc]
# format timestamps
df.index *= 1000
df.index = df.index.astype(int)
return(df)
# output csv file
df.to_csv(output_file)
def pull_data(data_configuration, device, sensor, columns_to_download):
sensor = sensor[9:].lower()  # strip the "empatica_" prefix from the sensor name
sensor_short_name = {"accelerometer":"ACC",
"temperature":"TEMP",
"tags":"tags",
"heartrate":"HR",
"inter_beat_interval":"IBI",
"blood_volume_pulse":"BVP",
"electrodermal_activity":"EDA"}
sensor_csv = sensor_short_name[sensor] + '.csv'
warning = True
participant_data = pd.DataFrame(columns=columns_to_download.values())
participant_data.set_index('timestamp', inplace=True)
sensor_data_file = snakemake.input[0]
output_file = snakemake.output[0]
with open(snakemake.input[1], "r", encoding="utf-8") as f:
participant_file = yaml.safe_load(f)
for zipfile in list((Path(data_configuration["FOLDER"]) / Path(device)).rglob("*.zip")):
print("Extracting {} data from {} for {}".format(sensor, zipfile, device))
with ZipFile(zipfile, 'r') as zipFile:
listOfFileNames = zipFile.namelist()
for fileName in listOfFileNames:
if fileName == sensor_csv:
participant_data = pd.concat([participant_data, extract_empatica_data(zipFile.read(fileName), sensor)], axis=0)
warning = False
if warning:
warnings.warn("We could not find a zipped file for {} in {} (we tried to find {})".format(sensor, Path(data_configuration["FOLDER"]) / Path(device), sensor_csv))
participant_data.sort_index(inplace=True, ascending=True)
participant_data.reset_index(inplace=True)
participant_data.drop_duplicates(subset='timestamp', keep='first',inplace=True)
participant_data["device_id"] = device
return(participant_data)
start_date = participant_file["EMPATICA"]["START_DATE"]
end_date = participant_file["EMPATICA"]["END_DATE"]
timezone = snakemake.params["data_configuration"]["TIMEZONE"]["VALUE"]
sensor = snakemake.params["sensor"]
extract_empatica_data(sensor_data_file, output_file, start_date, end_date, timezone, sensor)
# print(pull_data({'FOLDER': 'data/external/empatica'}, "e01", "EMPATICA_accelerometer", {'TIMESTAMP': 'timestamp', 'DEVICE_ID': 'device_id', 'DOUBLE_VALUES_0': 'x', 'DOUBLE_VALUES_1': 'y', 'DOUBLE_VALUES_2': 'z'}))

View File

@ -0,0 +1,50 @@
EMPATICA_ACCELEROMETER:
COLUMN_MAPPINGS:
TIMESTAMP: timestamp
DEVICE_ID: device_id
DOUBLE_VALUES_0: x
DOUBLE_VALUES_1: 'y'
DOUBLE_VALUES_2: z
MUTATION_SCRIPTS: # List any Python or R scripts that mutate your raw data
EMPATICA_HEARTRATE:
COLUMN_MAPPINGS:
TIMESTAMP: timestamp
DEVICE_ID: device_id
HEARTRATE: heartrate
MUTATION_SCRIPTS: # List any Python or R scripts that mutate your raw data
EMPATICA_TEMPERATURE:
COLUMN_MAPPINGS:
TIMESTAMP: timestamp
DEVICE_ID: device_id
TEMPERATURE: temperature
MUTATION_SCRIPTS: # List any Python or R scripts that mutate your raw data
EMPATICA_ELECTRODERMAL_ACTIVITY:
COLUMN_MAPPINGS:
TIMESTAMP: timestamp
DEVICE_ID: device_id
ELECTRODERMAL_ACTIVITY: electrodermal_activity
MUTATION_SCRIPTS: # List any Python or R scripts that mutate your raw data
EMPATICA_BLOOD_VOLUME_PULSE:
COLUMN_MAPPINGS:
TIMESTAMP: timestamp
DEVICE_ID: device_id
BLOOD_VOLUME_PULSE: blood_volume_pulse
MUTATION_SCRIPTS: # List any Python or R scripts that mutate your raw data
EMPATICA_INTER_BEAT_INTERVAL:
COLUMN_MAPPINGS:
TIMESTAMP: timestamp
DEVICE_ID: device_id
INTER_BEAT_INTERVAL: inter_beat_interval
MUTATION_SCRIPTS: # List any Python or R scripts that mutate your raw data
EMPATICA_TAGS:
COLUMN_MAPPINGS:
TIMESTAMP: timestamp
DEVICE_ID: device_id
TAGS: tags
MUTATION_SCRIPTS: # List any Python or R scripts that mutate your raw data
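The keys of each sensor's `COLUMN_MAPPINGS` must match the mandatory columns that `rapids_columns.yaml` lists for that sensor (extras go under `FLAG_AS_EXTRA`). A quick sanity check, sketched after `validate_expected_columns_mapping` in `pull_empatica_data.R`:

```python
# Sketch: compare one sensor's COLUMN_MAPPINGS against rapids_columns.yaml,
# mirroring validate_expected_columns_mapping.
import yaml

stream_format = yaml.safe_load(open("src/data/streams/empatica_zipfiles/format.yaml"))
rapids_schema = yaml.safe_load(open("src/data/streams/rapids_columns.yaml"))

sensor = "EMPATICA_ACCELEROMETER"
mapped = {c for c in stream_format[sensor]["COLUMN_MAPPINGS"] if c != "FLAG_AS_EXTRA"}
required = set(rapids_schema[sensor])

assert required <= mapped, f"missing mandatory mappings: {required - mapped}"
assert mapped <= required, f"extra columns, flag them as FLAG_AS_EXTRA: {mapped - required}"
```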

View File

@ -19,3 +19,41 @@ PHONE_CONVERSATION:
- INFERENCE
- DOUBLE_CONVO_START
- DOUBLE_CONVO_END
EMPATICA_ACCELEROMETER:
- TIMESTAMP
- DEVICE_ID
- DOUBLE_VALUES_0
- DOUBLE_VALUES_1
- DOUBLE_VALUES_2
EMPATICA_HEARTRATE:
- TIMESTAMP
- DEVICE_ID
- HEARTRATE
EMPATICA_TEMPERATURE:
- TIMESTAMP
- DEVICE_ID
- TEMPERATURE
EMPATICA_ELECTRODERMAL_ACTIVITY:
- TIMESTAMP
- DEVICE_ID
- ELECTRODERMAL_ACTIVITY
EMPATICA_BLOOD_VOLUME_PULSE:
- TIMESTAMP
- DEVICE_ID
- BLOOD_VOLUME_PULSE
EMPATICA_INTER_BEAT_INTERVAL:
- TIMESTAMP
- DEVICE_ID
- INTER_BEAT_INTERVAL
EMPATICA_TAGS:
- TIMESTAMP
- DEVICE_ID
- TAGS

View File

@ -17,7 +17,7 @@ def dbdp_features(sensor_data_files, time_segment, provider, filter_data_by_segm
if not acc_data.empty:
acc_features = pd.DataFrame()
# get magnitude related features: magnitude = sqrt(x^2+y^2+z^2)
magnitude = acc_data.apply(lambda row: np.sqrt(row["x"] ** 2 + row["y"] ** 2 + row["z"] ** 2), axis=1)
magnitude = acc_data.apply(lambda row: np.sqrt(row["double_values_0"] ** 2 + row["double_values_1"] ** 2 + row["double_values_2"] ** 2), axis=1)
acc_data = acc_data.assign(magnitude = magnitude.values)
if "maxmagnitude" in features_to_compute: