Change MUTATION structure

pull/128/head
JulioV 2021-03-09 16:42:02 -05:00
parent 58ef276179
commit 6970954358
16 changed files with 251 additions and 373 deletions

View File

@ -23,7 +23,7 @@ Both the `container.[R|py]` and the `format.yaml` are saved under `src/data/stre
## Implement a Container
The `container` script of a data stream can be implemented in R (strongly recommended) or python. This script must have two functions if you are implementing a stream for phone data or one function otherwise. The script can contain any other auxiliary functions that your data stream might need.
First of all, add any parameters your script might need in `config.yaml` under `(device)_DATA_STREAMS`. These parameters will be available in the `stream_parameters` argument of the one or two functions you implement. For example, if you are adding support for `Beiwe` data stored in `PostgreSQL` and your container needs a set of credentials to connect to a database, your new data stream configuration would be:
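For instance, a hypothetical `beiwe_postgresql` stream that needs database credentials could be configured like this (all key names below except `USE` are illustrative, not defined by RAPIDS):

```yaml
PHONE_DATA_STREAMS:
  USE: beiwe_postgresql
  beiwe_postgresql:
    DATABASE_GROUP: POSTGRESQL_CREDENTIALS # illustrative: a credentials group defined elsewhere in your config
```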
@ -117,92 +117,134 @@ Then implement one or both of the following functions:
## Implement a Format
A format describes the mapping between your stream's raw data and the data that RAPIDS needs. This file has a section per sensor (e.g. `PHONE_ACCELEROMETER`), and each section has two attributes (keys):
1. `RAPIDS_COLUMN_MAPPINGS` are mappings between the columns RAPIDS needs and the columns your raw data already has.
    1. The reserved keyword `FLAG_TO_MUTATE` flags columns that RAPIDS requires but that are not initially present in your container (database, CSV file). These columns have to be created by your mutation scripts.
2. `MUTATION`. Columns marked as `FLAG_TO_MUTATE` need to be created before RAPIDS can process data from a sensor.
    1. `COLUMN_MAPPINGS` are mappings between the columns a mutation `SCRIPT` needs and the columns your raw data has.
    2. `SCRIPTS` are a collection of R or Python scripts that transform one or more raw data columns into the format RAPIDS needs.
!!! hint
    `[RAPIDS_COLUMN_MAPPINGS]` and `[MUTATION][COLUMN_MAPPINGS]` have a `key` (left-hand side string) and a `value` (right-hand side string). The `values` are the names used to pull columns from a container (e.g., columns in a database table). All `values` are renamed to their `keys` in lower case. The renamed columns are sent to every mutation script within the `data` argument, and the final output is the input RAPIDS processes further.
For example, let's assume we are using `aware_mysql` and defining the following format for `PHONE_FAKESENSOR`:
```yaml
PHONE_FAKESENSOR:
  ANDROID:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: beiwe_timestamp
      DEVICE_ID: beiwe_deviceID
      MAGNITUDE_SQUARED: FLAG_TO_MUTATE
    MUTATION:
      COLUMN_MAPPINGS:
        MAGNITUDE: beiwe_value
      SCRIPTS:
        - src/data/streams/mutations/phone/square_magnitude.py
```
RAPIDS will:

1. Download `beiwe_timestamp`, `beiwe_deviceID`, and `beiwe_value` from the container of `aware_mysql` (a MySQL database).
2. Rename these columns to `timestamp`, `device_id`, and `magnitude`, respectively.
3. Execute `square_magnitude.py` with a data frame as an argument containing the renamed columns. This script will square `magnitude` and rename it to `magnitude_squared`.
4. Verify that the data frame returned by `square_magnitude.py` has the columns RAPIDS needs: `timestamp`, `device_id`, and `magnitude_squared`.
5. Use this data frame as the input to be processed in the pipeline.
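Since `PHONE_FAKESENSOR` is made up for this example, `square_magnitude.py` does not exist in RAPIDS; a minimal sketch of what steps 3 and 4 imply could look like this:

```python
import pandas as pd

def main(data, stream_parameters):
    # square the magnitude column and store it under the lower-case
    # version of the MAGNITUDE_SQUARED key RAPIDS expects
    data["magnitude_squared"] = data["magnitude"] ** 2
    # drop the intermediate column so only the columns RAPIDS needs remain
    return(data.drop(columns=["magnitude"]))
```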
Note that although `RAPIDS_COLUMN_MAPPINGS` and `[MUTATION][COLUMN_MAPPINGS]` keys are in capital letters for readability (e.g. `MAGNITUDE_SQUARED`), the names of the final columns you mutate in your scripts should be lower case.
Let's explain this column mapping further with examples.
### Name mapping
The mapping for some sensors is straightforward. For example, accelerometer data most of the time has a timestamp, three axes (x,y,z), and a device id that produced it. AWARE and a different sensing app like Beiwe likely logged accelerometer data in the same way but with different column names. In this case, we only need to match Beiwe data columns to RAPIDS columns one-to-one:
```yaml hl_lines="4 5 6 7 8"
PHONE_ACCELEROMETER:
  ANDROID:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: beiwe_timestamp
      DEVICE_ID: beiwe_deviceID
      DOUBLE_VALUES_0: beiwe_x
      DOUBLE_VALUES_1: beiwe_y
      DOUBLE_VALUES_2: beiwe_z
    MUTATION:
      COLUMN_MAPPINGS:
      SCRIPTS: # it's ok if this is empty
```
### Value mapping
For some sensors, we need to map column names and values. For example, screen data has ON and OFF events; let's suppose Beiwe represents an ON event with the number `1`, but RAPIDS identifies ON events with the number `2`. In this case, we need to mutate the raw data coming from Beiwe and replace all `1`s with `2`s.
We do this by listing one or more R or Python scripts in `[MUTATION][SCRIPTS]` that will be executed in order. We usually store all mutation scripts under `src/data/streams/mutations/[device]/[platform]/` and they can be reused across data streams.
```yaml hl_lines="10"
PHONE_SCREEN:
  ANDROID:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: beiwe_timestamp
      DEVICE_ID: beiwe_deviceID
      EVENT: beiwe_event
    MUTATION:
      COLUMN_MAPPINGS:
      SCRIPTS:
        - src/data/streams/mutations/phone/beiwe/beiwe_screen_map.py
```
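A sketch of what `beiwe_screen_map.py` could do, assuming Beiwe only uses `1` for ON events (the actual event codes in your raw data may differ):

```python
import pandas as pd

def main(data, stream_parameters):
    # replace Beiwe's assumed ON code (1) with the code RAPIDS expects (2)
    data["event"] = data["event"].replace({1: 2})
    return(data)
```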
!!! hint
    - A mutation script can also be used to clean/preprocess your data before extracting behavioral features.
    - A mutation script has to have a `main` function that receives two arguments, `data` and `stream_parameters`.
    - The `stream_parameters` argument contains the `config.yaml` key/values of your data stream (this is the same argument that your `container.[py|R]` script receives, see [Implement a Container](#implement-a-container)).
=== "python"
    Example of a Python mutation script

    ```python
    import pandas as pd

    def main(data, stream_parameters):
        # mutate data
        return(data)
    ```

=== "R"
    Example of an R mutation script

    ```r
    source("renv/activate.R") # needed to use RAPIDS renv environment
    library(dplyr)

    main <- function(data, stream_parameters){
        # mutate data
        return(data)
    }
    ```
### Complex mapping
Sometimes, your raw data doesn't even have the same columns RAPIDS expects for a sensor. For example, let's pretend Beiwe stores `PHONE_ACCELEROMETER` axis data in a single column called `acc_col` instead of three. You have to create a mutation script to split `acc_col` into the three columns `x`, `y`, and `z`.
For this, you mark the three axis columns RAPIDS needs in `[RAPIDS_COLUMN_MAPPINGS]` with the word `FLAG_TO_MUTATE`, map `acc_col` in `[MUTATION][COLUMN_MAPPINGS]`, and list a Python script under `[MUTATION][SCRIPTS]` with the code to split `acc_col`. See the example below.
RAPIDS expects that every column mapped as `FLAG_TO_MUTATE` will be generated by your mutation script, so it won't try to retrieve them from your container (database, CSV file, etc.).
!!! hint
    In our example, `acc_col` will be fetched from the stream's container and renamed to `JOINED_AXES` because `beiwe_split_acc.py` will split it into `double_values_0`, `double_values_1`, and `double_values_2`.
```yaml hl_lines="6 7 8 11 13"
PHONE_ACCELEROMETER:
  ANDROID:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: beiwe_timestamp
      DEVICE_ID: beiwe_deviceID
      DOUBLE_VALUES_0: FLAG_TO_MUTATE
      DOUBLE_VALUES_1: FLAG_TO_MUTATE
      DOUBLE_VALUES_2: FLAG_TO_MUTATE
    MUTATION:
      COLUMN_MAPPINGS:
        JOINED_AXES: acc_col
      SCRIPTS:
        - src/data/streams/mutations/phone/beiwe/beiwe_split_acc.py
```
@ -210,7 +252,7 @@ This is a draft of `beiwe_split_acc.py` `MUTATION_SCRIPT`:
```python
import pandas as pd

def main(data, stream_parameters):
    # data has the acc_col
    # split acc_col into three columns: double_values_0, double_values_1, double_values_2 to match RAPIDS format
    # remove acc_col since we don't need it anymore
    return(data)
```
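One possible way to fill in that draft, assuming the joined column arrives renamed to `joined_axes` (per the `MUTATION` column mapping above) and packs the three axes as a semicolon-separated string (the real delimiter depends on your data):

```python
import pandas as pd

def main(data, stream_parameters):
    # assumed raw format: joined_axes holds "x;y;z" as a single string
    axes = data["joined_axes"].str.split(";", expand=True).astype(float)
    data[["double_values_0", "double_values_1", "double_values_2"]] = axes
    # remove joined_axes since we don't need it anymore
    return(data.drop(columns=["joined_axes"]))
```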
@ -222,28 +264,32 @@ There is a special case for a complex mapping scenario for smartphone data strea
In case you didn't notice, the examples we have used so far are grouped under an `ANDROID` key, which means they will be applied to data collected by Android phones. Additionally, each sensor has an `IOS` key for a similar purpose. We use the complex mapping described above to transform iOS data into an Android format (it's always iOS to Android and any new phone data stream must do the same).
For example, this is the `format.yaml` key for `PHONE_ACTIVITY_RECOGNITION`. Note that the `ANDROID` mapping is simple (one-to-one) but the `IOS` mapping is complex, with two `FLAG_TO_MUTATE` columns, one `[MUTATION][COLUMN_MAPPINGS]` mapping, and one `[MUTATION][SCRIPTS]` script.
```yaml hl_lines="16 17 21 23"
PHONE_ACTIVITY_RECOGNITION:
  ANDROID:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: timestamp
      DEVICE_ID: device_id
      ACTIVITY_TYPE: activity_type
      ACTIVITY_NAME: activity_name
      CONFIDENCE: confidence
    MUTATION:
      COLUMN_MAPPINGS:
      SCRIPTS:
  IOS:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: timestamp
      DEVICE_ID: device_id
      ACTIVITY_TYPE: FLAG_TO_MUTATE
      ACTIVITY_NAME: FLAG_TO_MUTATE
      CONFIDENCE: confidence
    MUTATION:
      COLUMN_MAPPINGS:
        ACTIVITIES: activities
      SCRIPTS:
        - "src/data/streams/mutations/phone/aware/activity_recogniton_ios_unification.R"
```
??? "Example activity_recogniton_ios_unification.R"
@ -304,7 +350,7 @@ PHONE_ACTIVITY_RECOGNITION:
return(ios_gar)
}
main <- function(data, stream_parameters){
return(unify_ios_activity_recognition(data))
}
```

View File

@ -29,7 +29,7 @@ Stream columns named `FLAG_TO_MUTATE` means they are extracted from the `FLAG_AS
=== "ANDROID"
**RAPIDS_COLUMN_MAPPINGS**
| RAPIDS column | Stream column |
|-----------------|-----------------|
@ -51,7 +51,7 @@ Stream columns named `FLAG_TO_MUTATE` means they are extracted from the `FLAG_AS
=== "ANDROID"
**RAPIDS_COLUMN_MAPPINGS**
| RAPIDS column | Stream column |
|-----------------|-----------------|
@ -67,7 +67,7 @@ Stream columns named `FLAG_TO_MUTATE` means they are extracted from the `FLAG_AS
=== "IOS"
**RAPIDS_COLUMN_MAPPINGS**
| RAPIDS column | Stream column |
|-----------------|-----------------|
@ -136,7 +136,7 @@ Stream columns named `FLAG_TO_MUTATE` means they are extracted from the `FLAG_AS
=== "ANDROID"
**RAPIDS_COLUMN_MAPPINGS**
| RAPIDS column | Stream column |
|----------------------|---------------------|
@ -153,7 +153,7 @@ Stream columns named `FLAG_TO_MUTATE` means they are extracted from the `FLAG_AS
=== "IOS"
**RAPIDS_COLUMN_MAPPINGS**
Same as ANDROID

View File

@ -17,7 +17,7 @@ The `format.yaml` maps and transforms columns in your raw data stream to the [ma
src/data/streams/fitbitjson_mysql/format.yaml
```
If you want RAPIDS to process Fitbit sensor data using this stream, you will need to replace the following `RAPIDS_COLUMN_MAPPINGS` inside **each sensor** section in `format.yaml` to match your raw data column names:
| Column | Description |
|-----------------|-----------------|
@ -28,7 +28,7 @@ If you want RAPIDS to process Fitbit sensor data using this stream, you will nee
??? info "FITBIT_HEARTRATE_SUMMARY section"
**RAPIDS_COLUMN_MAPPINGS**
| RAPIDS column | Stream column |
|-----------------|-----------------|

View File

@ -20,14 +20,14 @@ The `format.yaml` maps and transforms columns in your raw data stream to the [ma
src/data/streams/fitbitparsed_mysql/format.yaml
```
If you want RAPIDS to process Fitbit sensor data using this stream, you will need to replace any `RAPIDS_COLUMN_MAPPINGS` inside **each sensor** section in `format.yaml` to match your raw data column names.
All columns are mandatory; however, all except `device_id` and `local_date_time` can be empty if you don't have that data. Just keep in mind that some features will be empty if some of these columns are empty.
??? info "FITBIT_HEARTRATE_SUMMARY section"
**RAPIDS_COLUMN_MAPPINGS**
| RAPIDS column | Stream column |
|-----------------|-----------------|

View File

@ -52,7 +52,14 @@ def pull_phone_data_input_with_mutation_scripts(wilcards):
        raise ValueError("{sensor} is not defined in the schema {schema}".format(sensor=sensor, schema=input.get("stream_format")))
    for device_os in schema[sensor].keys():
        if "MUTATION" not in schema[sensor][device_os]:
            raise ValueError("MUTATION is missing from [{sensor}][{device_os}] of {schema}".format(sensor=sensor, device_os=device_os, schema=input.get("stream_format")))
        if "COLUMN_MAPPINGS" not in schema[sensor][device_os]["MUTATION"]:
            raise ValueError("COLUMN_MAPPINGS is missing from [{sensor}][{device_os}][MUTATION] of {schema}".format(sensor=sensor, device_os=device_os, schema=input.get("stream_format")))
        if "SCRIPTS" not in schema[sensor][device_os]["MUTATION"]:
            raise ValueError("SCRIPTS is missing from [{sensor}][{device_os}][MUTATION] of {schema}".format(sensor=sensor, device_os=device_os, schema=input.get("stream_format")))
        scripts = schema[sensor][device_os]["MUTATION"]["SCRIPTS"]
        if isinstance(scripts, list):
            for idx, script in enumerate(scripts):
                if not script.lower().endswith((".py", ".r")):
@ -70,62 +77,40 @@ def input_tzcodes_file(wilcards):
return [config["TIMEZONE"]["MULTIPLE"]["TZCODES_FILE"]]
return []
def pull_wearable_data_input_with_mutation_scripts(wilcards):
    import yaml
    from pathlib import Path
    input = dict()
    device = wilcards.device_type.upper()
    device_stream = config[device+"_DATA_STREAMS"]["USE"]
    input["participant_file"] = "data/external/participant_files/{pid}.yaml"
    input["rapids_schema_file"] = "src/data/streams/rapids_columns.yaml"
    input["stream_format"] = "src/data/streams/" + device_stream + "/format.yaml"
    if Path("src/data/streams/"+ device_stream + "/container.R").exists():
        input["stream_container"] = "src/data/streams/"+ device_stream + "/container.R"
    elif Path("src/data/streams/"+ device_stream + "/container.py").exists():
        input["stream_container"] = "src/data/streams/"+ device_stream + "/container.py"
    else:
        raise ValueError("The container script for {stream} is missing: src/data/streams/{stream}/container.[py|R]".format(stream=device_stream))
    schema = yaml.load(open(input.get("stream_format"), 'r'), Loader=yaml.FullLoader)
    sensor = (device + "_" + wilcards.sensor).upper()
    if sensor not in schema:
        raise ValueError("{sensor} is not defined in the schema {schema}".format(sensor=sensor, schema=input.get("stream_format")))
    if "MUTATION" not in schema[sensor]:
        raise ValueError("MUTATION is missing from [{sensor}] of {schema}".format(sensor=sensor, schema=input.get("stream_format")))
    if "COLUMN_MAPPINGS" not in schema[sensor]["MUTATION"]:
        raise ValueError("COLUMN_MAPPINGS is missing from [{sensor}][MUTATION] of {schema}".format(sensor=sensor, schema=input.get("stream_format")))
    if "SCRIPTS" not in schema[sensor]["MUTATION"]:
        raise ValueError("SCRIPTS is missing from [{sensor}][MUTATION] of {schema}".format(sensor=sensor, schema=input.get("stream_format")))
    scripts = schema[sensor]["MUTATION"]["SCRIPTS"]
    if isinstance(scripts, list):
        for idx, script in enumerate(scripts):
            if not script.lower().endswith((".py", ".r")):
                raise ValueError("Mutate scripts can only be Python or R scripts (.py, .R).\n Instead we got {script} in [{sensor}] of {schema}".format(script=script, sensor=sensor, schema=input.get("stream_format")))
            input["mutationscript"+str(idx)] = script
    return input

View File

@ -184,17 +184,20 @@ rule phone_application_categories:
script:
"../src/data/application_categories.R"
rule pull_wearable_data:
    input: unpack(pull_wearable_data_input_with_mutation_scripts)
    params:
        data_configuration = lambda wildcards: config[wildcards.device_type.upper() +"_DATA_STREAMS"][config[wildcards.device_type.upper() +"_DATA_STREAMS"]["USE"]],
        device_type = "{device_type}",
        sensor = "{device_type}" + "_" + "{sensor}",
        pid = "{pid}",
        tables = lambda wildcards: config[wildcards.device_type.upper() + "_" + str(wildcards.sensor).upper()]["TABLE"],
    wildcard_constraints:
        device_type="(empatica|fitbit)"
    output:
        "data/raw/{pid}/{device_type}_{sensor}_raw.csv"
    script:
        "../src/data/streams/pull_wearable_data.R"
rule fitbit_readable_datetime:
input:
@ -213,60 +216,6 @@ rule fitbit_readable_datetime:
script:
"../src/data/datetime/readable_datetime.R"
rule empatica_readable_datetime:
input:
sensor_input = "data/raw/{pid}/empatica_{sensor}_raw.csv",

View File

@ -61,9 +61,9 @@ infer_device_os <- function(stream_parameters, device){
#' @param columns the columns needed from this sensor (we recommend to only return these columns instead of every column in sensor_container)
#' @return A dataframe with the sensor data for device
pull_data <- function(stream_parameters, device, sensor, sensor_container, columns){
    dbEngine <- get_db_engine(stream_parameters$DATABASE_GROUP)
    query <- paste0("SELECT ", paste(columns, collapse = ",")," FROM ", sensor_container, " WHERE ", columns$DEVICE_ID ," = '", device,"'")
    # Letting the user know what we are doing
    message(paste0("Executing the following query to download data: ", query))
    sensor_data <- dbGetQuery(dbEngine, query)

View File

@ -1,40 +1,48 @@
PHONE_ACCELEROMETER:
  ANDROID:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: timestamp
      DEVICE_ID: device_id
      DOUBLE_VALUES_0: double_values_0
      DOUBLE_VALUES_1: double_values_1
      DOUBLE_VALUES_2: double_values_2
    MUTATION:
      COLUMN_MAPPINGS:
      SCRIPTS: # List any python or r scripts that mutate your raw data
  IOS:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: timestamp
      DEVICE_ID: device_id
      DOUBLE_VALUES_0: double_values_0
      DOUBLE_VALUES_1: double_values_1
      DOUBLE_VALUES_2: double_values_2
    MUTATION:
      COLUMN_MAPPINGS:
      SCRIPTS: # List any python or r scripts that mutate your raw data
PHONE_ACTIVITY_RECOGNITION:
  ANDROID:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: timestamp
      DEVICE_ID: device_id
      ACTIVITY_TYPE: activity_type
      ACTIVITY_NAME: activity_name
      CONFIDENCE: confidence
    MUTATION:
      COLUMN_MAPPINGS:
      SCRIPTS: # List any python or r scripts that mutate your raw data
  IOS:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: timestamp
      DEVICE_ID: device_id
      ACTIVITY_TYPE: FLAG_TO_MUTATE
      ACTIVITY_NAME: FLAG_TO_MUTATE
      CONFIDENCE: confidence
    MUTATION:
      COLUMN_MAPPINGS:
        ACTIVITIES: activities
      SCRIPTS: # List any python or r scripts that mutate your raw data
        - "src/data/streams/mutations/phone/aware/activity_recogniton_ios_unification.R"
PHONE_APPLICATIONS_FOREGROUND:
  ANDROID:
@ -65,21 +73,26 @@ PHONE_BATTERY:
PHONE_CONVERSATION:
  ANDROID:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: timestamp
      DEVICE_ID: device_id
      DOUBLE_ENERGY: double_energy
      INFERENCE: inference
      DOUBLE_CONVO_START: double_convo_start
      DOUBLE_CONVO_END: double_convo_end
    MUTATION:
      COLUMN_MAPPINGS:
      SCRIPTS: # List any python or r scripts that mutate your raw data
  IOS:
    RAPIDS_COLUMN_MAPPINGS:
      TIMESTAMP: timestamp
      DEVICE_ID: device_id
      DOUBLE_ENERGY: double_energy
      INFERENCE: inference
      DOUBLE_CONVO_START: double_convo_start
      DOUBLE_CONVO_END: double_convo_end
    MUTATION:
      COLUMN_MAPPINGS:
      SCRIPTS: # List any python or r scripts that mutate your raw data
        - "src/data/streams/mutations/phone/aware/conversation_ios_timestamp.R"

View File

@ -19,9 +19,9 @@ def readFile(file, dtype):
dict = OrderedDict()
# file is an in-memory buffer
with file as csvfile:
if dtype in ('EMPATICA_ELECTRODERMAL_ACTIVITY', 'EMPATICA_TEMPERATURE', 'EMPATICA_HEARTRATE', 'EMPATICA_BLOOD_VOLUME_PULSE'):
reader = csv.reader(csvfile, delimiter='\n')
elif dtype == 'EMPATICA_ACCELEROMETER':
reader = csv.reader(csvfile, delimiter=',')
i = 0
for row in reader:
@ -34,9 +34,9 @@ def readFile(file, dtype):
pass
else:
timestamp = timestamp + 1.0 / hertz
if dtype in ('EMPATICA_ELECTRODERMAL_ACTIVITY', 'EMPATICA_TEMPERATURE', 'EMPATICA_HEARTRATE', 'EMPATICA_BLOOD_VOLUME_PULSE'):
dict[timestamp] = row[0]
elif dtype == 'EMPATICA_ACCELEROMETER':
dict[timestamp] = processAcceleration(row[0], row[1], row[2])
i += 1
return dict
@ -45,15 +45,14 @@ def readFile(file, dtype):
def extract_empatica_data(data, sensor):
sensor_data_file = BytesIO(data).getvalue().decode('utf-8')
sensor_data_file = StringIO(sensor_data_file)
if sensor in ('EMPATICA_ELECTRODERMAL_ACTIVITY', 'EMPATICA_TEMPERATURE', 'EMPATICA_HEARTRATE', 'EMPATICA_BLOOD_VOLUME_PULSE'):
ddict = readFile(sensor_data_file, sensor)
df = pd.DataFrame.from_dict(ddict, orient='index', columns=[sensor])
df[sensor] = df[sensor].astype(float)
df.index.name = 'timestamp'
elif sensor == 'EMPATICA_ACCELEROMETER':
ddict = readFile(sensor_data_file, sensor)
df = pd.DataFrame.from_dict(ddict, orient='index', columns=['x', 'y', 'z'])
df['x'] = df['x'].astype(float)
@ -61,7 +60,7 @@ def extract_empatica_data(data, sensor):
df['z'] = df['z'].astype(float)
df.index.name = 'timestamp'
elif sensor == 'EMPATICA_INTER_BEAT_INTERVAL':
df = pd.read_csv(sensor_data_file, names=['timestamp', sensor], header=None)
timestampstart = float(df['timestamp'][0])
df['timestamp'] = (df['timestamp'][1:len(df)]).astype(float) + timestampstart
@ -78,17 +77,8 @@ def extract_empatica_data(data, sensor):
df.index = df.index.astype(int)
return(df)
def pull_data(data_configuration, device, sensor, container, columns_to_download):
sensor_csv = container + '.csv'
warning = True
participant_data = pd.DataFrame(columns=columns_to_download.values())
participant_data.set_index('timestamp', inplace=True)

View File

@@ -1,50 +1,64 @@
 EMPATICA_ACCELEROMETER:
-  COLUMN_MAPPINGS:
+  RAPIDS_COLUMN_MAPPINGS:
     TIMESTAMP: timestamp
     DEVICE_ID: device_id
     DOUBLE_VALUES_0: x
     DOUBLE_VALUES_1: 'y'
     DOUBLE_VALUES_2: z
-  MUTATION_SCRIPTS: # List any python or r scripts that mutate your raw data
+  MUTATION:
+    COLUMN_MAPPINGS:
+    SCRIPTS: # List any python or r scripts that mutate your raw data
 
 EMPATICA_HEARTRATE:
-  COLUMN_MAPPINGS:
+  RAPIDS_COLUMN_MAPPINGS:
     TIMESTAMP: timestamp
     DEVICE_ID: device_id
     HEARTRATE: heartrate
-  MUTATION_SCRIPTS: # List any python or r scripts that mutate your raw data
+  MUTATION:
+    COLUMN_MAPPINGS:
+    SCRIPTS: # List any python or r scripts that mutate your raw data
 
 EMPATICA_TEMPERATURE:
-  COLUMN_MAPPINGS:
+  RAPIDS_COLUMN_MAPPINGS:
     TIMESTAMP: timestamp
     DEVICE_ID: device_id
     TEMPERATURE: temperature
-  MUTATION_SCRIPTS: # List any python or r scripts that mutate your raw data
+  MUTATION:
+    COLUMN_MAPPINGS:
+    SCRIPTS: # List any python or r scripts that mutate your raw data
 
 EMPATICA_ELECTRODERMAL_ACTIVITY:
-  COLUMN_MAPPINGS:
+  RAPIDS_COLUMN_MAPPINGS:
     TIMESTAMP: timestamp
     DEVICE_ID: device_id
     ELECTRODERMAL_ACTIVITY: electrodermal_activity
-  MUTATION_SCRIPTS: # List any python or r scripts that mutate your raw data
+  MUTATION:
+    COLUMN_MAPPINGS:
+    SCRIPTS: # List any python or r scripts that mutate your raw data
 
 EMPATICA_BLOOD_VOLUME_PULSE:
-  COLUMN_MAPPINGS:
+  RAPIDS_COLUMN_MAPPINGS:
     TIMESTAMP: timestamp
     DEVICE_ID: device_id
     BLOOD_VOLUME_PULSE: blood_volume_pulse
-  MUTATION_SCRIPTS: # List any python or r scripts that mutate your raw data
+  MUTATION:
+    COLUMN_MAPPINGS:
+    SCRIPTS: # List any python or r scripts that mutate your raw data
 
 EMPATICA_INTER_BEAT_INTERVAL:
-  COLUMN_MAPPINGS:
+  RAPIDS_COLUMN_MAPPINGS:
     TIMESTAMP: timestamp
     DEVICE_ID: device_id
     INTER_BEAT_INTERVAL: inter_beat_interval
-  MUTATION_SCRIPTS: # List any python or r scripts that mutate your raw data
+  MUTATION:
+    COLUMN_MAPPINGS:
+    SCRIPTS: # List any python or r scripts that mutate your raw data
 
 EMPATICA_EMPATICA_TAGS:
-  COLUMN_MAPPINGS:
+  RAPIDS_COLUMN_MAPPINGS:
     TIMESTAMP: timestamp
     DEVICE_ID: device_id
     TAGS: tags
-  MUTATION_SCRIPTS: # List any python or r scripts that mutate your raw data
+  MUTATION:
+    COLUMN_MAPPINGS:
+    SCRIPTS: # List any python or r scripts that mutate your raw data
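The `SCRIPTS` entries above point at mutation scripts, which must expose a `main(data, stream_parameters)` function. A minimal, hypothetical sketch of one; RAPIDS actually passes a pandas DataFrame, but a plain list of row dicts stands in for it here so the example stays dependency-free:

```python
# Hypothetical mutation script of the kind listed under [MUTATION][SCRIPTS].
# It receives the renamed sensor data plus the stream's configuration
# parameters, and must return the data with every column RAPIDS expects.
def main(data, stream_parameters):
    for row in data:
        # Example transformation: round temperature readings to two decimals.
        row["temperature"] = round(float(row["temperature"]), 2)
    return data
```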

View File

@@ -30,7 +30,7 @@ get_db_engine <- function(group){
 #' @param columns the columns needed from this sensor (we recommend to only return these columns instead of every column in sensor_container)
 #' @return A dataframe with the sensor data for device
-pull_data <- function(stream_parameters, device, sensor_container, columns){
+pull_data <- function(stream_parameters, device, sensor, sensor_container, columns){
   dbEngine <- get_db_engine(stream_parameters$DATABASE_GROUP)
   query <- paste0("SELECT ", paste(columns, collapse = ",")," FROM ", sensor_container, " WHERE ",columns$DEVICE_ID," = '", device,"'")
   # Letting the user know what we are doing

View File

@@ -1,9 +1,11 @@
 FITBIT_STEPS_SUMMARY:
-  COLUMN_MAPPINGS:
+  RAPIDS_COLUMN_MAPPINGS:
     TIMESTAMP: FLAG_TO_MUTATE
     DEVICE_ID: device_id
     LOCAL_DATE_TIME: FLAG_TO_MUTATE
     STEPS: FLAG_TO_MUTATE
-    FLAG_AS_EXTRA: fitbit_data # text column with JSON objects
-  MUTATION_SCRIPTS: # List any python or r scripts that mutate your raw data
-    - src/data/streams/mutations/fitbit/parse_steps_summary_json.py
+  MUTATION:
+    COLUMN_MAPPINGS:
+      JSON_FITBIT_COLUMN: fitbit_data # text column with JSON objects
+    SCRIPTS: # List any python or r scripts that mutate your raw data
+      - src/data/streams/mutations/fitbit/parse_steps_summary_json.py
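The practical effect of the new `[MUTATION][COLUMN_MAPPINGS]` key is that `fitbit_data` is still downloaded but is renamed to `json_fitbit_column` before the mutation scripts run, rather than being tagged `FLAG_AS_EXTRA`. The renaming follows the same rule as the RAPIDS mappings (raw column name becomes the lowercased mapping key); a dependency-free sketch with a row as a plain dict instead of a DataFrame:

```python
def rename_columns(name_maps, row):
    # name_maps maps mapping keys (e.g. JSON_FITBIT_COLUMN) to raw column
    # names (e.g. fitbit_data); each raw column present in the row is
    # renamed to the lowercased key, other columns are left untouched.
    renamed = dict(row)
    for mapped_name, raw_name in name_maps.items():
        if raw_name in renamed:
            renamed[mapped_name.lower()] = renamed.pop(raw_name)
    return renamed
```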

View File

@@ -13,7 +13,7 @@ def parseStepsData(steps_data):
     records = []
     # Parse JSON into individual records
-    for record in steps_data.fitbit_data:
+    for record in steps_data.json_fitbit_column:
         record = json.loads(record) # Parse text into JSON
         if "activities-steps" in record.keys():
             curr_date = datetime.strptime(record["activities-steps"][0]["dateTime"], "%Y-%m-%d")
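The record shape this parser expects can be seen in miniature below; the JSON payload is a hypothetical sample built from the keys the snippet reads (`activities-steps`, `dateTime`, `value`):

```python
import json
from datetime import datetime

# Hypothetical single record from the json_fitbit_column text column.
raw = '{"activities-steps": [{"dateTime": "2020-10-07", "value": "1775"}]}'
record = json.loads(raw)  # Parse text into JSON
if "activities-steps" in record.keys():
    curr_date = datetime.strptime(record["activities-steps"][0]["dateTime"], "%Y-%m-%d")
    steps = int(record["activities-steps"][0]["value"])
```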

View File

@@ -1,121 +0,0 @@
-source("renv/activate.R")
-library(yaml)
-library(dplyr)
-library(readr)
-# we use reticulate but only load it if we are going to use it to minimize the case when old RAPIDS deployments need to update ther renv
-
-mutate_data <- function(scripts, data, data_configuration){
-    for(script in scripts){
-        if(grepl("\\.(R)$", script)){
-            myEnv <- new.env()
-            source(script, local=myEnv)
-            attach(myEnv, name="sourced_scripts_rapids")
-            if(exists("main", myEnv)){
-                message(paste("Applying mutation script", script))
-                data <- main(data, data_configuration)
-            } else{
-                stop(paste0("The following mutation script does not have main function: ", script))
-            }
-            # rm(list = ls(envir = myEnv), envir = myEnv, inherits = FALSE)
-            detach("sourced_scripts_rapids")
-        } else{ # python
-            library(reticulate)
-            module <- gsub(pattern = "\\.py$", "", basename(script))
-            script_functions <- import_from_path(module, path = dirname(script))
-            if(py_has_attr(script_functions, "main")){
-                message(paste("Applying mutation script", script))
-                data <- script_functions$main(data, data_configuration)
-            } else{
-                stop(paste0("The following mutation script does not have a main function: ", script))
-            }
-        }
-    }
-    return(data)
-}
-
-rename_columns <- function(name_maps, data){
-    for(name in names(name_maps))
-        data <- data %>% rename(!!tolower(name) := name_maps[[name]])
-    return(data)
-}
-
-validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rapids_schema_file, stream_format){
-    columns <- names(schema[[sensor]][["COLUMN_MAPPINGS"]])
-    columns <- columns[(columns != "FLAG_AS_EXTRA")]
-    rapids_columns <- rapids_schema[[sensor]]
-
-    if(is.null(rapids_columns))
-        stop(paste(sensor, " columns are not listed in RAPIDS' column specification. If you are adding support for a new phone sensor, add any mandatory columns in ", rapids_schema_file))
-    if(length(setdiff(rapids_columns, columns)) > 0)
-        stop(paste(sensor," mappings are missing one or more mandatory columns. The missing column mappings are for ", paste(setdiff(rapids_columns, columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
-    if(length(setdiff(columns, rapids_columns)) > 0)
-        stop(paste(sensor," mappings have one or more columns than required, add them as FLAG_AS_EXTRA instead. The extra column mappings are for ", paste(setdiff(columns, rapids_columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
-}
-
-load_container_script <- function(stream_container){
-    language <- if_else(endsWith(tolower(stream_container), "py"), "python", "r")
-    if(language == "python"){
-        library(reticulate)
-        container <- import_from_path(gsub(pattern = "\\.py$", "", basename(stream_container)), path = dirname(stream_container))
-        if(!py_has_attr(container, "pull_data"))
-            stop(paste0("The following container.py script does not have a pull_data function: ", stream_container))
-        return(container$pull_data)
-    } else if(language == "r"){
-        source(stream_container)
-        if(!exists("pull_data"))
-            stop(paste0("The following container.R script does not have a pull_data function: ", stream_container))
-        return(pull_data)
-    }
-}
-
-pull_fitbit_data_main <- function(){
-    participant_file <- snakemake@input[["participant_file"]]
-    stream_format <- snakemake@input[["stream_format"]]
-    rapids_schema_file <- snakemake@input[["rapids_schema_file"]]
-    stream_container <- snakemake@input[["stream_container"]]
-    data_configuration <- snakemake@params[["data_configuration"]]
-    pid <- snakemake@params[["pid"]]
-    table <- snakemake@params[["tables"]]
-    sensor <- toupper(snakemake@params[["sensor"]])
-    output_data_file <- snakemake@output[[1]]
-
-    participant_data <- read_yaml(participant_file)
-    stream_schema <- read_yaml(stream_format)
-    rapids_schema <- read_yaml(rapids_schema_file)
-    devices <- participant_data$FITBIT$DEVICE_IDS
-    if(length(devices) == 0)
-        devices <- c(pid)
-
-    validate_expected_columns_mapping(stream_schema, rapids_schema, sensor, rapids_schema_file, stream_format)
-    expected_columns <- tolower(names(stream_schema[[sensor]][["COLUMN_MAPPINGS"]]))
-    expected_columns <- expected_columns[(expected_columns != "flag_as_extra")]
-    participant_data <- setNames(data.frame(matrix(ncol = length(expected_columns), nrow = 0)), expected_columns)
-
-    pull_data_container <- load_container_script(stream_container)
-
-    for(idx in seq_along(devices)){ #TODO remove length
-        device <- devices[idx]
-        message(paste0("\nProcessing ", sensor, " for ", device))
-
-        columns_to_download <- stream_schema[[sensor]][["COLUMN_MAPPINGS"]]
-        columns_to_download <- columns_to_download[(columns_to_download != "FLAG_TO_MUTATE")]
-        data <- pull_data_container(data_configuration, device, table, columns_to_download)
-
-        columns_to_rename <- stream_schema[[sensor]][["COLUMN_MAPPINGS"]]
-        columns_to_rename <- (columns_to_rename[(columns_to_rename != "FLAG_TO_MUTATE" & names(columns_to_rename) != "FLAG_AS_EXTRA")])
-        renamed_data <- rename_columns(columns_to_rename, data)
-
-        mutation_scripts <- stream_schema[[sensor]][["MUTATION_SCRIPTS"]]
-        mutated_data <- mutate_data(mutation_scripts, renamed_data, data_configuration)
-
-        if(length(setdiff(expected_columns, colnames(mutated_data))) > 0)
-            stop(paste("The mutated data for", device, "is missing these columns expected by RAPIDS: [", paste(setdiff(expected_columns, colnames(mutated_data)), collapse=","),"]. One ore more mutation scripts in [", sensor,"][MUTATION_SCRIPTS] are removing or not adding these columns"))
-        participant_data <- rbind(participant_data, mutated_data)
-    }
-
-    write_csv(participant_data, output_data_file)
-}
-
-pull_fitbit_data_main()

View File

@@ -64,7 +64,7 @@ validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rap
         stop(paste(sensor, " columns are not listed in RAPIDS' column specification. If you are adding support for a new phone sensor, add any mandatory columns in ", rapids_schema_file))
 
     if("ANDROID" %in% schema[[sensor]]){
-        android_columns <- names(schema[[sensor]][["ANDROID"]][["COLUMN_MAPPINGS"]])
+        android_columns <- names(schema[[sensor]][["ANDROID"]][["RAPIDS_COLUMN_MAPPINGS"]])
         android_columns <- android_columns[(android_columns != "FLAG_AS_EXTRA")]
         if(length(setdiff(rapids_columns, android_columns)) > 0)
             stop(paste(sensor," mappings are missing one or more mandatory columns for ANDROID. The missing column mappings are for ", paste(setdiff(rapids_columns, android_columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
@@ -73,7 +73,7 @@ validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rap
     }
 
     if("IOS" %in% schema[[sensor]]){
-        ios_columns <- names(schema[[sensor]][["IOS"]][["COLUMN_MAPPINGS"]])
+        ios_columns <- names(schema[[sensor]][["IOS"]][["RAPIDS_COLUMN_MAPPINGS"]])
         ios_columns <- ios_columns[(ios_columns != "FLAG_AS_EXTRA")]
         if(length(setdiff(rapids_columns, ios_columns)) > 0)
             stop(paste(sensor," mappings are missing one or more mandatory columns for IOS. The missing column mappings are for ", paste(setdiff(rapids_columns, ios_columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
@@ -110,6 +110,7 @@ pull_phone_data <- function(){
     data_configuration <- snakemake@params[["data_configuration"]]
     tables <- snakemake@params[["tables"]]
     sensor <- toupper(snakemake@params[["sensor"]])
+    device_type <- "phone"
     output_data_file <- snakemake@output[[1]]
 
     participant_data <- read_yaml(participant_file)
@@ -124,9 +125,8 @@ pull_phone_data <- function(){
     device_oss <- rep(device_oss, length(devices))
     validate_expected_columns_mapping(stream_schema, rapids_schema, sensor, rapids_schema_file, stream_format)
-    # ANDROID or IOS COLUMN_MAPPINGS are guaranteed to be the same at this point (see validate_expected_columns_mapping function)
+    # ANDROID or IOS RAPIDS_COLUMN_MAPPINGS are guaranteed to be the same at this point (see validate_expected_columns_mapping function)
     expected_columns <- tolower(rapids_schema[[sensor]])
-    expected_columns <- expected_columns[(expected_columns != "flag_as_extra")]
     participant_data <- setNames(data.frame(matrix(ncol = length(expected_columns), nrow = 0)), expected_columns)
     container_functions <- load_container_script(stream_container)
@@ -145,20 +145,20 @@ pull_phone_data <- function(){
         os_table <- ifelse(length(tables) > 1, tables[[toupper(device_os)]], tables) # some sensor tables have a different name for android and ios
 
-        columns_to_download <- stream_schema[[sensor]][[toupper(device_os)]][["COLUMN_MAPPINGS"]]
+        columns_to_download <- c(stream_schema[[sensor]][[toupper(device_os)]][["RAPIDS_COLUMN_MAPPINGS"]], stream_schema[[sensor]][[toupper(device_os)]][["MUTATION"]][["COLUMN_MAPPINGS"]])
         columns_to_download <- columns_to_download[(columns_to_download != "FLAG_TO_MUTATE")]
-        data <- pull_data_container(data_configuration, device, os_table, columns_to_download)
+        data <- pull_data_container(data_configuration, device, sensor, os_table, columns_to_download)
 
-        # Rename all COLUMN_MAPPINGS except those mapped as FLAG_AS_EXTRA or FLAG_TO_MUTATE
-        columns_to_rename <- stream_schema[[sensor]][[toupper(device_os)]][["COLUMN_MAPPINGS"]]
-        columns_to_rename <- (columns_to_rename[(columns_to_rename != "FLAG_TO_MUTATE" & names(columns_to_rename) != "FLAG_AS_EXTRA")])
-        renamed_data <- rename_columns(columns_to_rename, data)
+        if(!setequal(columns_to_download, colnames(data)))
+            stop(paste0("The pulled data for ", device, " does not have the expected columns (including [RAPIDS_COLUMN_MAPPINGS] and [MUTATION][COLUMN_MAPPINGS]). The container script returned [", paste(colnames(data), collapse=","),"] but the format mappings expected [",paste(columns_to_download, collapse=","), "]. The container script is: ", stream_container))
 
-        mutation_scripts <- stream_schema[[sensor]][[toupper(device_os)]][["MUTATION_SCRIPTS"]]
+        renamed_data <- rename_columns(columns_to_download, data)
+        mutation_scripts <- stream_schema[[sensor]][[toupper(device_os)]][["MUTATION"]][["SCRIPTS"]]
         mutated_data <- mutate_data(mutation_scripts, renamed_data, data_configuration)
 
-        if(length(setdiff(expected_columns, colnames(mutated_data))) > 0)
-            stop(paste("The mutated data for", device, "is missing these columns expected by RAPIDS: [", paste(setdiff(expected_columns, colnames(mutated_data)), collapse=","),"]. One or more mutation scripts in [", sensor,"][",toupper(device_os), "]","[MUTATION_SCRIPTS] are removing or not adding these columns"))
+        if(!setequal(expected_columns, colnames(mutated_data)))
+            stop(paste0("The mutated data for ", device, " does not have the columns RAPIDS expects. The container script returned [", paste(colnames(mutated_data), collapse=","),"] but RAPIDS expected [",paste(expected_columns, collapse=","), "]. One or more mutation scripts in [", sensor,"][MUTATION][SCRIPTS] are adding extra columns, or removing or not adding the ones expected"))
         participant_data <- rbind(participant_data, mutated_data)
     }
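The stricter check introduced here (`setequal` instead of a one-sided `setdiff`) fails on extra columns as well as missing ones. Its logic, as a standalone Python sketch:

```python
def check_mutated_columns(expected_columns, mutated_columns):
    # Two-sided comparison mirroring setequal(): mutation scripts must
    # neither drop expected columns nor leave undeclared extras behind.
    missing = sorted(set(expected_columns) - set(mutated_columns))
    extra = sorted(set(mutated_columns) - set(expected_columns))
    if missing or extra:
        raise ValueError(f"missing columns: {missing}; extra columns: {extra}")
```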

View File

@@ -41,8 +41,7 @@ rename_columns <- function(name_maps, data){
 }
 
 validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rapids_schema_file, stream_format){
-    columns <- names(schema[[sensor]][["COLUMN_MAPPINGS"]])
-    columns <- columns[(columns != "FLAG_AS_EXTRA")]
+    columns <- names(schema[[sensor]][["RAPIDS_COLUMN_MAPPINGS"]])
     rapids_columns <- rapids_schema[[sensor]]
 
     if(is.null(rapids_columns))
@@ -50,7 +49,7 @@ validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rap
     if(length(setdiff(rapids_columns, columns)) > 0)
         stop(paste(sensor," mappings are missing one or more mandatory columns. The missing column mappings are for ", paste(setdiff(rapids_columns, columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
     if(length(setdiff(columns, rapids_columns)) > 0)
-        stop(paste(sensor," mappings have one or more columns than required, add them as FLAG_AS_EXTRA instead. The extra column mappings are for ", paste(setdiff(columns, rapids_columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
+        stop(paste(sensor," mappings have one or more extra columns. If your mutation scripts need them, add them as [MUTATION][COLUMN_MAPPINGS] instead. The extra column mappings are for ", paste(setdiff(columns, rapids_columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
 }
 
 load_container_script <- function(stream_container){
@@ -69,14 +68,15 @@ load_container_script <- function(stream_container){
     }
 }
 
-pull_empatica_data_main <- function(){
+pull_wearable_data_main <- function(){
     participant_file <- snakemake@input[["participant_file"]]
     stream_format <- snakemake@input[["stream_format"]]
     rapids_schema_file <- snakemake@input[["rapids_schema_file"]]
     stream_container <- snakemake@input[["stream_container"]]
     data_configuration <- snakemake@params[["data_configuration"]]
     pid <- snakemake@params[["pid"]]
-    table <- snakemake@params[["table"]]
+    table <- snakemake@params[["tables"]]
+    device_type <- snakemake@params[["device_type"]]
     sensor <- toupper(snakemake@params[["sensor"]])
     output_data_file <- snakemake@output[[1]]
@@ -84,12 +84,11 @@ pull_empatica_data_main <- function(){
     participant_data <- read_yaml(participant_file)
     stream_schema <- read_yaml(stream_format)
     rapids_schema <- read_yaml(rapids_schema_file)
-    devices <- participant_data$EMPATICA$DEVICE_IDS
+    devices <- participant_data[[toupper(device_type)]]$DEVICE_IDS
     if(length(devices) == 0)
         devices <- c(pid)
 
     validate_expected_columns_mapping(stream_schema, rapids_schema, sensor, rapids_schema_file, stream_format)
-    expected_columns <- tolower(names(stream_schema[[sensor]][["COLUMN_MAPPINGS"]]))
-    expected_columns <- expected_columns[(expected_columns != "flag_extra")]
+    expected_columns <- tolower(names(stream_schema[[sensor]][["RAPIDS_COLUMN_MAPPINGS"]]))
     participant_data <- setNames(data.frame(matrix(ncol = length(expected_columns), nrow = 0)), expected_columns)
 
     pull_data_container <- load_container_script(stream_container)
@@ -98,19 +97,20 @@ pull_empatica_data_main <- function(){
         device <- devices[idx]
         message(paste0("\nProcessing ", sensor, " for ", device))
 
-        columns_to_download <- stream_schema[[sensor]][["COLUMN_MAPPINGS"]]
+        columns_to_download <- c(stream_schema[[sensor]][["RAPIDS_COLUMN_MAPPINGS"]], stream_schema[[sensor]][["MUTATION"]][["COLUMN_MAPPINGS"]])
         columns_to_download <- columns_to_download[(columns_to_download != "FLAG_TO_MUTATE")]
-        data <- pull_data_container(data_configuration, device, sensor, columns_to_download)
+        data <- pull_data_container(data_configuration, device, sensor, table, columns_to_download)
 
-        columns_to_rename <- stream_schema[[sensor]][["COLUMN_MAPPINGS"]]
-        columns_to_rename <- (columns_to_rename[(columns_to_rename != "FLAG_TO_MUTATE" & names(columns_to_rename) != "FLAG_AS_EXTRA")])
-        renamed_data <- rename_columns(columns_to_rename, data)
+        if(!setequal(columns_to_download, colnames(data)))
+            stop(paste0("The pulled data for ", device, " does not have the expected columns (including [RAPIDS_COLUMN_MAPPINGS] and [MUTATION][COLUMN_MAPPINGS]). The container script returned [", paste(colnames(data), collapse=","),"] but the format mappings expected [",paste(columns_to_download, collapse=","), "]. The container script is: ", stream_container))
+        renamed_data <- rename_columns(columns_to_download, data)
 
-        mutation_scripts <- stream_schema[[sensor]][["MUTATION_SCRIPTS"]]
+        mutation_scripts <- stream_schema[[sensor]][["MUTATION"]][["SCRIPTS"]]
         mutated_data <- mutate_data(mutation_scripts, renamed_data, data_configuration)
 
-        if(length(setdiff(expected_columns, colnames(mutated_data))) > 0)
-            stop(paste("The mutated data for", device, "is missing these columns expected by RAPIDS: [", paste(setdiff(expected_columns, colnames(mutated_data)), collapse=","),"]. One or more mutation scripts in [", sensor,"][",toupper(device_os), "]","[MUTATION_SCRIPTS] are removing or not adding these columns"))
+        if(!setequal(expected_columns, colnames(mutated_data)))
+            stop(paste0("The mutated data for ", device, " does not have the columns RAPIDS expects. The container script returned [", paste(colnames(mutated_data), collapse=","),"] but RAPIDS expected [",paste(expected_columns, collapse=","), "]. One or more mutation scripts in [", sensor,"][MUTATION][SCRIPTS] are adding extra columns, or removing or not adding the ones expected"))
         participant_data <- rbind(participant_data, mutated_data)
     }
@@ -118,4 +118,4 @@ pull_empatica_data_main <- function(){
 
     write_csv(participant_data, output_data_file)
 }
 
-pull_empatica_data_main()
+pull_wearable_data_main()
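For comparison, `load_container_script` above does in R (via reticulate and `source`) what a few lines of `importlib` do in Python: load a container module by file path and fail early if it lacks a `pull_data` function. A hypothetical Python equivalent:

```python
import importlib.util
from pathlib import Path

def load_container_script(stream_container):
    # Import the container module directly from its file path.
    spec = importlib.util.spec_from_file_location(Path(stream_container).stem, stream_container)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    # Mirror the R loader's early validation of the pull_data entry point.
    if not hasattr(module, "pull_data"):
        raise AttributeError(f"{stream_container} does not have a pull_data function")
    return module.pull_data
```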