2020-08-28 19:53:00 +02:00
library ( " stringr" )
2020-08-28 23:40:23 +02:00
# Tag prepended (via paste()) to the progress messages printed by
# fetch_provider_features().
rapids_log_tag <- " RAPIDS:"
2020-12-03 00:41:03 +01:00
# Keep the rows of `data` assigned to an instance of `time_segment` and record
# which instance each kept row belongs to.
#
# Args:
#   data: data frame with an `assigned_segments` column (removed from the
#     result). When it also has `start_timestamp` and `end_timestamp` columns
#     and at least one row survives the filter, the rows are treated as
#     episodes and passed through chunk_episodes().
#   time_segment: time segment label. It is matched literally: regex
#     metacharacters in it are escaped before it is spliced into the patterns.
#
# Returns:
#   The filtered rows with `local_segment` and `timestamps_segment` columns
#   parsed from the first entry of `assigned_segments` that has the shape
#   [<time_segment>#<datetime>,<datetime>;<13-digit ts>,<13-digit ts>].
filter_data_by_segment <- function(data, time_segment) {
  datetime_regex <- " [0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}"
  timestamp_regex <- " [0-9]{13}"
  # The label is spliced into regexes below. Escape metacharacters so a label
  # such as "a.b" or "x(1)" matches only itself and cannot add capture groups
  # to the extract() pattern, which must yield exactly two groups.
  segment_regex <- gsub("([][{}()+*^$|\\\\?.])", "\\\\\\1", time_segment, perl = TRUE)

  data <- data %>%
    filter(grepl(paste0(" \\[", segment_regex, " #"), assigned_segments)) %>%
    mutate(local_segment = str_extract(
      assigned_segments,
      paste0(" \\[", segment_regex, " #", datetime_regex, " ,", datetime_regex,
             " ;", timestamp_regex, " ,", timestamp_regex, " \\]")
    )) %>%
    extract(local_segment,
            into = c(" local_segment", " timestamps_segment"),
            regex = paste0(" \\[(", segment_regex, " #", datetime_regex, " ,", datetime_regex,
                           " );(", timestamp_regex, " ,", timestamp_regex, " )\\]")) %>%
    select(-assigned_segments)

  # Rows that carry start/end timestamps are episodes: clip and merge them
  # against the segment instance they were assigned to.
  if (nrow(data) > 0 && all(c(" start_timestamp", " end_timestamp") %in% colnames(data))) {
    data <- chunk_episodes(data)
  }
  data
}
# Clip episode rows to the time segment instance they were assigned to and
# merge rows that become identical apart from their timing columns.
#
# Args:
#   sensor_episodes: episode rows as produced by filter_data_by_segment(). The
#     columns read are timestamps_segment, local_timezone, start_timestamp,
#     end_timestamp, local_segment (plus whatever else is carried along).
#
# Returns:
#   One row per merged episode, with start_timestamp/end_timestamp clipped to
#   the segment bounds, `duration` summed over the merged pieces (the code
#   divides millisecond differences by 1000 * 60, i.e. minutes), and
#   local_start_date_time/local_end_date_time formatted in each row's own
#   time zone. local_timezone, the segment bounds and every column matched by
#   `drop_patterns` are removed.
chunk_episodes <- function(sensor_episodes) {
  # Per-row time columns that would keep pieces of one episode apart; they
  # are dropped before grouping.
  drop_patterns <- c(" ^timestamp$", " local_date_time", " local_date",
                     " local_time", " local_hour", " local_minute",
                     " segment_start", " segment_end")
  timing_columns <- c(" start_timestamp", " end_timestamp", " duration")

  # Clip, merge and label the episodes that share the time zone `tz`.
  chunk_timezone <- function(episodes, tz) {
    clipped <- episodes %>%
      distinct(start_timestamp, end_timestamp, local_segment, .keep_all = TRUE) %>%
      mutate(
        start_timestamp = pmax(start_timestamp, segment_start_timestamp),
        end_timestamp = pmin(end_timestamp, segment_end_timestamp),
        duration = (end_timestamp - start_timestamp) / (1000 * 60)
      ) %>%
      select(-matches(drop_patterns))

    grouping_columns <- setdiff(colnames(clipped), timing_columns)

    # NOTE(review): first()/last() take the group's first start and last end
    # in row order; this assumes rows are already chronological.
    clipped %>%
      group_by_at(grouping_columns) %>%
      summarise(
        start_timestamp = first(start_timestamp),
        end_timestamp = last(end_timestamp),
        duration = sum(duration)
      ) %>%
      mutate(
        local_start_date_time = format(lubridate::as_datetime(start_timestamp / 1000, tz = !!tz), " %Y-%m-%d %H:%M:%S"),
        local_end_date_time = format(lubridate::as_datetime(end_timestamp / 1000, tz = !!tz), " %Y-%m-%d %H:%M:%S")
      ) %>%
      ungroup()
  }

  sensor_episodes %>%
    separate(col = timestamps_segment,
             into = c(" segment_start_timestamp", " segment_end_timestamp"),
             sep = " ,", convert = TRUE, remove = TRUE) %>%
    group_by(local_timezone) %>%
    nest() %>%
    mutate(data = map(data, ~ chunk_timezone(.x, local_timezone))) %>%
    unnest(data) %>%
    ungroup() %>%
    select(-local_timezone)
}
2020-08-28 23:40:23 +02:00
2020-12-03 00:41:03 +01:00
# Compute one sensor provider's features for every time segment label.
#
# Args:
#   provider: provider config list. Must contain FEATURES; COMPUTE and
#     SRC_SCRIPT are read as well.
#   provider_key: provider name. Lower-cased and suffixed to find the feature
#     function defined by the sourced SRC_SCRIPT.
#   sensor_key: sensor name, used in log/error messages and as a column prefix.
#   sensor_data_files: passed unchanged to the provider's feature function.
#   time_segments_file: CSV file whose `label` column lists the time segments.
#
# Returns:
#   A data frame with `local_segment`, the label / start / end datetime parsed
#   out of it, and the provider's feature columns prefixed with sensor_key and
#   provider_key.
#
# Raises:
#   An error if FEATURES is missing from `provider`, or if the feature function
#   returns a frame without a `local_segment` column.
fetch_provider_features <- function(provider, provider_key, sensor_key, sensor_data_files, time_segments_file) {
  segment_table <- read.csv(time_segments_file, stringsAsFactors = FALSE)

  if (!(" FEATURES" %in% names(provider))) {
    stop(paste0(" Provider config[", sensor_key, " ][PROVIDERS][", provider_key,
                " ] is missing a FEATURES attribute in config.yaml"))
  }

  sensor_features <- data.frame(local_segment = character(), stringsAsFactors = FALSE)

  if (provider[[" COMPUTE"]] == TRUE) {
    source(provider[[" SRC_SCRIPT"]])
    features_function <- match.fun(paste0(tolower(provider_key), " _features"))

    segment_labels <- pull(segment_table, label)
    # With no labels, the provider is still run once with a placeholder label.
    if (length(segment_labels) == 0) {
      segment_labels <- c(" ")
    }

    # Run the provider for one label, check its output is indexed by
    # local_segment, and prefix every other column name.
    features_for_segment <- function(segment) {
      print(paste(rapids_log_tag, " Processing", sensor_key, provider_key, segment))
      segment_features <- features_function(sensor_data_files, segment, provider)
      if (!(" local_segment" %in% colnames(segment_features))) {
        stop(paste0(" The dataframe returned by the ", sensor_key, " provider '", provider_key,
                    " ' is missing the 'local_segment' column added by the 'filter_data_by_segment()' function. Check the provider script is using such function and is not removing 'local_segment' by accident (",
                    provider[[" SRC_SCRIPT"]],
                    " )\n The 'local_segment' column is used to index a provider's features (each row corresponds to a different time segment instance (e.g. 2020-01-01, 2020-01-02, 2020-01-03, etc.)"))
      }
      rename_at(segment_features, vars(!matches(" local_segment")),
                ~ paste(sensor_key, provider_key, ., sep = " _"))
    }

    # Fold every label's features into one frame, in label order, with a full
    # outer merge() on the shared columns.
    sensor_features <- Reduce(
      function(merged, segment) {
        segment_features <- features_for_segment(segment)
        merge(merged, segment_features, all = TRUE)
      },
      segment_labels,
      init = sensor_features
    )
  } else {
    # Redundant: if COMPUTE is FALSE this script is never executed.
    for (feature in provider[[" FEATURES"]]) {
      sensor_features[, feature] <- NA
    }
  }

  # NOTE(review): the "_RR<digits>SS" marker is stripped before parsing;
  # presumably a tag added upstream for repeated segments - confirm.
  sensor_features <- mutate(sensor_features, local_segment = str_remove(local_segment, " _RR\\d+SS"))
  extract(sensor_features,
          col = local_segment,
          into = c(" local_segment_label", " local_segment_start_datetime", " local_segment_end_datetime"),
          regex = " (.*)#(.*),(.*)",
          remove = FALSE)
}