2020-08-28 19:53:00 +02:00
library ( " stringr" )
2020-08-28 23:40:23 +02:00
rapids_log_tag <- " RAPIDS:"
2020-08-28 19:53:00 +02:00
filter_data_by_segment <- function ( data , day_segment ) {
2020-09-28 17:38:47 +02:00
# Filter the rows that belong to day_segment, and put the segment full name in a new column for grouping
datetime_regex = " [0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}"
timestamp_regex = " [0-9]{13}"
data <- data %>%
filter ( grepl ( paste0 ( " \\[" , day_segment , " #" ) , assigned_segments ) ) %>%
mutate ( local_segment = str_extract ( assigned_segments , paste0 ( " \\[" , day_segment , " #" , datetime_regex , " ," , datetime_regex , " ;" , timestamp_regex , " ," , timestamp_regex , " \\]" ) ) ) %>%
extract ( local_segment , into = c ( " local_segment" , " timestamps_segment" ) , paste0 ( " \\[(" , day_segment , " #" , datetime_regex , " ," , datetime_regex , " );(" , timestamp_regex , " ," , timestamp_regex , " )\\]" ) ) %>%
select ( - assigned_segments )
return ( data )
}
chunk_episodes <- function ( sensor_episodes ) {
2020-10-26 20:10:32 +01:00
columns_to_drop <- c ( " ^timestamp$" , " utc_date_time" , " local_date_time" , " local_date" , " local_time" , " local_hour" , " local_minute" , " segment_start" , " segment_end" )
chunked_episodes <- sensor_episodes %>%
separate ( col = timestamps_segment ,
into = c ( " segment_start_timestamp" , " segment_end_timestamp" ) ,
sep = " ," , convert = TRUE , remove = TRUE ) %>%
2020-09-28 17:38:47 +02:00
group_by ( local_timezone ) %>%
nest ( ) %>%
2020-10-26 20:10:32 +01:00
mutate ( data = map ( data , ~ .x %>%
distinct ( start_timestamp , end_timestamp , local_segment , .keep_all = TRUE ) %>%
mutate ( start_timestamp = pmax ( start_timestamp , segment_start_timestamp ) ,
end_timestamp = pmin ( end_timestamp , segment_end_timestamp ) ,
duration = ( end_timestamp - start_timestamp ) / ( 1000 * 60 ) ) %>%
select ( - matches ( columns_to_drop ) ) %>%
group_by_at ( vars ( setdiff ( colnames ( .) , c ( " start_timestamp" , " end_timestamp" , " duration" ) ) ) ) %>%
summarize ( start_timestamp = first ( start_timestamp ) ,
end_timestamp = last ( end_timestamp ) ,
duration = sum ( duration ) ) %>%
mutate ( local_start_date_time = format ( lubridate :: as_datetime ( start_timestamp / 1000 , tz = local_timezone ) , " %Y-%m-%d %H:%M:%S" ) ,
local_end_date_time = format ( lubridate :: as_datetime ( end_timestamp / 1000 , tz = local_timezone ) , " %Y-%m-%d %H:%M:%S" ) ) %>%
ungroup ( ) )
) %>%
unnest ( data ) %>%
ungroup ( ) %>%
select ( - local_timezone )
2020-09-28 17:38:47 +02:00
return ( chunked_episodes )
2020-08-28 19:53:00 +02:00
}
2020-08-28 23:40:23 +02:00
2020-10-08 00:11:06 +02:00
fetch_provider_features <- function ( provider , provider_key , sensor_key , sensor_data_files , day_segments_file ) {
2020-08-28 23:40:23 +02:00
sensor_features <- data.frame ( local_segment = character ( ) , stringsAsFactors = FALSE )
day_segments_labels <- read.csv ( day_segments_file , stringsAsFactors = FALSE )
if ( ! " FEATURES" %in% names ( provider ) )
2020-10-08 00:11:06 +02:00
stop ( paste0 ( " Provider config[" , sensor_key , " ][PROVIDERS][" , provider_key , " ] is missing a FEATURES attribute in config.yaml" ) )
2020-08-28 23:40:23 +02:00
if ( provider [ [ " COMPUTE" ] ] == TRUE ) {
2020-10-08 00:11:06 +02:00
code_path <- paste0 ( " src/features/" , sensor_key , " /" , provider [ [ " SRC_FOLDER" ] ] , " /main.R" )
2020-08-28 23:40:23 +02:00
source ( code_path )
features_function <- match.fun ( paste0 ( provider [ [ " SRC_FOLDER" ] ] , " _features" ) )
day_segments <- day_segments_labels %>% pull ( label )
for ( day_segment in day_segments ) {
2020-10-08 00:11:06 +02:00
print ( paste ( rapids_log_tag , " Processing" , sensor_key , provider_key , day_segment ) )
2020-08-28 23:40:23 +02:00
2020-10-08 00:11:06 +02:00
features <- features_function ( sensor_data_files , day_segment , provider )
2020-11-25 20:49:42 +01:00
if ( ! " local_segment" %in% colnames ( features ) )
stop ( paste0 ( " The dataframe returned by the " , sensor_key , " provider '" , provider_key , " ' is missing the 'local_segment' column added by the 'filter_data_by_segment()' function. Check the provider script is using such function and is not removing 'local_segment' by accident (" , code_path , " )\n The 'local_segment' column is used to index a provider's features (each row corresponds to a different day segment instance (e.g. 2020-01-01, 2020-01-02, 2020-01-03, etc.)" ) )
features <- features %>% rename_at ( vars ( ! matches ( " local_segment" ) ) , ~ paste ( sensor_key , provider_key , ., sep = " _" ) )
2020-08-28 23:40:23 +02:00
sensor_features <- merge ( sensor_features , features , all = TRUE )
}
2020-09-28 17:38:47 +02:00
} else { # This is redundant, if COMPUTE is FALSE this script will be never executed
2020-08-28 23:40:23 +02:00
for ( feature in provider [ [ " FEATURES" ] ] )
sensor_features [ , feature ] <- NA
}
2020-09-28 17:38:47 +02:00
sensor_features <- sensor_features %>% extract ( col = local_segment ,
into = c ( " local_segment_label" , " local_segment_start_datetime" , " local_segment_end_datetime" ) ,
" (.*)#(.*),(.*)" ,
remove = FALSE )
2020-08-28 23:40:23 +02:00
return ( sensor_features )
}