Rename download_data, add support for py containers

pull/128/head
JulioV 2021-03-08 15:58:26 -05:00
parent 4b33ee43ba
commit 41711fcdb7
9 changed files with 120 additions and 77 deletions

View File

@@ -50,13 +50,13 @@ TIMEZONE:
########################################################################################################################
# See https://www.rapids.science/latest/setup/configuration/#device-data-source-configuration
PHONE_DATA_CONFIGURATION:
SOURCE:
TYPE: aware_mysql
DATABASE_GROUP: *database_group
TIMEZONE:
TYPE: SINGLE
VALUE: *timezone
PHONE_DATA_STREAMS:
USE: aware_mysql
# AVAILABLE:
aware_mysql:
DATABASE_GROUP: MY_GROUP
# Sensors ------

View File

@@ -14,38 +14,51 @@ The most common cases when you would want to implement a new data stream are:
## Formats and Containers in RAPIDS
**CONTAINER**. The container of a data stream is queried using a `container.R` script. This script implements functions that will pull data from a database, file, etc.
**CONTAINER**. The container of a data stream is queried using a `container.[R|py]` script. This script implements functions that will pull data from a database, file, etc.
**FORMAT**. The format of a data stream is described using a `format.yaml` file. A format file describes the mapping between your stream's raw data and the data that RAPIDS needs.
Both the `container.R` and the `format.yaml` are saved under `src/data/streams/[stream_name]` where `[stream_name]` can be
Both the `container.[R|py]` and the `format.yaml` are saved under `src/data/streams/[stream_name]` where `[stream_name]` can be
`aware_mysql` for example.
## Implement a Container
The `container.R` script of a data stream should be implemented in R. This script must have two functions if you are implementing a stream for phone data, or one function otherwise.
The `container` script of a data stream should be implemented in R (strongly recommended) or Python. This script must have two functions if you are implementing a stream for phone data, or one function otherwise. The script can contain any other auxiliary functions that your data stream might need.
=== "download_data"
First of all, add any parameters your script might need in `config.yaml` under `(device)_DATA_STREAMS`. These parameters will be available in the `stream_parameters` argument of the one or two functions you implement. For example, if you are adding support for `Beiwe` data stored in `PostgreSQL` and your container needs a set of credentials to connect to a database, your new data stream configuration would be:
```yaml hl_lines="7 8"
PHONE_DATA_STREAMS:
USE: aware_python
# AVAILABLE:
aware_mysql:
DATABASE_GROUP: MY_GROUP
beiwe_postgresql:
DATABASE_GROUP: MY_GROUP # users define this group (user, password, host, etc.) in credentials.yaml
```
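If you went the `beiwe_postgresql` route, the matching credentials group could look like the sketch below. The field names under `MY_GROUP` are assumptions for illustration; use whatever keys your container script actually reads:

```yaml
# Hypothetical credentials.yaml entry; field names are illustrative only
MY_GROUP:
  database: beiwe
  user: rapids
  password: my_password
  host: 127.0.0.1
  port: 5432
```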
Then implement one or both of the following functions:
=== "pull_data"
This function returns the data columns for a specific sensor and participant. It has the following parameters:
| Param | Description |
|--------------------|-------------------------------------------------------------------------------------------------------|
| data_configuration | Any parameters (keys/values) set by the user in any `[DEVICE_DATA_STREAMS][stream_name]` key of `config.yaml`. For example, `[DATABASE_GROUP]` inside `[FITBIT_DATA_STREAMS][fitbitjson_mysql]` |
| stream_parameters | Any parameters (keys/values) set by the user in any `[DEVICE_DATA_STREAMS][stream_name]` key of `config.yaml`. For example, `[DATABASE_GROUP]` inside `[FITBIT_DATA_STREAMS][fitbitjson_mysql]` |
| sensor_container | The value set by the user in any `[DEVICE_SENSOR][CONTAINER]` key of `config.yaml`. It can be a table, file path, or whatever data source you want to support that contains the **data from a single sensor for all participants**. For example, `[PHONE_ACCELEROMETER][CONTAINER]`|
| device | The device id that you need to get the data for (this is set by the user in the [participant files](../../setup/configuration/#participant-files)). For example, in AWARE this device is a uuid|
| columns | A list of the columns that you need to get from `sensor_container`. You specify these columns in your stream's `format.yaml`|
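Since this commit adds Python container support, `pull_data` can also live in a `container.py`. The following is only a hedged sketch of what such a function might look like; the SQLite backend and the `DATABASE_FILE` stream parameter are assumptions for illustration, not part of `aware_mysql`:

```python
import sqlite3

def pull_data(stream_parameters, device, sensor_container, columns):
    # Build the same per-device SELECT the R container issues
    query = "SELECT {} FROM {} WHERE device_id = '{}'".format(
        ",".join(columns), sensor_container, device)
    # Letting the user know what we are doing
    print("Executing the following query to download data: " + query)
    connection = sqlite3.connect(stream_parameters["DATABASE_FILE"])
    rows = connection.execute(query).fetchall()
    connection.close()
    # Return one dict per row so downstream steps can rename/derive columns
    return [dict(zip(columns, row)) for row in rows]
```

A real container would typically return a pandas data frame so RAPIDS' mutation scripts can operate on it; plain dicts keep this sketch dependency-free.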
!!! example
This is the `download_data` function we implemented for `aware_mysql`. Note that we can `message`, `warn` or `stop` the user during execution.
This is the `pull_data` function we implemented for `aware_mysql`. Note that we can `message`, `warn` or `stop` the user during execution.
```r
download_data <- function(data_configuration, device, sensor_container, columns){
group <- data_configuration$SOURCE$DATABASE_GROUP
dbEngine <- dbConnect(MariaDB(), default.file = "./.env", group = group)
pull_data <- function(stream_parameters, device, sensor_container, columns){
# get_db_engine is an auxiliary function not shown here for brevity but can be found in src/data/streams/aware_mysql/container.R
dbEngine <- get_db_engine(stream_parameters$DATABASE_GROUP)
query <- paste0("SELECT ", paste(columns, collapse = ",")," FROM ", sensor_container, " WHERE device_id = '", device,"'")
# Letting the user know what we are doing
message(paste0("Executing the following query to download data: ", query))
@@ -65,17 +78,17 @@ The `container.R` script of a data stream should be implemented in R. This scrip
!!! warning
This function is only necessary for phone data streams.
RAPIDS allows users to use the keyword `infer` (previously `multiple`) to [automatically infer](../../setup/configuration/#structure-of-participants-files) the mobile Operative System a device (phone) was running.
RAPIDS allows users to use the keyword `infer` (previously `multiple`) to [automatically infer](../../setup/configuration/#structure-of-participants-files) the mobile operating system a phone was running.
If you have a way to infer the OS of a device id, implement this function. For example, for AWARE data we use the `aware_device` table.
If you don't have a way to infer the OS, call `stop("Error Message")` so other users know they can't use `infer` or the inference failed, and they have to assign the OS manually in the participant file.
This function returns the operative system (`android` or `ios`) for a specific device. It has the following parameters:
This function returns the operating system (`android` or `ios`) for a specific phone device id. It has the following parameters:
| Param | Description |
|--------------------|-------------------------------------------------------------------------------------------------------|
| data_configuration | Any parameters (keys/values) set by the user in any `[DEVICE_DATA_STREAMS][stream_name]` key of `config.yaml`. For example, `[DATABASE_GROUP]` inside `[FITBIT_DATA_STREAMS][fitbitjson_mysql]` |
| stream_parameters | Any parameters (keys/values) set by the user in any `[DEVICE_DATA_STREAMS][stream_name]` key of `config.yaml`. For example, `[DATABASE_GROUP]` inside `[FITBIT_DATA_STREAMS][fitbitjson_mysql]` |
| device | The device id that you need to infer the OS for (this is set by the user in the [participant files](../../setup/configuration/#participant-files)). For example, in AWARE this device is a uuid|
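For a Python container, an `infer_device_os` counterpart could look like the sketch below. The `DEVICE_OS_TABLE` stream parameter and the brand-to-OS mapping are assumptions standing in for an `aware_device`-style lookup:

```python
def infer_device_os(stream_parameters, device):
    # A real container would query an aware_device-like table for the brand
    brand = stream_parameters.get("DEVICE_OS_TABLE", {}).get(device)
    if brand is None:
        # Mirror the R containers: fail loudly so the user assigns the OS manually
        raise ValueError("We cannot infer the OS of " + device +
                         "; assign it manually in the participant file")
    return "ios" if brand.lower() == "iphone" else "android"
```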
@@ -83,8 +96,9 @@ The `container.R` script of a data stream should be implemented in R. This scrip
This is the `infer_device_os` function we implemented for `aware_mysql`. Note that we can `message`, `warn` or `stop` the user during execution.
```r
infer_device_os <- function(data_configuration, device){
group <- data_configuration$SOURCE$DATABASE_GROUP # specified DB credentials group in config.yaml
infer_device_os <- function(stream_parameters, device){
# get_db_engine is an auxiliary function not shown here for brevity but can be found in src/data/streams/aware_mysql/container.R
group <- stream_parameters$DATABASE_GROUP
dbEngine <- dbConnect(MariaDB(), default.file = "./.env", group = group)
query <- paste0("SELECT device_id,brand FROM aware_device WHERE device_id = '", device, "'")

View File

@@ -41,7 +41,7 @@
## Every time I force the download_dataset rule all rules are executed
???+ failure "Problem"
When running `snakemake -j1 -R download_phone_data` or `./rapids -j1 -R download_phone_data` all the rules and files are re-computed
When running `snakemake -j1 -R pull_phone_data` or `./rapids -j1 -R pull_phone_data` all the rules and files are re-computed
???+ done "Solution"
This is expected behavior. The advantage of using `snakemake` under the hood is that every time a file containing data is modified every rule that depends on that file will be re-executed to update their results. In this case, since `download_dataset` updates all the raw data, and you are forcing the rule with the flag `-R` every single rule that depends on those raw files will be executed.
@@ -215,7 +215,7 @@
```bash
R -e 'renv::install("RMySQL")'
```
- Go to `src/data/download_phone_data.R` or `src/data/download_fitbit_data.R` and replace `library(RMariaDB)` with `library(RMySQL)`
- Go to `src/data/streams/pull_phone_data.R` or `src/data/streams/pull_fitbit_data.R` and replace `library(RMariaDB)` with `library(RMySQL)`
- In the same file(s) replace `dbEngine <- dbConnect(MariaDB(), default.file = "./.env", group = group)` with `dbEngine <- dbConnect(MySQL(), default.file = "./.env", group = group)`
## There is no package called `RMariaDB`

View File

@@ -30,26 +30,33 @@ def get_phone_sensor_names():
phone_sensor_names.append(config_key)
return phone_sensor_names
def download_phone_data_input_with_mutation_scripts(wilcards):
def pull_phone_data_input_with_mutation_scripts(wilcards):
import yaml
input = dict()
phone_source_type = config["PHONE_DATA_CONFIGURATION"]["SOURCE"]["TYPE"]
phone_stream = config["PHONE_DATA_STREAMS"]["USE"]
input["participant_file"] = "data/external/participant_files/{pid}.yaml"
input["rapids_schema_file"] = "src/data/streams/rapids_columns.yaml"
input["source_schema_file"] = "src/data/streams/" + phone_source_type + "/format.yaml"
input["source_download_file"] = "src/data/streams/"+ phone_source_type + "/container.R"
input["stream_format"] = "src/data/streams/" + phone_stream + "/format.yaml"
schema = yaml.load(open(input.get("source_schema_file"), 'r'), Loader=yaml.FullLoader)
if Path("src/data/streams/"+ phone_stream + "/container.R").exists():
input["stream_container"] = "src/data/streams/"+ phone_stream + "/container.R"
elif Path("src/data/streams/"+ phone_stream + "/container.py").exists():
input["stream_container"] = "src/data/streams/"+ phone_stream + "/container.py"
else:
raise ValueError("The container script for {stream} is missing: src/data/streams/{stream}/container.[py|R]".format(stream=phone_stream))
schema = yaml.load(open(input.get("stream_format"), 'r'), Loader=yaml.FullLoader)
sensor = ("phone_" + wilcards.sensor).upper()
if sensor not in schema:
raise ValueError("{sensor} is not defined in the schema {schema}".format(sensor=sensor, schema=input.get("source_schema_file")))
raise ValueError("{sensor} is not defined in the schema {schema}".format(sensor=sensor, schema=input.get("stream_format")))
for device_os in ["ANDROID", "IOS"]:
scripts = schema[sensor][device_os]["MUTATION_SCRIPTS"]
if isinstance(scripts, list):
for idx, script in enumerate(scripts):
if not script.lower().endswith((".py", ".r")):
raise ValueError("Mutate scripts can only be Python or R scripts (.py, .R).\n Instead we got {script} in \n [{sensor}][{device_os}] of {schema}".format(script=script, sensor=sensor, device_os=device_os, schema=input.get("source_schema_file")))
raise ValueError("Mutate scripts can only be Python or R scripts (.py, .R).\n Instead we got {script} in \n [{sensor}][{device_os}] of {schema}".format(script=script, sensor=sensor, device_os=device_os, schema=input.get("stream_format")))
input["mutationscript"+str(idx)] = script
return input
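The container-resolution logic above (prefer `container.R`, fall back to `container.py`, otherwise fail) can be sketched as a standalone helper; `resolve_container` is a hypothetical name, not a function in RAPIDS:

```python
from pathlib import Path

def resolve_container(stream_name, streams_dir="src/data/streams"):
    # Prefer container.R, then container.py, matching the rule's input logic
    for extension in (".R", ".py"):
        candidate = Path(streams_dir) / stream_name / ("container" + extension)
        if candidate.exists():
            return str(candidate)
    raise ValueError("The container script for {stream} is missing: "
                     "{folder}/{stream}/container.[py|R]".format(stream=stream_name, folder=streams_dir))
```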

View File

@@ -23,16 +23,16 @@ rule create_participants_files:
script:
"../src/data/create_participants_files.R"
rule download_phone_data:
input: unpack(download_phone_data_input_with_mutation_scripts)
rule pull_phone_data:
input: unpack(pull_phone_data_input_with_mutation_scripts)
params:
data_configuration = config["PHONE_DATA_CONFIGURATION"],
data_configuration = config["PHONE_DATA_STREAMS"][config["PHONE_DATA_STREAMS"]["USE"]],
sensor = "phone_" + "{sensor}",
tables = lambda wildcards: config["PHONE_" + str(wildcards.sensor).upper()]["TABLE"],
output:
"data/raw/{pid}/phone_{sensor}_raw.csv"
script:
"../src/data/download_phone_data.R"
"../src/data/streams/pull_phone_data.R"
rule download_fitbit_data:
input:
@@ -275,7 +275,7 @@ rule pull_empatica_data:
output:
"data/raw/{pid}/empatica_{sensor}_raw.csv"
script:
"../src/data/pull_empatica_data.R"
"../src/data/streams/pull_empatica_data.R"
rule empatica_readable_datetime:
input:

View File

@@ -33,12 +33,12 @@ get_db_engine <- function(group){
#' If you don't have a way to infer the OS, call stop("Error Message") so other users know they can't use "infer" or the inference failed,
#' and they have to assign the OS manually in the participant file
#'
#' @param data_configuration The PHONE_DATA_CONFIGURATION key in config.yaml. If you need specific parameters add them there.
#' @param stream_parameters The PHONE_STREAM_PARAMETERS key in config.yaml. If you need specific parameters add them there.
#' @param device A device ID string
#' @return The OS the device ran, "android" or "ios"
infer_device_os <- function(data_configuration, device){
dbEngine <- get_db_engine(data_configuration$SOURCE$DATABASE_GROUP)
infer_device_os <- function(stream_parameters, device){
dbEngine <- get_db_engine(stream_parameters$DATABASE_GROUP)
query <- paste0("SELECT device_id,brand FROM aware_device WHERE device_id = '", device, "'")
message(paste0("Executing the following query to infer phone OS: ", query))
os <- dbGetQuery(dbEngine, query)
@@ -55,14 +55,14 @@ infer_device_os <- function(data_configuration, device){
#' @description
#' Gets the sensor data for a specific device id from a database table, file or whatever source you want to query
#'
#' @param data_configuration The PHONE_DATA_CONFIGURATION key in config.yaml. If you need specific parameters add them there.
#' @param stream_parameters The PHONE_STREAM_PARAMETERS key in config.yaml. If you need specific parameters add them there.
#' @param device A device ID string
#' @param sensor_container database table or file containing the sensor data for all participants. This is the PHONE_SENSOR[TABLE] key in config.yaml
#' @param columns the columns needed from this sensor (we recommend returning only these columns instead of every column in sensor_container)
#' @return A dataframe with the sensor data for device
download_data <- function(data_configuration, device, sensor_container, columns){
dbEngine <- get_db_engine(data_configuration$SOURCE$DATABASE_GROUP)
pull_data <- function(stream_parameters, device, sensor_container, columns){
dbEngine <- get_db_engine(stream_parameters$DATABASE_GROUP)
query <- paste0("SELECT ", paste(columns, collapse = ",")," FROM ", sensor_container, " WHERE device_id = '", device,"'")
# Letting the user know what we are doing
message(paste0("Executing the following query to download data: ", query))

View File

@@ -63,7 +63,7 @@ load_container_script <- function(stream_container){
return(container$pull_data)
} else if(language == "r"){
source(stream_container)
if(exists("pull_data"))
if(!exists("pull_data"))
stop(paste0("The following container.R script does not have a pull_data function: ", stream_container))
return(pull_data)
}

View File

@@ -17,9 +17,9 @@ validate_deviceid_platforms <- function(device_ids, platforms, participant){
}
}
validate_inferred_os <- function(source_download_file, participant_file, device, device_os){
validate_inferred_os <- function(stream_container, participant_file, device, device_os){
if(!is.na(device_os) && device_os != "android" && device_os != "ios")
stop(paste0("We tried to infer the OS for ", device, " but 'infer_device_os' function inside '",source_download_file,"' returned '",device_os,"' instead of 'android' or 'ios'. You can assign the OS manually in the participant file or report this bug on GitHub.\nParticipant file ", participant_file))
stop(paste0("We tried to infer the OS for ", device, " but 'infer_device_os' function inside '",stream_container,"' returned '",device_os,"' instead of 'android' or 'ios'. You can assign the OS manually in the participant file or report this bug on GitHub.\nParticipant file ", participant_file))
}
mutate_data <- function(scripts, data){
@@ -58,7 +58,7 @@ rename_columns <- function(name_maps, data){
return(data)
}
validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rapids_schema_file){
validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rapids_schema_file, stream_format){
android_columns <- names(schema[[sensor]][["ANDROID"]][["COLUMN_MAPPINGS"]])
android_columns <- android_columns[(android_columns != "FLAG_AS_EXTRA")]
@@ -69,29 +69,47 @@ validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rap
if(is.null(rapids_columns))
stop(paste(sensor, " columns are not listed in RAPIDS' column specification. If you are adding support for a new phone sensor, add any mandatory columns in ", rapids_schema_file))
if(length(setdiff(rapids_columns, android_columns)) > 0)
stop(paste(sensor," mappings are missing one or more mandatory columns for ANDROID. The missing column mappings are for ", paste(setdiff(rapids_columns, android_columns), collapse=","),"in", rapids_schema_file))
stop(paste(sensor," mappings are missing one or more mandatory columns for ANDROID. The missing column mappings are for ", paste(setdiff(rapids_columns, android_columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
if(length(setdiff(rapids_columns, ios_columns)) > 0)
stop(paste(sensor," mappings are missing one or more mandatory columns for IOS. The missing column mappings are for ", paste(setdiff(rapids_columns, ios_columns), collapse=","),"in", rapids_schema_file))
stop(paste(sensor," mappings are missing one or more mandatory columns for IOS. The missing column mappings are for ", paste(setdiff(rapids_columns, ios_columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
if(length(setdiff(android_columns, rapids_columns)) > 0)
stop(paste(sensor," mappings have one or more columns than required for ANDROID, add them as FLAG_AS_EXTRA instead. The extra column mappings are for ", paste(setdiff(android_columns, rapids_columns), collapse=","),"in", rapids_schema_file))
stop(paste(sensor," mappings have one or more extra columns for ANDROID, add them as FLAG_AS_EXTRA instead. The extra column mappings are for ", paste(setdiff(android_columns, rapids_columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
if(length(setdiff(ios_columns, rapids_columns)) > 0)
stop(paste(sensor," mappings have one or more columns than required for IOS, add them as FLAG_AS_EXTRA instead. The extra column mappings are for ", paste(setdiff(ios_columns, rapids_columns), collapse=","),"in", rapids_schema_file))
stop(paste(sensor," mappings have one or more extra columns for IOS, add them as FLAG_AS_EXTRA instead. The extra column mappings are for ", paste(setdiff(ios_columns, rapids_columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
}
download_phone_data <- function(){
load_container_script <- function(stream_container){
language <- if_else(endsWith(tolower(stream_container), "py"), "python", "r")
if(language == "python"){
library(reticulate)
container <- import_from_path(gsub(pattern = "\\.py$", "", basename(stream_container)), path = dirname(stream_container))
if(!py_has_attr(container, "pull_data"))
stop(paste0("The following container.py script does not have a pull_data function: ", stream_container))
if(!py_has_attr(container, "infer_device_os"))
stop(paste0("The following container.py script does not have an infer_device_os function: ", stream_container))
return(list("infer_device_os" = container$infer_device_os, "pull_data" = container$pull_data))
} else if(language == "r"){
source(stream_container)
if(!exists("pull_data"))
stop(paste0("The following container.R script does not have a pull_data function: ", stream_container))
if(!exists("infer_device_os"))
stop(paste0("The following container.R script does not have an infer_device_os function: ", stream_container))
return(list("infer_device_os" = infer_device_os, "pull_data" = pull_data))
}
}
pull_phone_data <- function(){
participant_file <- snakemake@input[["participant_file"]]
source_schema_file <- snakemake@input[["source_schema_file"]]
stream_format <- snakemake@input[["stream_format"]]
rapids_schema_file <- snakemake@input[["rapids_schema_file"]]
source_download_file <- snakemake@input[["source_download_file"]]
stream_container <- snakemake@input[["stream_container"]]
data_configuration <- snakemake@params[["data_configuration"]]
tables <- snakemake@params[["tables"]]
sensor <- toupper(snakemake@params[["sensor"]])
output_data_file <- snakemake@output[[1]]
source(source_download_file)
participant_data <- read_yaml(participant_file)
schema <- read_yaml(source_schema_file)
stream_schema <- read_yaml(stream_format)
rapids_schema <- read_yaml(rapids_schema_file)
devices <- participant_data$PHONE$DEVICE_IDS
device_oss <- participant_data$PHONE$PLATFORMS
@@ -101,30 +119,34 @@ download_phone_data <- function(){
if(length(device_oss) == 1)
device_oss <- rep(device_oss, length(devices))
validate_expected_columns_mapping(schema, rapids_schema, sensor, rapids_schema_file)
validate_expected_columns_mapping(stream_schema, rapids_schema, sensor, rapids_schema_file, stream_format)
# ANDROID or IOS COLUMN_MAPPINGS are guaranteed to be the same at this point (see validate_expected_columns_mapping function)
expected_columns <- tolower(names(schema[[sensor]][["ANDROID"]][["COLUMN_MAPPINGS"]]))
expected_columns <- tolower(names(stream_schema[[sensor]][["ANDROID"]][["COLUMN_MAPPINGS"]]))
expected_columns <- expected_columns[(expected_columns != "flag_extra")]
participant_data <- setNames(data.frame(matrix(ncol = length(expected_columns), nrow = 0)), expected_columns)
container_functions <- load_container_script(stream_container)
infer_device_os_container <- container_functions$infer_device_os
pull_data_container <- container_functions$pull_data
for(idx in seq_along(devices)){ #TODO remove length
device <- devices[idx]
message(paste0("\nProcessing ", sensor, " for ", device))
device_os <- ifelse(device_oss[idx] == "infer", infer_device_os(data_configuration, device), device_oss[idx])
validate_inferred_os(basename(source_download_file), participant_file, device, device_os)
device_os <- ifelse(device_oss[idx] == "infer", infer_device_os_container(data_configuration, device), device_oss[idx])
validate_inferred_os(basename(stream_container), participant_file, device, device_os)
os_table <- ifelse(length(tables) > 1, tables[[toupper(device_os)]], tables) # some sensor tables have a different name for android and ios
columns_to_download <- schema[[sensor]][[toupper(device_os)]][["COLUMN_MAPPINGS"]]
columns_to_download <- stream_schema[[sensor]][[toupper(device_os)]][["COLUMN_MAPPINGS"]]
columns_to_download <- columns_to_download[(columns_to_download != "FLAG_TO_MUTATE")]
data <- download_data(data_configuration, device, os_table, columns_to_download)
data <- pull_data_container(data_configuration, device, os_table, columns_to_download)
# Rename all COLUMN_MAPPINGS except those mapped as FLAG_AS_EXTRA or FLAG_TO_MUTATE
columns_to_rename <- schema[[sensor]][[toupper(device_os)]][["COLUMN_MAPPINGS"]]
columns_to_rename <- stream_schema[[sensor]][[toupper(device_os)]][["COLUMN_MAPPINGS"]]
columns_to_rename <- (columns_to_rename[(columns_to_rename != "FLAG_TO_MUTATE" & names(columns_to_rename) != "FLAG_AS_EXTRA")])
renamed_data <- rename_columns(columns_to_rename, data)
mutation_scripts <- schema[[sensor]][[toupper(device_os)]][["MUTATION_SCRIPTS"]]
mutation_scripts <- stream_schema[[sensor]][[toupper(device_os)]][["MUTATION_SCRIPTS"]]
mutated_data <- mutate_data(mutation_scripts, renamed_data)
if(length(setdiff(expected_columns, colnames(mutated_data))) > 0)
@@ -136,4 +158,4 @@ download_phone_data <- function(){
write_csv(participant_data, output_data_file)
}
download_phone_data()
pull_phone_data()

View File

@@ -6,7 +6,7 @@ required:
- PIDS
- CREATE_PARTICIPANT_FILES
- TIME_SEGMENTS
- PHONE_DATA_CONFIGURATION
- PHONE_DATA_STREAMS
- PHONE_ACCELEROMETER
- PHONE_ACTIVITY_RECOGNITION
- PHONE_APPLICATIONS_CRASHES
@@ -209,16 +209,16 @@ properties:
INCLUDE_PAST_PERIODIC_SEGMENTS:
type: boolean
PHONE_DATA_CONFIGURATION:
allOf:
- $ref: "#/definitions/DATA_CONFIGURATION"
- properties:
SOURCE:
type: object
properties:
TYPE:
type: string
enum: [aware_mysql]
PHONE_DATA_STREAMS:
type: object
properties:
USE:
type: string
aware_mysql:
type: object
properties:
DATABASE_GROUP:
type: string
PHONE_ACCELEROMETER:
type: object