2020-05-02 01:46:04 +02:00
source ( " renv/activate.R" )
2021-03-02 23:57:22 +01:00
2020-10-21 01:12:01 +02:00
library ( yaml )
2021-03-02 23:57:22 +01:00
library ( dplyr )
library ( readr )
# We use reticulate, but only load it when a Python script actually needs it, to minimize the cases in which old RAPIDS deployments need to update their renv
# Validate the PHONE DEVICE_IDS / PLATFORMS pair read from a participant file.
#
# device_ids:  vector of device ids.
# platforms:   either a single platform ("android", "ios" or "infer") shared by
#              every device, or one "android"/"ios" entry per device id.
# participant: path of the participant file; only used in error messages.
#
# Stops with an explanatory error when the pair is inconsistent, otherwise
# returns invisible(NULL). A single device id is not checked here.
validate_deviceid_platforms <- function(device_ids, platforms, participant) {
  if (length(device_ids) > 1 && length(platforms) == 1) {
    # %in% (unlike a chain of `!=` with `&&`) also rejects NA cleanly.
    if (!platforms %in% c("android", "ios", "infer")) {
      stop(paste0(
        "If you have more than 1 device_id, platform should be 'android', 'ios' OR 'infer' but you typed: '",
        paste0(platforms, collapse = ","),
        "'. Participant file: ", participant
      ))
    }
  } else if (length(device_ids) > 1 && length(platforms) > 1) {
    if (length(device_ids) != length(platforms)) {
      stop(paste0(
        "The number of device_ids should match the number of platforms. Participant file: ",
        participant
      ))
    }
    # Every entry must be a concrete OS. The previous intersect()-based test
    # compared vectors by recycling and only failed when neither "android" nor
    # "ios" appeared, so typos such as "iOS" slipped through.
    if (!all(platforms %in% c("android", "ios"))) {
      stop(paste0(
        "If you have more than 1 device_id and more than 1 platform, the platforms should be a mix of 'android' AND 'ios' but you typed: '",
        paste0(platforms, collapse = ","),
        "'. Participant file: ", participant
      ))
    }
  }
  invisible(NULL)
}
2021-03-08 21:58:26 +01:00
# Make sure an OS returned for a device is one RAPIDS can work with.
#
# stream_container: container script name, only used in the error message.
# participant_file: participant file path, only used in the error message.
# device:           id of the device whose OS was resolved.
# device_os:        the resolved OS; NA is accepted and passed through.
#
# Returns invisible(NULL) when device_os is NA, "android" or "ios"; stops
# with an explanatory error otherwise.
validate_inferred_os <- function(stream_container, participant_file, device, device_os) {
  if (is.na(device_os) || device_os == "android" || device_os == "ios") {
    return(invisible(NULL))
  }
  stop(paste0(
    "We tried to infer the OS for ", device,
    " but 'infer_device_os' function inside '", stream_container,
    "' returned '", device_os,
    "' instead of 'android' or 'ios'. You can assign the OS manually in the participant file or report this bug on GitHub.\nParticipant file ",
    participant_file
  ))
}
2021-03-02 23:57:22 +01:00
# Apply a sequence of mutation scripts to a data frame, in order.
#
# scripts: paths to mutation scripts. Files ending in ".R" (any case) are
#          sourced as R; anything else is imported as a Python module with
#          reticulate. NULL or empty leaves `data` unchanged.
# data:    the data frame to mutate.
#
# Every script must define a `main` function; it is called with the current
# data and its return value is passed to the next script. Returns the data
# returned by the last script. Stops if a script has no `main`.
mutate_data <- function(scripts, data) {
  for (script in scripts) {
    if (grepl("\\.R$", script, ignore.case = TRUE)) {
      # Source into a private environment and look `main` up only there, so a
      # `main` living in the global environment can neither satisfy the check
      # nor shadow the script's own one. Functions defined by the script keep
      # that environment as their enclosure, so its helpers stay visible to
      # `main` without attach() (which previously leaked on the search path
      # whenever `main` was missing or failed).
      script_env <- new.env(parent = globalenv())
      source(script, local = script_env)
      if (!exists("main", envir = script_env, mode = "function", inherits = FALSE)) {
        stop(paste0("The following mutation script does not have a main function: ", script))
      }
      message(paste("Applying mutation script", script))
      script_main <- get("main", envir = script_env, mode = "function", inherits = FALSE)
      data <- script_main(data)
    } else { # python
      # reticulate is only loaded here so deployments that never run Python
      # mutation scripts do not need it.
      library(reticulate)
      module <- gsub(pattern = "\\.py$", "", basename(script))
      script_functions <- import_from_path(module, path = dirname(script))
      if (!py_has_attr(script_functions, "main")) {
        stop(paste0("The following mutation script does not have a main function: ", script))
      }
      message(paste("Applying mutation script", script))
      data <- script_functions$main(data)
    }
  }
  data
}
2021-03-02 23:57:22 +01:00
# Rename columns of `data` according to a name map.
#
# name_maps: named list/vector; each name is the target column name (it is
#            lower-cased) and each value the existing column to rename.
# data:      data frame whose columns are renamed.
#
# Renames are applied one at a time, left to right, in the order of
# names(name_maps). Returns the renamed data; an empty or unnamed map leaves
# `data` unchanged.
rename_columns <- function(name_maps, data) {
  Reduce(
    function(renamed, target_name) {
      rename(renamed, !!tolower(target_name) := name_maps[[target_name]])
    },
    names(name_maps),
    data
  )
}
2021-03-02 23:57:22 +01:00
2021-03-08 21:58:26 +01:00
# Check that a stream format's COLUMN_MAPPINGS for `sensor` cover exactly the
# columns RAPIDS requires for that sensor.
#
# schema:             parsed stream format, indexed as
#                     schema[[sensor]][[OS]][["COLUMN_MAPPINGS"]].
# rapids_schema:      parsed RAPIDS column specification; rapids_schema[[sensor]]
#                     lists the mandatory column names.
# sensor:             upper-case sensor key.
# rapids_schema_file: path of the RAPIDS specification (error messages only).
# stream_format:      path of the stream format (error messages only).
#
# For each of ANDROID and IOS that the stream format defines for `sensor`, the
# mapping keys (excluding FLAG_AS_EXTRA) must equal the mandatory columns.
# Stops on the first mismatch; returns invisible(NULL) otherwise.
validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rapids_schema_file, stream_format) {
  rapids_columns <- rapids_schema[[sensor]]
  if (is.null(rapids_columns)) {
    stop(paste(sensor, "columns are not listed in RAPIDS' column specification. If you are adding support for a new phone sensor, add any mandatory columns in ", rapids_schema_file))
  }

  # Compare one OS's mapping keys (minus FLAG_AS_EXTRA) with rapids_columns.
  check_os_mappings <- function(os) {
    mapped_columns <- names(schema[[sensor]][[os]][["COLUMN_MAPPINGS"]])
    mapped_columns <- mapped_columns[mapped_columns != "FLAG_AS_EXTRA"]

    missing_columns <- setdiff(rapids_columns, mapped_columns)
    if (length(missing_columns) > 0) {
      stop(paste(
        sensor,
        paste0("mappings are missing one or more mandatory columns for ", os, ". The missing column mappings are for "),
        paste(missing_columns, collapse = ","),
        "in", stream_format, "(the mappings are case sensitive)"
      ))
    }

    extra_columns <- setdiff(mapped_columns, rapids_columns)
    if (length(extra_columns) > 0) {
      stop(paste(
        sensor,
        paste0("mappings have one or more columns than required for ", os, ", add them as FLAG_AS_EXTRA instead. The extra column mappings are for "),
        paste(extra_columns, collapse = ","),
        "in", stream_format, "(the mappings are case sensitive)"
      ))
    }
  }

  # The OS keys are the *names* of schema[[sensor]]. The old
  # `"ANDROID" %in% schema[[sensor]]` matched against its values (nested
  # lists), was always FALSE, and silently skipped every check.
  for (os in intersect(c("ANDROID", "IOS"), names(schema[[sensor]]))) {
    check_os_mappings(os)
  }
  invisible(NULL)
}
2021-03-08 21:58:26 +01:00
# Load a stream container script and return its two entry points.
#
# stream_container: path to a Python (".py", any case) or R container script.
#
# Returns list(infer_device_os = <function>, pull_data = <function>). Stops if
# the container does not define both. Python containers are imported with
# reticulate, which is only loaded when a Python container is actually used.
load_container_script <- function(stream_container) {
  if (endsWith(tolower(stream_container), ".py")) {
    library(reticulate)
    module_name <- sub("\\.py$", "", basename(stream_container), ignore.case = TRUE)
    container <- import_from_path(module_name, path = dirname(stream_container))
    if (!py_has_attr(container, "pull_data")) {
      stop(paste0("The following container.py script does not have a pull_data function: ", stream_container))
    }
    if (!py_has_attr(container, "infer_device_os")) {
      stop(paste0("The following container.py script does not have a infer_device_os function: ", stream_container))
    }
    return(list("infer_device_os" = container$infer_device_os, "pull_data" = container$pull_data))
  }

  # R container: source it into a private environment and look the functions
  # up only there, so definitions left in the global environment by other
  # scripts can neither satisfy these checks nor shadow the container's own.
  container_env <- new.env(parent = globalenv())
  source(stream_container, local = container_env)
  if (!exists("pull_data", envir = container_env, mode = "function", inherits = FALSE)) {
    stop(paste0("The following container.R script does not have a pull_data function: ", stream_container))
  }
  if (!exists("infer_device_os", envir = container_env, mode = "function", inherits = FALSE)) {
    stop(paste0("The following container.R script does not have a infer_device_os function: ", stream_container))
  }
  list(
    "infer_device_os" = get("infer_device_os", envir = container_env, inherits = FALSE),
    "pull_data" = get("pull_data", envir = container_env, inherits = FALSE)
  )
}
# Pull one phone sensor's data for one participant and write it as a CSV.
#
# Snakemake entry point: inputs, params and the output path are read from the
# `snakemake` object. For every device id in the participant file it:
#   1. resolves the device OS (asking the stream container's infer_device_os()
#      when the configured platform is "infer") and validates it;
#   2. skips the device when the stream format defines no mapping for that OS
#      and sensor (some sensors exist on a single OS only);
#   3. downloads the mapped columns with the container's pull_data(), renames
#      them to RAPIDS names, and runs the OS's MUTATION_SCRIPTS;
#   4. checks that every RAPIDS column is present and appends the rows.
# The combined rows of all devices are written to snakemake@output[[1]].
pull_phone_data <- function() {
  participant_file <- snakemake@input[["participant_file"]]
  stream_format <- snakemake@input[["stream_format"]]
  rapids_schema_file <- snakemake@input[["rapids_schema_file"]]
  stream_container <- snakemake@input[["stream_container"]]
  data_configuration <- snakemake@params[["data_configuration"]]
  tables <- snakemake@params[["tables"]]
  sensor <- toupper(snakemake@params[["sensor"]])
  output_data_file <- snakemake@output[[1]]

  participant_config <- read_yaml(participant_file)
  stream_schema <- read_yaml(stream_format)
  rapids_schema <- read_yaml(rapids_schema_file)

  devices <- participant_config$PHONE$DEVICE_IDS
  device_oss <- participant_config$PHONE$PLATFORMS
  device_oss <- replace(device_oss, device_oss == "multiple", "infer") # support multiple for retro compatibility
  validate_deviceid_platforms(devices, device_oss, participant_file)
  if (length(device_oss) == 1) {
    device_oss <- rep(device_oss, length(devices))
  }

  validate_expected_columns_mapping(stream_schema, rapids_schema, sensor, rapids_schema_file, stream_format)
  # Each OS mapping present was just validated against rapids_schema, so the
  # RAPIDS column list is the expected output schema for either OS.
  expected_columns <- tolower(rapids_schema[[sensor]])
  expected_columns <- expected_columns[expected_columns != "flag_extra"]
  participant_data <- setNames(data.frame(matrix(ncol = length(expected_columns), nrow = 0)), expected_columns)

  container_functions <- load_container_script(stream_container)
  infer_device_os_container <- container_functions$infer_device_os
  pull_data_container <- container_functions$pull_data

  for (idx in seq_along(devices)) {
    device <- devices[idx]
    message(paste0("\nProcessing ", sensor, " for ", device))

    # isTRUE() keeps the old ifelse() behaviour for an NA platform (the OS
    # stays NA and the device is skipped below) instead of erroring in if().
    device_os <- if (isTRUE(device_oss[idx] == "infer")) {
      infer_device_os_container(data_configuration, device)
    } else {
      device_oss[idx]
    }
    validate_inferred_os(basename(stream_container), participant_file, device, device_os)

    # OS keys are the *names* of the sensor's entry; matching against its
    # values (as before) never succeeded, so every device was skipped.
    os_key <- toupper(device_os)
    if (!os_key %in% names(stream_schema[[sensor]])) {
      next # the current sensor is only available in a single OS (like PHONE_MESSAGES)
    }

    # some sensor tables have a different name for android and ios
    os_table <- if (length(tables) > 1) tables[[os_key]] else tables
    os_schema <- stream_schema[[sensor]][[os_key]]
    column_mappings <- os_schema[["COLUMN_MAPPINGS"]]

    columns_to_download <- column_mappings[column_mappings != "FLAG_TO_MUTATE"]
    data <- pull_data_container(data_configuration, device, os_table, columns_to_download)

    # Rename all COLUMN_MAPPINGS except those mapped as FLAG_AS_EXTRA or FLAG_TO_MUTATE
    columns_to_rename <- column_mappings[column_mappings != "FLAG_TO_MUTATE" & names(column_mappings) != "FLAG_AS_EXTRA"]
    renamed_data <- rename_columns(columns_to_rename, data)

    mutated_data <- mutate_data(os_schema[["MUTATION_SCRIPTS"]], renamed_data)
    missing_columns <- setdiff(expected_columns, colnames(mutated_data))
    if (length(missing_columns) > 0) {
      stop(paste(
        "The mutated data for", device,
        "is missing these columns expected by RAPIDS: [", paste(missing_columns, collapse = ","),
        "]. One or more mutation scripts in [", sensor, "][", os_key, "]",
        "[MUTATION_SCRIPTS] are removing or not adding these columns"
      ))
    }
    participant_data <- rbind(participant_data, mutated_data)
  }
  write_csv(participant_data, output_data_file)
}
2020-06-30 23:34:18 +02:00
2021-03-08 21:58:26 +01:00
# Script entry point: run the pull using the inputs, params and output path
# that Snakemake exposes through the `snakemake` object.
pull_phone_data()