2020-05-02 01:46:04 +02:00
source ( " renv/activate.R" )
2021-03-02 23:57:22 +01:00
2020-10-21 01:12:01 +02:00
library ( yaml )
2021-03-02 23:57:22 +01:00
library ( dplyr )
library ( readr )
# we use reticulate but only load it if we are going to use it to minimize the case when old RAPIDS deployments need to update ther renv
validate_deviceid_platforms <- function ( device_ids , platforms , participant ) {
if ( length ( device_ids ) > 1 && length ( platforms ) == 1 ) {
if ( platforms != " android" && platforms != " ios" && platforms != " infer" )
stop ( paste0 ( " If you have more than 1 device_id, platform should be 'android', 'ios' OR 'infer' but you typed: '" , paste0 ( platforms , collapse = " s," ) , " '. Participant file: " , participant ) )
2020-06-30 23:34:18 +02:00
} else if ( length ( device_ids ) > 1 && length ( platforms ) > 1 ) {
if ( length ( device_ids ) != length ( platforms ) )
stop ( paste0 ( " The number of device_ids should match the number of platforms. Participant file:" , participant ) )
if ( all ( intersect ( c ( " android" , " ios" ) , unique ( platforms ) ) != c ( " android" , " ios" ) ) )
stop ( paste0 ( " If you have more than 1 device_id and more than 1 platform, the platforms should be a mix of 'android' AND 'ios' but you typed: '" , paste0 ( platforms , collapse = " ," ) , " '. Participant file: " , participant ) )
}
}
2021-03-08 21:58:26 +01:00
validate_inferred_os <- function ( stream_container , participant_file , device , device_os ) {
2021-03-02 23:57:22 +01:00
if ( ! is.na ( device_os ) && device_os != " android" && device_os != " ios" )
2021-03-08 21:58:26 +01:00
stop ( paste0 ( " We tried to infer the OS for " , device , " but 'infer_device_os' function inside '" , stream_container , " ' returned '" , device_os , " ' instead of 'android' or 'ios'. You can assign the OS manually in the participant file or report this bug on GitHub.\nParticipant file " , participant_file ) )
2020-06-30 23:34:18 +02:00
}
2021-03-09 17:28:33 +01:00
mutate_data <- function ( scripts , data , data_configuration ) {
2021-03-02 23:57:22 +01:00
for ( script in scripts ) {
if ( grepl ( " \\.(R)$" , script ) ) {
myEnv <- new.env ( )
source ( script , local = myEnv )
attach ( myEnv , name = " sourced_scripts_rapids" )
if ( exists ( " main" , myEnv ) ) {
message ( paste ( " Applying mutation script" , script ) )
2021-03-09 17:28:33 +01:00
data <- main ( data , data_configuration )
2021-03-02 23:57:22 +01:00
} else {
stop ( paste0 ( " The following mutation script does not have main function: " , script ) )
}
# rm(list = ls(envir = myEnv), envir = myEnv, inherits = FALSE)
detach ( " sourced_scripts_rapids" )
} else { # python
library ( reticulate )
module <- gsub ( pattern = " \\.py$" , " " , basename ( script ) )
script_functions <- import_from_path ( module , path = dirname ( script ) )
if ( py_has_attr ( script_functions , " main" ) ) {
message ( paste ( " Applying mutation script" , script ) )
2021-03-09 17:28:33 +01:00
data <- script_functions $ main ( data , data_configuration )
2021-03-02 23:57:22 +01:00
} else {
stop ( paste0 ( " The following mutation script does not have a main function: " , script ) )
}
2020-10-21 01:12:01 +02:00
}
2021-03-02 23:57:22 +01:00
}
return ( data )
2020-10-21 01:12:01 +02:00
}
2021-03-02 23:57:22 +01:00
rename_columns <- function ( name_maps , data ) {
for ( name in names ( name_maps ) )
data <- data %>% rename ( ! ! tolower ( name ) : = name_maps [ [name ] ] )
return ( data )
2020-10-21 01:12:01 +02:00
}
2021-03-02 23:57:22 +01:00
2021-03-08 21:58:26 +01:00
validate_expected_columns_mapping <- function ( schema , rapids_schema , sensor , rapids_schema_file , stream_format ) {
2021-03-02 23:57:22 +01:00
rapids_columns <- rapids_schema [ [sensor ] ]
if ( is.null ( rapids_columns ) )
stop ( paste ( sensor , " columns are not listed in RAPIDS' column specification. If you are adding support for a new phone sensor, add any mandatory columns in " , rapids_schema_file ) )
2021-03-09 00:19:53 +01:00
if ( " ANDROID" %in% schema [ [sensor ] ] ) {
2021-03-09 22:42:02 +01:00
android_columns <- names ( schema [ [sensor ] ] [ [ " ANDROID" ] ] [ [ " RAPIDS_COLUMN_MAPPINGS" ] ] )
2021-03-09 00:19:53 +01:00
if ( length ( setdiff ( rapids_columns , android_columns ) ) > 0 )
stop ( paste ( sensor , " mappings are missing one or more mandatory columns for ANDROID. The missing column mappings are for " , paste ( setdiff ( rapids_columns , android_columns ) , collapse = " ," ) , " in" , stream_format , " (the mappings are case sensitive)" ) )
if ( length ( setdiff ( android_columns , rapids_columns ) ) > 0 )
2021-03-10 03:52:38 +01:00
stop ( paste ( sensor , " mappings have one or more columns than required for ANDROID. The extra column mappings are for " , paste ( setdiff ( android_columns , rapids_columns ) , collapse = " ," ) , " in" , stream_format , " (the mappings are case sensitive)" ) )
2021-03-09 00:19:53 +01:00
}
if ( " IOS" %in% schema [ [sensor ] ] ) {
2021-03-09 22:42:02 +01:00
ios_columns <- names ( schema [ [sensor ] ] [ [ " IOS" ] ] [ [ " RAPIDS_COLUMN_MAPPINGS" ] ] )
2021-03-09 00:19:53 +01:00
if ( length ( setdiff ( rapids_columns , ios_columns ) ) > 0 )
stop ( paste ( sensor , " mappings are missing one or more mandatory columns for IOS. The missing column mappings are for " , paste ( setdiff ( rapids_columns , ios_columns ) , collapse = " ," ) , " in" , stream_format , " (the mappings are case sensitive)" ) )
if ( length ( setdiff ( ios_columns , rapids_columns ) ) > 0 )
2021-03-10 03:52:38 +01:00
stop ( paste ( sensor , " mappings have one or more columns than required for IOS. The extra column mappings are for " , paste ( setdiff ( ios_columns , rapids_columns ) , collapse = " ," ) , " in" , stream_format , " (the mappings are case sensitive)" ) )
2021-03-09 00:19:53 +01:00
}
2020-10-19 21:07:12 +02:00
}
2021-03-08 21:58:26 +01:00
load_container_script <- function ( stream_container ) {
language <- if_else ( endsWith ( tolower ( stream_container ) , " py" ) , " python" , " r" )
if ( language == " python" ) {
library ( reticulate )
container <- import_from_path ( gsub ( pattern = " \\.py$" , " " , basename ( stream_container ) ) , path = dirname ( stream_container ) )
if ( ! py_has_attr ( container , " pull_data" ) )
stop ( paste0 ( " The following container.py script does not have a pull_data function: " , stream_container ) )
if ( ! py_has_attr ( container , " infer_device_os" ) )
stop ( paste0 ( " The following container.py script does not have a infer_device_os function: " , stream_container ) )
return ( list ( " infer_device_os" = container $ infer_device_os , " pull_data" = container $ pull_data ) )
} else if ( language == " r" ) {
source ( stream_container )
if ( ! exists ( " pull_data" ) )
stop ( paste0 ( " The following container.R script does not have a pull_data function: " , stream_container ) )
if ( ! exists ( " infer_device_os" ) )
stop ( paste0 ( " The following container.R script does not have a infer_device_os function: " , stream_container ) )
return ( list ( " infer_device_os" = infer_device_os , " pull_data" = pull_data ) )
}
}
2021-03-28 20:31:44 +02:00
get_devices_ids <- function ( participant_data ) {
devices_ids = c ( )
for ( device in participant_data )
for ( attribute in names ( device ) )
if ( attribute == " DEVICE_IDS" )
devices_ids <- c ( devices_ids , device [ [attribute ] ] )
return ( devices_ids )
}
validate_participant_file_without_device_ids <- function ( participant_file ) {
participant_data <- yaml :: read_yaml ( participant_file )
participant_devices <- get_devices_ids ( participant_data )
if ( length ( participant_devices ) == 0 )
stop ( " There are no device ids in this participant file for smartphones or wearables: " , participant_file )
}
2021-03-08 21:58:26 +01:00
pull_phone_data <- function ( ) {
2021-03-02 23:57:22 +01:00
participant_file <- snakemake @ input [ [ " participant_file" ] ]
2021-03-08 21:58:26 +01:00
stream_format <- snakemake @ input [ [ " stream_format" ] ]
2021-03-02 23:57:22 +01:00
rapids_schema_file <- snakemake @ input [ [ " rapids_schema_file" ] ]
2021-03-08 21:58:26 +01:00
stream_container <- snakemake @ input [ [ " stream_container" ] ]
2021-03-02 23:57:22 +01:00
data_configuration <- snakemake @ params [ [ " data_configuration" ] ]
tables <- snakemake @ params [ [ " tables" ] ]
sensor <- toupper ( snakemake @ params [ [ " sensor" ] ] )
2021-03-09 22:42:02 +01:00
device_type <- " phone"
2021-03-02 23:57:22 +01:00
output_data_file <- snakemake @ output [ [1 ] ]
2021-03-28 20:31:44 +02:00
validate_participant_file_without_device_ids ( participant_file )
2021-03-02 23:57:22 +01:00
participant_data <- read_yaml ( participant_file )
2021-03-08 21:58:26 +01:00
stream_schema <- read_yaml ( stream_format )
2021-03-02 23:57:22 +01:00
rapids_schema <- read_yaml ( rapids_schema_file )
devices <- participant_data $ PHONE $ DEVICE_IDS
device_oss <- participant_data $ PHONE $ PLATFORMS
device_oss <- replace ( device_oss , device_oss == " multiple" , " infer" ) # support multiple for retro compatibility
validate_deviceid_platforms ( devices , device_oss , participant_file )
if ( length ( device_oss ) == 1 )
device_oss <- rep ( device_oss , length ( devices ) )
2020-10-19 21:07:12 +02:00
2021-03-08 21:58:26 +01:00
validate_expected_columns_mapping ( stream_schema , rapids_schema , sensor , rapids_schema_file , stream_format )
2021-03-09 22:42:02 +01:00
# ANDROID or IOS RAPIDS_COLUMN_MAPPINGS are guaranteed to be the same at this point (see validate_expected_columns_mapping function)
2021-03-09 00:19:53 +01:00
expected_columns <- tolower ( rapids_schema [ [sensor ] ] )
2021-03-02 23:57:22 +01:00
participant_data <- setNames ( data.frame ( matrix ( ncol = length ( expected_columns ) , nrow = 0 ) ) , expected_columns )
2020-10-19 21:07:12 +02:00
2021-03-17 01:02:16 +01:00
if ( length ( devices ) == 0 ) {
warning ( " There were no PHONE device ids in this participant file:" , participant_file )
write_csv ( participant_data , output_data_file )
return ( )
}
2021-03-08 21:58:26 +01:00
container_functions <- load_container_script ( stream_container )
infer_device_os_container <- container_functions $ infer_device_os
pull_data_container <- container_functions $ pull_data
2021-04-17 00:02:43 +02:00
for ( idx in seq_along ( devices ) ) {
2021-03-02 23:57:22 +01:00
device <- devices [idx ]
message ( paste0 ( " \nProcessing " , sensor , " for " , device ) )
2021-03-08 21:58:26 +01:00
device_os <- ifelse ( device_oss [idx ] == " infer" , infer_device_os_container ( data_configuration , device ) , device_oss [idx ] )
validate_inferred_os ( basename ( stream_container ) , participant_file , device , device_os )
2021-03-09 00:00:36 +01:00
2021-03-14 06:09:08 +01:00
if ( ! toupper ( device_os ) %in% names ( stream_schema [ [sensor ] ] ) ) { # the current sensor is only available in a single OS (like PHONE_MESSAGES)
warning ( sensor , " data is not available for " , device_os , " . No data to download for " , device )
2021-03-09 00:00:36 +01:00
next
2021-03-14 06:09:08 +01:00
}
2021-03-09 00:00:36 +01:00
2021-03-02 23:57:22 +01:00
os_table <- ifelse ( length ( tables ) > 1 , tables [ [toupper ( device_os ) ] ] , tables ) # some sensor tables have a different name for android and ios
2021-03-09 22:42:02 +01:00
columns_to_download <- c ( stream_schema [ [sensor ] ] [ [toupper ( device_os ) ] ] [ [ " RAPIDS_COLUMN_MAPPINGS" ] ] , stream_schema [ [sensor ] ] [ [toupper ( device_os ) ] ] [ [ " MUTATION" ] ] [ [ " COLUMN_MAPPINGS" ] ] )
2021-03-02 23:57:22 +01:00
columns_to_download <- columns_to_download [ ( columns_to_download != " FLAG_TO_MUTATE" ) ]
2021-03-09 22:42:02 +01:00
data <- pull_data_container ( data_configuration , device , sensor , os_table , columns_to_download )
2021-03-02 23:57:22 +01:00
2021-03-09 22:42:02 +01:00
if ( ! setequal ( columns_to_download , colnames ( data ) ) )
stop ( paste0 ( " The pulled data for " , device , " does not have the expected columns (including [RAPIDS_COLUMN_MAPPINGS] and [MUTATE][COLUMN_MAPPINGS]). The container script returned [" , paste ( colnames ( data ) , collapse = " ," ) , " ] but the format mappings expected [" , paste ( columns_to_download , collapse = " ," ) , " ]. The conainer script is: " , stream_container ) )
2021-03-02 23:57:22 +01:00
2021-03-09 22:42:02 +01:00
renamed_data <- rename_columns ( columns_to_download , data )
mutation_scripts <- stream_schema [ [sensor ] ] [ [toupper ( device_os ) ] ] [ [ " MUTATION" ] ] [ [ " SCRIPTS" ] ]
2021-03-09 17:28:33 +01:00
mutated_data <- mutate_data ( mutation_scripts , renamed_data , data_configuration )
2021-03-02 23:57:22 +01:00
2021-03-09 22:42:02 +01:00
if ( ! setequal ( expected_columns , colnames ( mutated_data ) ) )
2021-03-11 01:38:09 +01:00
stop ( paste0 ( " The mutated data for " , device , " does not have the columns RAPIDS expects. The mutation script returned [" , paste ( colnames ( mutated_data ) , collapse = " ," ) , " ] but RAPIDS expected [" , paste ( expected_columns , collapse = " ," ) , " ]. One ore more mutation scripts in [" , sensor , " ][MUTATION][SCRIPTS] are adding extra columns or removing or not adding the ones expected" ) )
2021-03-12 18:38:36 +01:00
participant_data <- rbind ( participant_data , mutated_data %>% distinct ( ) )
2021-03-02 23:57:22 +01:00
}
2021-03-12 01:32:11 +01:00
participant_data <- participant_data %>% arrange ( timestamp )
2021-03-02 23:57:22 +01:00
write_csv ( participant_data , output_data_file )
}
2020-06-30 23:34:18 +02:00
2021-03-08 21:58:26 +01:00
pull_phone_data ( )