2021-03-07 05:16:59 +01:00
source ( " renv/activate.R" )
library ( yaml )
library ( dplyr )
library ( readr )
# Convert a list column returned through reticulate into a character vector.
#
# A Python mutation script may return a pandas column that mixes strings with
# np.nan; such a column arrives in R as a list. Each element must be a single
# string (kept as-is) or a single NaN (turned into NA_character_). Any other
# element -- numbers, NULL, vectors whose length is not 1, nested lists --
# stops with an explanatory error.
#
# Args:
#   column: a list, typically a column produced by reticulate.
# Returns:
#   A character vector with one element per entry of `column`.
fix_pandas_nan_in_string_columns <- function(column) {
  vapply(column, function(value) {
    # Check the length first so NULL (e.g. a Python None) or multi-element
    # entries reach the informative stop() below instead of failing inside
    # `&&`/`if` with an unrelated error.
    is_scalar <- length(value) == 1L
    if (is_scalar && is.character(value)) {
      return(value)
    }
    if (is_scalar && is.numeric(value) && is.nan(value)) {
      return(NA_character_)
    }
    stop(" The reticulate conversion from the python mutation script to r failed. One or more returned columns are a list with unsupported mixed types. We only handle string columns with np.nan values. Open a GitHub issue or fix the mutation script")
  }, FUN.VALUE = character(1))
}
# We use reticulate, but only load it when a Python script actually needs it, to minimize the cases where older RAPIDS deployments have to update their renv.
# Apply each mutation script, in order, to `data`.
#
# Scripts whose path matches the R-file regex below are sourced into a fresh
# environment; every other script is imported as a Python module through
# reticulate. Each script must define `main(data, data_configuration)`, and
# its return value becomes the input of the next script.
#
# Args:
#   scripts: character vector of script paths (NULL or empty means no-op).
#   data: data frame to mutate.
#   data_configuration: passed unchanged as the second argument of every
#     script's `main`.
# Returns:
#   The data frame returned by the last script's `main`, or `data` unchanged
#   when `scripts` is empty.
#
# NOTE(review): the string literals below begin with a space (e.g. " main");
# this looks like a copy/tokenization artifact and is kept verbatim -- confirm
# against the repository.
mutate_data <- function(scripts, data, data_configuration) {
  for (script in scripts) {
    if (grepl(" \\.(R)$", script)) {
      script_env <- new.env()
      source(script, local = script_env)
      # Accept only a `main` function defined by this script. With the default
      # inherits = TRUE, a `main` in a parent or global environment would also
      # pass the check and could then be called instead of the script's own.
      if (!exists(" main", envir = script_env, mode = "function", inherits = FALSE)) {
        stop(paste0(" The following mutation script does not have main function: ", script))
      }
      # The script's objects stay on the search path while `main` runs; the
      # `finally` clause detaches them even when `main` fails, so a failing
      # script no longer leaves the environment attached.
      attach(script_env, name = " sourced_scripts_rapids")
      data <- tryCatch({
        message(paste(" Applying mutation script", script))
        script_env$main(data, data_configuration)
      }, finally = detach(" sourced_scripts_rapids"))
    } else {
      # Python script: reticulate is loaded lazily, only when one is present.
      library(reticulate)
      module <- gsub(pattern = " \\.py$", " ", basename(script))
      script_functions <- import_from_path(module, path = dirname(script))
      if (!py_has_attr(script_functions, " main")) {
        stop(paste0(" The following mutation script does not have a main function: ", script))
      }
      message(paste(" Applying mutation script", script))
      data <- script_functions$main(data, data_configuration)
      # List columns coming back from Python (e.g. string columns holding
      # np.nan) are converted back to character vectors.
      data <- data %>% mutate(across(where(is.list), fix_pandas_nan_in_string_columns))
    }
  }
  data
}
# Rename mapped source columns to their lower-cased RAPIDS names.
#
# Args:
#   name_maps: named list; each name is a RAPIDS column name and each value is
#     the name of the matching column in `data`.
#   data: data frame whose columns are renamed.
# Returns:
#   `data` with each mapped column renamed to tolower(<RAPIDS name>), applied
#   one mapping at a time in the order of `names(name_maps)`.
rename_columns <- function(name_maps, data) {
  Reduce(
    function(renamed, rapids_name) {
      rename(renamed, !!tolower(rapids_name) := name_maps[[rapids_name]])
    },
    names(name_maps),
    init = data
  )
}
# Stop unless a sensor's column mappings in the stream format cover exactly
# the columns RAPIDS requires for that sensor.
#
# Args:
#   schema: parsed stream format; the names of
#     schema[[sensor]][[" RAPIDS_COLUMN_MAPPINGS"]] are the mapped columns.
#   rapids_schema: parsed RAPIDS column specification, indexed by sensor.
#   sensor: sensor key used to index both schemas.
#   rapids_schema_file, stream_format: only used in the error messages.
# Returns:
#   NULL, invisibly, when the mappings match.
validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rapids_schema_file, stream_format) {
  mapped <- names(schema[[sensor]][[" RAPIDS_COLUMN_MAPPINGS"]])
  required <- rapids_schema[[sensor]]

  if (is.null(required)) {
    stop(paste(sensor, " columns are not listed in RAPIDS' column specification. If you are adding support for a new phone sensor, add any mandatory columns in ", rapids_schema_file))
  }

  missing_mappings <- setdiff(required, mapped)
  if (length(missing_mappings) > 0) {
    stop(paste(sensor, " mappings are missing one or more mandatory columns. The missing column mappings are for ", paste(missing_mappings, collapse = " ,"), " in", stream_format, " (the mappings are case sensitive)"))
  }

  extra_mappings <- setdiff(mapped, required)
  if (length(extra_mappings) > 0) {
    stop(paste(sensor, " mappings have one or more columns than required. If you mutation scripts need them, add them as [MUTATION][COLUMN_MAPPINGS] instead. The extra column mappings are for ", paste(extra_mappings, collapse = " ,"), " in", stream_format, " (the mappings are case sensitive)"))
  }

  invisible(NULL)
}
# Load a stream container script and return its `pull_data` function.
#
# Paths whose lower-cased form ends with the Python suffix tested below are
# imported as Python modules through reticulate; anything else is sourced as
# an R script.
#
# Args:
#   stream_container: path to the container script.
# Returns:
#   The container's `pull_data` (a reticulate callable for Python scripts).
#
# NOTE(review): the string literals below begin with a space (e.g.
# " pull_data"); this looks like a copy/tokenization artifact and is kept
# verbatim -- confirm against the repository.
load_container_script <- function(stream_container) {
  # A scalar condition: plain `if` instead of the vectorised dplyr::if_else.
  is_python <- endsWith(tolower(stream_container), " py")
  if (is_python) {
    library(reticulate)
    module_name <- gsub(pattern = " \\.py$", " ", basename(stream_container))
    container <- import_from_path(module_name, path = dirname(stream_container))
    if (!py_has_attr(container, " pull_data")) {
      stop(paste0(" The following container.py script does not have a pull_data function: ", stream_container))
    }
    return(container$pull_data)
  }
  # Source into a dedicated environment so that only a `pull_data` function
  # defined by this script is accepted. Sourcing into the global environment
  # and checking with inherits = TRUE would also accept a stale `pull_data`
  # left there by other code.
  container_env <- new.env()
  source(stream_container, local = container_env)
  if (!exists(" pull_data", envir = container_env, mode = "function", inherits = FALSE)) {
    stop(paste0(" The following container.R script does not have a pull_data function: ", stream_container))
  }
  container_env$pull_data
}
# Collect the device ids listed under every entry of a parsed participant file.
#
# Args:
#   participant_data: list parsed from a participant YAML file; any element
#     whose names include the device-id key below contributes its ids.
# Returns:
#   All collected ids concatenated with c() in list order, or NULL if none.
get_devices_ids <- function(participant_data) {
  id_chunks <- list()
  for (entry in participant_data) {
    n_hits <- sum(names(entry) == " DEVICE_IDS")
    if (n_hits > 0) {
      id_chunks <- c(id_chunks, rep(list(entry[[" DEVICE_IDS"]]), n_hits))
    }
  }
  # A leading NULL makes c() dispatch exactly like the incremental
  # c(NULL, ...) accumulation; with no chunks this is c(NULL), i.e. NULL.
  do.call(c, c(list(NULL), id_chunks))
}
# Stop if a participant file lists no device ids under any of its entries.
#
# Args:
#   participant_file: path to the participant YAML file.
# Returns:
#   NULL, invisibly, when at least one device id is present.
validate_participant_file_without_device_ids <- function(participant_file) {
  device_ids <- get_devices_ids(yaml::read_yaml(participant_file))
  if (length(device_ids) > 0) {
    return(invisible(NULL))
  }
  stop(" There are no device ids in this participant file for smartphones or wearables: ", participant_file)
}
# Snakemake entry point: pull one sensor's data for every device id of one
# device type listed in a participant file, map it to the columns RAPIDS
# expects, and write the combined rows to a CSV file.
#
# Everything is read from the `snakemake` object:
#   input:  participant_file, stream_format, rapids_schema_file,
#           stream_container
#   params: data_configuration, tables, device_type, sensor
#   output: [[1]] is the CSV path that is written
# For each device, the container's pull_data() result must have exactly the
# mapped source columns. After renaming and running the mutation scripts, it
# must have exactly the lower-cased RAPIDS columns; otherwise the step stops.
# When the participant has no ids for `device_type`, a warning is raised and
# a zero-row table with the expected columns is written.
#
# NOTE(review): the string literals below begin with a space (e.g.
# " participant_file"); this looks like a copy/tokenization artifact and is
# kept verbatim -- confirm against the repository.
pull_wearable_data_main <- function() {
  participant_file <- snakemake@input[[" participant_file"]]
  stream_format <- snakemake@input[[" stream_format"]]
  rapids_schema_file <- snakemake@input[[" rapids_schema_file"]]
  stream_container <- snakemake@input[[" stream_container"]]
  data_configuration <- snakemake@params[[" data_configuration"]]
  # Named sensor_table so it does not shadow base::table.
  sensor_table <- snakemake@params[[" tables"]]
  device_type <- snakemake@params[[" device_type"]]
  sensor <- toupper(snakemake@params[[" sensor"]])
  output_data_file <- snakemake@output[[1]]

  validate_participant_file_without_device_ids(participant_file)
  participant_yaml <- read_yaml(participant_file)
  stream_schema <- read_yaml(stream_format)
  rapids_schema <- read_yaml(rapids_schema_file)

  devices <- participant_yaml[[toupper(device_type)]]$DEVICE_IDS

  validate_expected_columns_mapping(stream_schema, rapids_schema, sensor, rapids_schema_file, stream_format)
  sensor_schema <- stream_schema[[sensor]]
  expected_columns <- tolower(names(sensor_schema[[" RAPIDS_COLUMN_MAPPINGS"]]))
  # Zero-row frame with the expected column names; device rows are appended.
  participant_data <- setNames(
    data.frame(matrix(ncol = length(expected_columns), nrow = 0)),
    expected_columns
  )

  if (length(devices) == 0) {
    warning(" There were no ", device_type, " device ids in this participant file: ", participant_file)
    write_csv(participant_data, output_data_file)
    return()
  }

  pull_data_container <- load_container_script(stream_container)

  # These depend only on the sensor's stream format, not on the device, so
  # they are computed once instead of on every loop iteration.
  columns_to_download <- c(sensor_schema[[" RAPIDS_COLUMN_MAPPINGS"]], sensor_schema[[" MUTATION"]][[" COLUMN_MAPPINGS"]])
  columns_to_download <- columns_to_download[(columns_to_download != " FLAG_TO_MUTATE")]
  mutation_scripts <- sensor_schema[[" MUTATION"]][[" SCRIPTS"]]

  for (idx in seq_along(devices)) {
    device <- devices[idx]
    message(paste0(" \nProcessing ", sensor, " for ", device))

    data <- pull_data_container(data_configuration, device, sensor, sensor_table, columns_to_download)
    if (!setequal(columns_to_download, colnames(data))) {
      stop(paste0(" The pulled data for ", device, " does not have the expected columns (including [RAPIDS_COLUMN_MAPPINGS] and [MUTATE][COLUMN_MAPPINGS]). The container script returned [", paste(colnames(data), collapse = " ,"), " ] but the format mappings expected [", paste(columns_to_download, collapse = " ,"), " ]. The conainer script is: ", stream_container))
    }

    renamed_data <- rename_columns(columns_to_download, data)
    mutated_data <- mutate_data(mutation_scripts, renamed_data, data_configuration)
    if (!setequal(expected_columns, colnames(mutated_data))) {
      stop(paste0(" The mutated data for ", device, " does not have the columns RAPIDS expects. The mutation script returned [", paste(colnames(mutated_data), collapse = " ,"), " ] but RAPIDS expected [", paste(expected_columns, collapse = " ,"), " ]. One ore more mutation scripts in [", sensor, " ][MUTATION][SCRIPTS] are adding extra columns or removing or not adding the ones expected"))
    }

    participant_data <- rbind(participant_data, mutated_data %>% distinct())
  }

  if (device_type == " fitbit") {
    participant_data <- participant_data %>% arrange(local_date_time)
  } else {
    participant_data <- participant_data %>% arrange(timestamp)
  }

  write_csv(participant_data, output_data_file)
}
# Run the step when this script is executed; the function reads its inputs,
# params and output path from the `snakemake` object.
pull_wearable_data_main()