diff --git a/src/data/datetime/assign_to_event_segments.R b/src/data/datetime/assign_to_event_segments.R
new file mode 100644
index 00000000..3b94ab05
--- /dev/null
+++ b/src/data/datetime/assign_to_event_segments.R
@@ -0,0 +1,42 @@
+validate_overlapping_event_segments <- function(segments){
+  # Check for overlapping segments (not allowed because our resampling episode algorithm would need second- instead of minute-level granularity, which increases storage and computation time)
+  overlapping <- segments %>%
+    group_by(label) %>%
+    arrange(segment_start_ts) %>%
+    mutate(overlaps = if_else(segment_start_ts <= lag(segment_end_ts), TRUE, FALSE),
+           overlapping_segments = glue("a) [{lag(label)},\t{lag(event_timestamp)},\t{lag(length)},\t{lag(shift)},\t{lag(shift_direction)},\t{lag(device_id)}] \n",
+                                       "b) [{label},\t{event_timestamp},\t{length},\t{shift},\t{shift_direction},\t{device_id}]"))
+  if(any(overlapping$overlaps, na.rm = TRUE))
+    stop("One or more event time segments overlap for ", overlapping$device_id[[1]],
+         ", modify their lengths so they don't:\n", paste0(overlapping %>% filter(overlaps == TRUE) %>% pull(overlapping_segments), collapse = "\n"))
+}
+
+infer_event_segments <- function(tz, segments){
+  time_format_fn <- stamp("23:51:15", orders="HMS", quiet = TRUE)
+  inferred <- segments %>%
+    mutate(shift = ifelse(shift == "0", "0seconds", shift),
+           segment_start_ts = event_timestamp + (as.integer(seconds(lubridate::duration(shift))) * ifelse(shift_direction >= 0, 1, -1) * 1000),
+           segment_end_ts = segment_start_ts + (as.integer(seconds(lubridate::duration(length))) * 1000),
+           segment_id_start = lubridate::as_datetime(segment_start_ts/1000, tz = tz),
+           segment_id_end = lubridate::as_datetime(segment_end_ts/1000, tz = tz),
+           segment_end_ts = segment_end_ts + 999,
+           segment_id = glue("[{label}#{start_date} {start_time},{end_date} {end_time};{segment_start_ts},{segment_end_ts}]",
+                             start_date=lubridate::date(segment_id_start),
+                             start_time=time_format_fn(segment_id_start),
+                             end_date=lubridate::date(segment_id_end),
+                             end_time=time_format_fn(segment_id_end)))
+  validate_overlapping_event_segments(inferred)
+  return(inferred)
+}
+
+assign_to_event_segments <- function(sensor_data, time_segments){
+  sensor_data <- sensor_data %>%
+    group_by(local_timezone) %>%
+    nest() %>%
+    mutate(inferred_time_segments = map(local_timezone, infer_event_segments, time_segments),
+           data = map2(data, inferred_time_segments, assign_rows_to_segments)) %>%
+    select(-inferred_time_segments) %>%
+    unnest(data) %>%
+    arrange(timestamp) %>%
+    ungroup()
+}
\ No newline at end of file
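# Illustration only (not part of the patch): a hedged worked example of the arithmetic
# infer_event_segments() applies; the event values are invented. An event at 1600000000000 ms
# (2020-09-13 12:26:40 UTC) with shift = "30minutes", shift_direction = -1 and length = "1hours":
library(lubridate)
shift_ms  <- as.integer(seconds(duration("30minutes"))) * -1 * 1000
length_ms <- as.integer(seconds(duration("1hours"))) * 1000
segment_start_ts <- 1600000000000 + shift_ms            # 1599998200000
segment_end_ts   <- segment_start_ts + length_ms + 999  # 1600001800999 (the + 999 ms makes the end inclusive)
as_datetime(segment_start_ts / 1000, tz = "UTC")         # "2020-09-13 11:56:40 UTC"
# With label = "survey", the glue() template above would produce the segment_id
# [survey#2020-09-13 11:56:40,2020-09-13 12:56:40;1599998200000,1600001800999]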
diff --git a/src/data/datetime/assign_to_periodic_segments.R b/src/data/datetime/assign_to_periodic_segments.R
new file mode 100644
index 00000000..e2c13262
--- /dev/null
+++ b/src/data/datetime/assign_to_periodic_segments.R
@@ -0,0 +1,110 @@
+day_type_delay <- function(time_segments, day_type, include_past_periodic_segments){
+  # Return a delay in days that decides whether segments starting before the first row of data are considered
+  delay <- time_segments %>%
+    mutate(length_duration = duration(length)) %>%
+    filter(repeats_on == day_type) %>% arrange(-length_duration) %>%
+    pull(length_duration) %>%
+    first()
+  return(if_else(is.na(delay) | include_past_periodic_segments == FALSE, duration("0days"), delay))
+}
+
+get_segment_dates <- function(data, local_timezone, day_type, delay){
+  # Based on the data we are processing, we extract the unique dates from which segments are built
+  dates <- data %>%
+    distinct(local_date) %>%
+    mutate(local_date_obj = date(lubridate::ymd(local_date, tz = local_timezone))) %>%
+    complete(local_date_obj = seq(date(min(local_date_obj) - delay), date(max(local_date_obj) + delay), by="days")) %>%
+    mutate(local_date = replace_na(as.character(date(local_date_obj))))
+
+  if(day_type == "every_day")
+    dates <- dates %>% mutate(every_day = 0)
+  else if (day_type == "wday")
+    dates <- dates %>% mutate(wday = wday(local_date_obj, week_start = 1))
+  else if (day_type == "mday")
+    dates <- dates %>% mutate(mday = mday(local_date_obj))
+  else if (day_type == "qday")
+    dates <- dates %>% mutate(qday = qday(local_date_obj))
+  else if (day_type == "yday")
+    dates <- dates %>% mutate(yday = yday(local_date_obj))
+  return(dates)
+}
+
+infer_existent_periodic_segments <- function(existent_dates, segments){
+  # Build the actual time segments taking into account the data and the user's requested length and repeat schedule
+  # Segment datetime labels are computed in UTC
+  crossing(segments, existent_dates) %>%
+    pivot_longer(cols = c(every_day, wday, mday, qday, yday), names_to = "day_type", values_to = "day_value") %>%
+    filter(repeats_on == day_type & repeats_value == day_value) %>%
+    mutate(segment_id_start = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM")) + period(overlap_duration),
+           segment_id_end = segment_id_start + lubridate::duration(length))
+}
+
+dedup_nonoverlapping_periodic_segments <- function(nested_inferred_time_segments){
+  # Overlapping segments exist when their length is longer than their repeating frequency, e.g. two-day segments starting every day
+  # In process_time_segments we decompose those segments into non-overlapping ones, e.g. twodayA +0days and twodayB +1days
+  # This means that any date will have more than one non-overlapping instance, which we need to dedup
+  # We choose alternating non-overlapping instances to guarantee any data row is only needed in one instance at a time
+  # d1,r1,twoday0
+  # d2,r2,twoday0 twoday1
+  # d3,r3,twoday1 twoday0
+  # d4,r4,twoday0 twoday1
+  new_segments <- data.frame(nested_inferred_time_segments %>%
+                               group_by(original_label) %>%
+                               mutate(max_groups = max(overlap_id) + 1) %>%
+                               # select(label, segment_id_start, segment_id_end, overlap_id, max_groups) %>%
+                               nest() %>%
+                               mutate(data = map(data, function(nested_data){
+                                 nested_data <- nested_data %>% arrange(segment_id_start, segment_id_end) %>%
+                                   group_by(segment_id_start) %>%
+                                   mutate(n_id = ((cur_group_id()-1) %% max_groups)) %>%
+                                   filter(overlap_id == n_id) %>%
+                                   # select(label, segment_id_start, overlap_id, n_id) %>%
+                                   ungroup()
+                               })) %>%
+                               unnest(cols = data) %>%
+                               ungroup())
+}
+
+
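# Illustration only (not part of the patch): the alternating filter above replayed on a toy tibble
# so the dedup behaviour is visible. Values are invented; only the column names mirror the function.
library(tidyverse)
# A two-day segment decomposed into overlap_id 0 and 1, inferred for four consecutive start dates
toy <- crossing(segment_id_start = as.Date("2020-01-01") + 0:3, overlap_id = 0:1) %>%
  mutate(original_label = "twoday", max_groups = max(overlap_id) + 1)
toy %>%
  group_by(segment_id_start) %>%
  mutate(n_id = (cur_group_id() - 1) %% max_groups) %>%
  filter(overlap_id == n_id) %>%
  ungroup()
# keeps one instance per start date, alternating overlap_id (0, 1, 0, 1), so consecutive
# overlapping instances always come from different decomposed labels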
+add_periodic_segment_timestamps_and_id <- function(segments, local_timezone){
+  # Segment timestamps are computed in the data's timezone(s)
+  time_format_fn <- stamp("23:51:15", orders="HMS", quiet = TRUE)
+  segments %>% mutate(segment_start_ts = as.numeric(lubridate::force_tz(segment_id_start, tzone = local_timezone)) * 1000,
+                      segment_end_ts = segment_start_ts + as.numeric(lubridate::duration(length)) * 1000 + 999,
+                      segment_id = glue("[{label}#{start_date} {start_time},{end_date} {end_time};{segment_start_ts},{segment_end_ts}]",
+                                        start_date=lubridate::date(segment_id_start),
+                                        start_time=time_format_fn(segment_id_start),
+                                        end_date=lubridate::date(segment_id_end),
+                                        end_time=time_format_fn(segment_id_end))) %>%
+    drop_na(segment_start_ts, segment_end_ts)
+}
+
+assign_to_periodic_segments <- function(sensor_data, time_segments, include_past_periodic_segments){
+  time_segments <- time_segments %>% mutate(length_duration = duration(length))
+  every_day_delay <- duration("0days")
+  wday_delay <- day_type_delay(time_segments, "wday", include_past_periodic_segments)
+  mday_delay <- day_type_delay(time_segments, "mday", include_past_periodic_segments)
+  qday_delay <- day_type_delay(time_segments, "qday", include_past_periodic_segments)
+  yday_delay <- day_type_delay(time_segments, "yday", include_past_periodic_segments)
+
+  sensor_data <- sensor_data %>%
+    group_by(local_timezone) %>%
+    nest() %>%
+    mutate(every_date = map2(data, local_timezone, get_segment_dates, "every_day", every_day_delay),
+           week_dates = map2(data, local_timezone, get_segment_dates, "wday", wday_delay),
+           month_dates = map2(data, local_timezone, get_segment_dates, "mday", mday_delay),
+           quarter_dates = map2(data, local_timezone, get_segment_dates, "qday", qday_delay),
+           year_dates = map2(data, local_timezone, get_segment_dates, "yday", yday_delay),
+           existent_dates = pmap(list(every_date, week_dates, month_dates, quarter_dates, year_dates), function(every_date, week_dates, month_dates, quarter_dates, year_dates) reduce(list(every_date, week_dates, month_dates, quarter_dates, year_dates), .f=full_join)),
+           inferred_time_segments = map(existent_dates, infer_existent_periodic_segments, time_segments),
+           inferred_time_segments = map(inferred_time_segments, dedup_nonoverlapping_periodic_segments),
+           inferred_time_segments = map(inferred_time_segments, add_periodic_segment_timestamps_and_id, local_timezone),
+           data = map2(data, inferred_time_segments, assign_rows_to_segments)) %>%
+    select(-existent_dates, -inferred_time_segments, -every_date, -week_dates, -month_dates, -quarter_dates, -year_dates) %>%
+    unnest(cols = data) %>%
+    arrange(timestamp) %>%
+    ungroup()
+
+  return(sensor_data)
+}
\ No newline at end of file
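# Illustration only (not part of the patch): why segment labels come from UTC datetimes while the
# timestamps use force_tz(); the same periodic label maps to a different epoch in each data timezone.
library(lubridate)
segment_id_start <- ymd_hms("2020-01-01 00:00:00")  # label datetime, parsed in UTC as in infer_existent_periodic_segments()
as.numeric(force_tz(segment_id_start, tzone = "America/New_York")) * 1000  # 1577854800000
as.numeric(force_tz(segment_id_start, tzone = "UTC")) * 1000               # 1577836800000
# Both instances keep the label "2020-01-01 00:00:00" but get the segment_start_ts of their own timezone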
diff --git a/src/data/datetime/assign_to_time_segment.R b/src/data/datetime/assign_to_time_segment.R
index f31807b7..bf2eb551 100644
--- a/src/data/datetime/assign_to_time_segment.R
+++ b/src/data/datetime/assign_to_time_segment.R
@@ -1,81 +1,19 @@
 library("tidyverse")
+library("glue")
 library("lubridate", warn.conflicts = F)
 options(scipen=999)
 
-day_type_delay <- function(time_segments, day_type, include_past_periodic_segments){
-  delay <- time_segments %>% mutate(length_duration = duration(length)) %>% filter(repeats_on == day_type) %>% arrange(-length_duration) %>% pull(length_duration) %>% first()
-  return(if_else(is.na(delay) | include_past_periodic_segments == FALSE, duration("0days"), delay))
-}
-
-get_segment_dates <- function(data, local_timezone, day_type, delay){
-  dates <- data %>%
-    distinct(local_date) %>%
-    mutate(local_date_obj = date(lubridate::ymd(local_date, tz = local_timezone))) %>%
-    complete(local_date_obj = seq(date(min(local_date_obj) - delay), date(max(local_date_obj) + delay), by="days")) %>%
-    mutate(local_date = replace_na(as.character(date(local_date_obj))))
-
-  if(day_type == "every_day")
-    dates <- dates %>% mutate(every_day = 0)
-  else if (day_type == "wday")
-    dates <- dates %>% mutate(wday = wday(local_date_obj, week_start = 1))
-  else if (day_type == "mday")
-    dates <- dates %>% mutate(mday = mday(local_date_obj))
-  else if (day_type == "qday")
-    dates <- dates %>% mutate(qday = qday(local_date_obj))
-  else if (day_type == "yday")
-    dates <- dates %>% mutate(yday = yday(local_date_obj))
-  return(dates)
-}
-
-create_nonoverlapping_periodic_segments <- function(nested_inferred_time_segments){
-  new_segments <- (data.frame(nested_inferred_time_segments %>%
-                                group_by(original_label) %>%
-                                mutate(max_groups = max(overlap_id) + 1) %>%
-                                # select(label, segment_id_start, segment_id_end, overlap_id, max_groups) %>%
-                                nest() %>%
-                                mutate(data = map(data, function(nested_data){
-                                  nested_data <- nested_data %>% arrange( segment_id_start, segment_id_end) %>%
-                                    group_by(segment_id_start) %>%
-                                    mutate(n_id = ((cur_group_id()-1) %% max_groups)) %>%
-                                    filter(overlap_id == n_id) %>%
-                                    # select(label, segment_id_start, overlap_id, n_id) %>%
-                                    ungroup()
-                                })) %>%
-                                unnest(cols = data) %>%
-                                ungroup()
-  ))
-  return(new_segments)
-}
-
-
-assign_rows_to_segments <- function(nested_data, nested_inferred_time_segments){
-  nested_data <- nested_data %>% mutate(assigned_segments = "")
-  for(i in seq_len(nrow(nested_inferred_time_segments))) {
-    segment <- nested_inferred_time_segments[i,]
-    nested_data$assigned_segments <- ifelse(segment$segment_start_ts <= nested_data$timestamp & segment$segment_end_ts >= nested_data$timestamp,
-                                            stringi::stri_c(nested_data$assigned_segments, segment$segment_id, sep = "|"), nested_data$assigned_segments)
+assign_rows_to_segments <- function(data, segments){
+  # This function is used by all segment types; we use data.table because it is fast
+  data <- data.table::as.data.table(data)
+  data[, assigned_segments := ""]
+  for(i in seq_len(nrow(segments))) {
+    segment <- segments[i,]
+    data[segment$segment_start_ts <= timestamp & segment$segment_end_ts >= timestamp,
+         assigned_segments := stringi::stri_c(assigned_segments, segment$segment_id, sep = "|")]
   }
-  nested_data$assigned_segments <- substring(nested_data$assigned_segments, 2)
-  return(nested_data)
-}
-
-assign_rows_to_segments_frequency <- function(nested_data, nested_timezone, time_segments){
-  for(i in 1:nrow(time_segments)) {
-    segment <- time_segments[i,]
-    nested_data$assigned_segments <- ifelse(segment$segment_start_ts <= nested_data$local_time_obj & segment$segment_end_ts >= nested_data$local_time_obj,
-                                            # The segment_id is assambled on the fly because it depends on each row's local_date and timezone
-                                            stringi::stri_c("[",
-                                                            segment[["label"]], "#",
-                                                            nested_data$local_date, " ",
-                                                            segment[["segment_id_start_time"]], ",",
-                                                            nested_data$local_date, " ",
-                                                            segment[["segment_id_end_time"]], ";",
-                                                            as.numeric(lubridate::as_datetime(stringi::stri_c(nested_data$local_date, segment$segment_id_start_time), tz = nested_timezone)) * 1000, ",",
-                                                            as.numeric(lubridate::as_datetime(stringi::stri_c(nested_data$local_date, segment$segment_id_end_time), tz = nested_timezone)) * 1000 + 999,
-                                                            "]"),
-                                            nested_data$assigned_segments)
-  }
-  return(nested_data)
+  data[,assigned_segments:=substring(assigned_segments, 2)]
+  data
 }
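# Illustration only (not part of the patch): the rewritten assign_rows_to_segments() logic replayed
# on toy data so the "|"-joined output format is visible. Timestamps and segment_id values are invented.
library(data.table)
rows <- data.table(timestamp = c(1000, 2500, 6000))
segs <- data.table(segment_start_ts = c(0, 2000), segment_end_ts = c(2999, 4999),
                   segment_id = c("[daily#a]", "[morning#b]"))
rows[, assigned_segments := ""]
for(i in seq_len(nrow(segs))) {
  s <- segs[i, ]
  rows[s$segment_start_ts <= timestamp & s$segment_end_ts >= timestamp,
       assigned_segments := stringi::stri_c(assigned_segments, s$segment_id, sep = "|")]
}
rows[, assigned_segments := substring(assigned_segments, 2)]
rows
# timestamp 1000 -> "[daily#a]"; 2500 -> "[daily#a]|[morning#b]" (overlap); 6000 -> "" (no segment)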
 
 assign_to_time_segment <- function(sensor_data, time_segments, time_segments_type, include_past_periodic_segments){
@@ -83,116 +21,14 @@ assign_to_time_segment <- function(sensor_data, time_segments, time_segments_typ
   if(nrow(sensor_data) == 0 || nrow(time_segments) == 0)
     return(sensor_data %>% mutate(assigned_segments = NA))
 
-  if(time_segments_type == "FREQUENCY"){
-
-    time_segments <- time_segments %>% mutate(start_time = lubridate::hm(start_time),
-                                              end_time = start_time + minutes(length) - seconds(1),
-                                              segment_id_start_time = paste(str_pad(hour(start_time),2, pad="0"), str_pad(minute(start_time),2, pad="0"), str_pad(second(start_time),2, pad="0"),sep =":"),
-                                              segment_id_end_time = paste(str_pad(hour(ymd("1970-01-01") + end_time),2, pad="0"), str_pad(minute(ymd("1970-01-01") + end_time),2, pad="0"), str_pad(second(ymd("1970-01-01") + end_time),2, pad="0"),sep =":"), # add ymd("1970-01-01") to get a real time instead of duration
-                                              segment_start_ts = as.numeric(start_time),
-                                              segment_end_ts = as.numeric(end_time))
-
-    sensor_data <- sensor_data %>% mutate(local_time_obj = as.numeric(lubridate::hms(local_time)),
-                                          assigned_segments = "")
-
-    sensor_data <- sensor_data %>%
-      group_by(local_timezone) %>%
-      nest() %>%
-      mutate(data = map2(data, local_timezone, assign_rows_to_segments_frequency, time_segments)) %>%
-      unnest(cols = data) %>%
-      arrange(timestamp) %>%
-      select(-local_time_obj) %>%
-      ungroup()
-
+  if (time_segments_type == "FREQUENCY" || time_segments_type == "PERIODIC"){ #FREQUENCY segments are just syntactic sugar for PERIODIC
+    source("src/data/datetime/assign_to_periodic_segments.R")
+    sensor_data <- assign_to_periodic_segments(sensor_data, time_segments, include_past_periodic_segments)
     return(sensor_data)
-
-  } else if (time_segments_type == "PERIODIC"){
-
-    # We need to take into account segment start dates that could include the first day of data
-    time_segments <- time_segments %>% mutate(length_duration = duration(length))
-    every_day_delay <- duration("0days")
-    wday_delay <- day_type_delay(time_segments, "wday", include_past_periodic_segments)
-    mday_delay <- day_type_delay(time_segments, "mday", include_past_periodic_segments)
-    qday_delay <- day_type_delay(time_segments, "qday", include_past_periodic_segments)
-    yday_delay <- day_type_delay(time_segments, "yday", include_past_periodic_segments)
-
-    sensor_data <- sensor_data %>%
-      group_by(local_timezone) %>%
-      nest() %>%
-      # get existent days that we need to start segments from
-      mutate(every_date = map2(data, local_timezone, get_segment_dates, "every_day", every_day_delay),
-             week_dates = map2(data, local_timezone, get_segment_dates, "wday", wday_delay),
-             month_dates = map2(data, local_timezone, get_segment_dates, "mday", mday_delay),
-             quarter_dates = map2(data, local_timezone, get_segment_dates, "qday", qday_delay),
-             year_dates = map2(data, local_timezone, get_segment_dates, "yday", yday_delay),
-             existent_dates = pmap(list(every_date, week_dates, month_dates, quarter_dates, year_dates),
-                                   function(every_date, week_dates, month_dates, quarter_dates, year_dates) reduce(list(every_date, week_dates,month_dates, quarter_dates, year_dates), .f=full_join)),
-             # build the actual time segments taking into account the users requested length and repeat schedule
-             inferred_time_segments = map(existent_dates,
-                                          ~ crossing(time_segments, .x) %>%
-                                            pivot_longer(cols = c(every_day,wday, mday, qday, yday), names_to = "day_type", values_to = "day_value") %>%
-                                            filter(repeats_on == day_type & repeats_value == day_value) %>%
-                                            # The segment ids (segment_id_start and segment_id_end) are computed in UTC to avoid having different labels for instances of a segment that happen in different timezones
-                                            mutate(segment_id_start = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM")) + period(overlap_duration),
-                                                   segment_id_end = segment_id_start + lubridate::duration(length),
-                                                   # The actual segments are computed using timestamps taking into account the timezone
-                                                   segment_start_ts = as.numeric(lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = local_timezone) + period(overlap_duration)) * 1000,
-                                                   segment_end_ts = segment_start_ts + as.numeric(lubridate::duration(length)) * 1000 + 999,
-                                                   segment_id = paste0("[",
-                                                                       paste0(label,"#",
-                                                                              paste0(lubridate::date(segment_id_start), " ",
-                                                                                     paste(str_pad(hour(segment_id_start),2, pad="0"), str_pad(minute(segment_id_start),2, pad="0"), str_pad(second(segment_id_start),2, pad="0"),sep =":"), ",",
-                                                                                     lubridate::date(segment_id_end), " ",
-                                                                                     paste(str_pad(hour(segment_id_end),2, pad="0"), str_pad(minute(segment_id_end),2, pad="0"), str_pad(second(segment_id_end),2, pad="0"),sep =":")),";",
-                                                                              paste0(segment_start_ts, ",", segment_end_ts)),
-                                                                       "]")) %>%
-                                            # drop time segments with an invalid start or end time (mostly due to daylight saving changes, e.g. 2020-03-08 02:00:00 EST does not exist, clock jumps from 01:59am to 03:00am)
-                                            drop_na(segment_start_ts, segment_end_ts)),
-             inferred_time_segments = map(inferred_time_segments, create_nonoverlapping_periodic_segments),
-             data = map2(data, inferred_time_segments, assign_rows_to_segments)
-      ) %>%
-      select(-existent_dates, -inferred_time_segments, -every_date, -week_dates, -month_dates, -quarter_dates, -year_dates) %>%
-      unnest(cols = data) %>%
-      arrange(timestamp)
-
   } else if ( time_segments_type == "EVENT"){
-
-    sensor_data <- sensor_data %>%
-      group_by(local_timezone) %>%
-      nest() %>%
-      mutate(inferred_time_segments = map(local_timezone, function(tz){
-        inferred <- time_segments %>%
-          mutate(shift = ifelse(shift == "0", "0seconds", shift),
-                 segment_start_ts = event_timestamp + (as.integer(seconds(lubridate::duration(shift))) * ifelse(shift_direction >= 0, 1, -1) * 1000),
-                 segment_end_ts = segment_start_ts + (as.integer(seconds(lubridate::duration(length))) * 1000),
-                 # these start and end datetime objects are for labeling only
-                 segment_id_start = lubridate::as_datetime(segment_start_ts/1000, tz = tz),
-                 segment_id_end = lubridate::as_datetime(segment_end_ts/1000, tz = tz),
-                 segment_end_ts = segment_end_ts + 999,
-                 segment_id = paste0("[",
-                                     paste0(label,"#",
-                                            paste0(lubridate::date(segment_id_start), " ",
-                                                   paste(str_pad(hour(segment_id_start),2, pad="0"), str_pad(minute(segment_id_start),2, pad="0"), str_pad(second(segment_id_start),2, pad="0"),sep =":"), ",",
-                                                   lubridate::date(segment_id_end), " ",
-                                                   paste(str_pad(hour(segment_id_end),2, pad="0"), str_pad(minute(segment_id_end),2, pad="0"), str_pad(second(segment_id_end),2, pad="0"),sep =":")),";",
-                                            paste0(segment_start_ts, ",", segment_end_ts)),
-                                     "]"))
-        # Check that for overlapping segments (not allowed because our resampling episode algorithm would have to have a second instead of minute granularity that increases storage and computation time)
-        overlapping <- inferred %>% group_by(label) %>% arrange(segment_start_ts) %>%
-          mutate(overlaps = if_else(segment_start_ts <= lag(segment_end_ts), TRUE, FALSE),
-                 overlapping_segments = paste(paste(lag(label), lag(event_timestamp), lag(length), lag(shift), lag(shift_direction), lag(device_id), sep = ","),"and",
-                                              paste(label, event_timestamp, length, shift, shift_direction, device_id, sep = ",")))
-        if(any(overlapping$overlaps, na.rm = TRUE)){
-          stop(paste0("\n\nOne or more event time segments overlap for ",overlapping$device_id[[1]],", modify their lengths so they don't:\n", paste0(overlapping %>% filter(overlaps == TRUE) %>% pull(overlapping_segments), collapse = "\n"), "\n\n"))
-        } else{
-          return(inferred)
-        }}),
-        data = map2(data, inferred_time_segments, assign_rows_to_segments)) %>%
-      select(-inferred_time_segments) %>%
-      unnest(data) %>%
-      arrange(timestamp)
+    source("src/data/datetime/assign_to_event_segments.R")
+    sensor_data <- assign_to_event_segments(sensor_data, time_segments)
+    return(sensor_data)
   }
-
-  return(sensor_data %>% ungroup())
 }
\ No newline at end of file
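# Illustration only (not part of the patch): a hedged usage sketch of the refactored dispatcher for a
# PERIODIC segment, assuming the repository root as working directory and that tidyverse, glue,
# lubridate, data.table and stringi are installed. All column values are invented; in the real
# pipeline these tables are produced upstream (e.g. by process_time_segments.R).
source("src/data/datetime/assign_to_time_segment.R")
sensor_data <- dplyr::tibble(timestamp = 1577905200000,            # 2020-01-01 14:00:00 in New York, in ms
                             local_date = "2020-01-01",
                             local_timezone = "America/New_York")
time_segments <- dplyr::tibble(label = "daily", start_time = "00:00:00", length = "23H 59M 59S",
                               repeats_on = "every_day", repeats_value = 0,
                               overlap_id = 0, original_label = "daily", overlap_duration = "0D")
assign_to_time_segment(sensor_data, time_segments, "PERIODIC", FALSE)
# should return the row with an assigned_segments value like
# "[daily#2020-01-01 00:00:00,2020-01-01 23:59:59;1577854800000,1577941199999]"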
diff --git a/src/data/datetime/process_time_segments.R b/src/data/datetime/process_time_segments.R
index 92ebc249..96d6d634 100644
--- a/src/data/datetime/process_time_segments.R
+++ b/src/data/datetime/process_time_segments.R
@@ -105,16 +105,22 @@ validate_frequency_segments <- function(segments){
 }
 
 prepare_frequency_segments <- function(segments){
+  #FREQUENCY segments are just syntactic sugar for PERIODIC
   validate_frequency_segments(segments)
-  stamp_fn <- stamp("23:10", orders = c("HM"), quiet = TRUE)
+  stamp_fn <- stamp("23:10:00", orders = c("HMS"), quiet = TRUE)
   new_segments <- data.frame(start_time = seq.POSIXt(from = ymd_hms("2020-01-01 00:00:00"), to=ymd_hms("2020-01-02 00:00:00"), by=paste(segments$length, "min")))
   new_segments <- new_segments %>%
     head(-1) %>%
-    mutate(start_time = stamp_fn(start_time),
-           length = segments$length,
-           label = paste0(segments$label, str_pad(row_number()-1, width = 4, pad = "0")))
+    mutate(label = paste0(segments$label, str_pad(row_number()-1, width = 4, pad = "0")),
+           start_time = stamp_fn(start_time),
+           length = paste0((segments$length * 60)-1, "S"),
+           repeats_on = "every_day",
+           repeats_value=0,
+           overlap_id = 0,
+           original_label = label,
+           overlap_duration = "0D")
 }
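# Illustration only (not part of the patch): a hedged worked example of what prepare_frequency_segments()
# would now emit for a FREQUENCY row such as label = "thirtyminutes", length = 30 (values invented):
starts <- head(seq.POSIXt(from = lubridate::ymd_hms("2020-01-01 00:00:00"),
                          to = lubridate::ymd_hms("2020-01-02 00:00:00"), by = "30 min"), -1)
length(starts)   # 48 start times, 00:00:00 through 23:30:00
#
#   label              start_time  length  repeats_on  repeats_value  overlap_id  original_label     overlap_duration
#   thirtyminutes0000  00:00:00    1799S   every_day   0              0           thirtyminutes0000  0D
#   thirtyminutes0001  00:30:00    1799S   every_day   0              0           thirtyminutes0001  0D
#   ...
#   thirtyminutes0047  23:30:00    1799S   every_day   0              0           thirtyminutes0047  0D
#
# i.e. 48 rows in the PERIODIC schema, which is why assign_to_time_segment() can treat FREQUENCY as
# syntactic sugar for PERIODIC. Note that label is mutated before original_label, so original_label
# picks up the padded label within the same mutate() call.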