From d0858f88332f8f07306e500d310cadbd31bd92ef Mon Sep 17 00:00:00 2001 From: JulioV Date: Mon, 22 Mar 2021 19:15:13 -0400 Subject: [PATCH 1/4] Fix overlapping periodic time segments --- data/external/timesegments_event.csv | 8 +- data/external/timesegments_periodic.csv | 4 +- rules/preprocessing.smk | 8 +- src/data/compute_time_segments.py | 216 ------------------ src/data/datetime/assign_to_time_segment.R | 28 ++- src/data/datetime/process_time_segments.R | 204 +++++++++++++++++ .../utils/join_features_from_providers.R | 2 +- ...sor_features_for_individual_participants.R | 13 +- src/features/utils/utils.R | 4 +- src/features/utils/utils.py | 1 + 10 files changed, 246 insertions(+), 242 deletions(-) delete mode 100644 src/data/compute_time_segments.py create mode 100644 src/data/datetime/process_time_segments.R diff --git a/data/external/timesegments_event.csv b/data/external/timesegments_event.csv index 9f2a6eac..7d54b602 100644 --- a/data/external/timesegments_event.csv +++ b/data/external/timesegments_event.csv @@ -3,7 +3,7 @@ stress,1587661220000,1H,0M,1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 stress,1587747620000,4H,4H,-1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 stress,1587906020000,3H,0M,1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 stress,1588003220000,7H,4H,-1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 -stress,1588172420000,9H,0,-1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 -mood,1587661220000,1H,0,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 -mood,1587747620000,1D,0,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 -mood,1587906020000,7D,0,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 +stress,1588172420000,9H,0M,-1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 +mood,1587661220000,1H,0M,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 +mood,1587747620000,1D,0M,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 +mood,1587906020000,7D,0M,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524 diff --git a/data/external/timesegments_periodic.csv b/data/external/timesegments_periodic.csv index d47a2d2e..c444224e 100644 --- a/data/external/timesegments_periodic.csv +++ b/data/external/timesegments_periodic.csv @@ -3,4 +3,6 @@ daily,00:00:00,23H 59M 59S,every_day,0 morning,06:00:00,5H 59M 59S,every_day,0 afternoon,12:00:00,5H 59M 59S,every_day,0 evening,18:00:00,5H 59M 59S,every_day,0 -night,00:00:00,5H 59M 59S,every_day,0 \ No newline at end of file +night,00:00:00,5H 59M 59S,every_day,0 +two_weeks_overlapping,00:00:00,13D 23H 59M 59S,every_day,0 +weekends,00:00:00,1D 23H 59M 59S,wday,6 diff --git a/rules/preprocessing.smk b/rules/preprocessing.smk index 5156b1a9..5001734b 100644 --- a/rules/preprocessing.smk +++ b/rules/preprocessing.smk @@ -23,10 +23,10 @@ rule pull_phone_data: script: "../src/data/streams/pull_phone_data.R" -rule compute_time_segments: +rule process_time_segments: input: - config["TIME_SEGMENTS"]["FILE"], - "data/external/participant_files/{pid}.yaml" + segments_file = config["TIME_SEGMENTS"]["FILE"], + participant_file = "data/external/participant_files/{pid}.yaml" params: time_segments_type = config["TIME_SEGMENTS"]["TYPE"], pid = "{pid}" @@ -34,7 +34,7 @@ rule compute_time_segments: segments_file = "data/interim/time_segments/{pid}_time_segments.csv", segments_labels_file = "data/interim/time_segments/{pid}_time_segments_labels.csv", script: - "../src/data/compute_time_segments.py" + "../src/data/datetime/process_time_segments.R" rule phone_readable_datetime: input: diff --git a/src/data/compute_time_segments.py b/src/data/compute_time_segments.py deleted file mode 100644 index 6f48a5fc..00000000 --- a/src/data/compute_time_segments.py +++ /dev/null @@ -1,216 +0,0 @@ -import pandas as pd -import warnings -import yaml - -def is_valid_frequency_segments(time_segments, time_segments_file): - """ - returns true if time_segment has the expected structure for generating frequency segments; - raises ValueError exception otherwise. - """ - - valid_columns = ["label", "length"] - if set(time_segments.columns) != set(valid_columns): - error_message = 'The FREQUENCY time segments file in [TIME_SEGMENTS][FILE] must have two columns: label, and length ' \ - 'but instead we found {}. Modify {}'.format(list(time_segments.columns), time_segments_file) - raise ValueError(error_message) - - if time_segments.shape[0] > 1: - message = 'The FREQUENCY time segments file in [TIME_SEGMENTS][FILE] can only have 1 row.' \ - 'Modify {}'.format(time_segments_file) - raise ValueError(message) - - if not pd.api.types.is_integer_dtype(time_segments.dtypes['length']): - message = 'The column length in the FREQUENCY time segments file in [TIME_SEGMENTS][FILE] must be integer but instead is ' \ - '{}. . This usually means that not all values in this column are formed by digits. Modify {}'.format(time_segments.dtypes['length'], time_segments_file) - raise ValueError(message) - - if time_segments.iloc[0].loc['length'] < 0: - message = 'The value in column length in the FREQUENCY time segments file in [TIME_SEGMENTS][FILE] must be positive but instead is ' \ - '{}. Modify {}'.format(time_segments.iloc[0].loc['length'], time_segments_file) - raise ValueError(message) - if time_segments.iloc[0].loc['length'] >= 1440: - message = 'The column length in the FREQUENCY time segments file in [TIME_SEGMENTS][FILE] must be shorter than a day in minutes (1440) but instead is ' \ - '{}. Modify {}'.format(time_segments.iloc[0].loc['length'], time_segments_file) - raise ValueError(message) - - return True - -def is_valid_periodic_segments(time_segments, time_segments_file): - time_segments = time_segments.copy(deep=True) - - valid_columns = ["label", "start_time", "length", "repeats_on", "repeats_value"] - if set(time_segments.columns) != set(valid_columns): - error_message = 'The PERIODIC time segments file in [TIME_SEGMENTS][FILE] must have five columns: label, start_time, length, repeats_on, repeats_value ' \ - 'but instead we found {}. Modify {}'.format(list(time_segments.columns), time_segments_file) - raise ValueError(error_message) - - valid_repeats_on = ["every_day", "wday", "mday", "qday", "yday"] - if len(list(set(time_segments["repeats_on"]) - set(valid_repeats_on))) > 0: - error_message = 'The column repeats_on in the PERIODIC time segments file in [TIME_SEGMENTS][FILE] can only accept: "every_day", "wday", "mday", "qday", or "yday" ' \ - 'but instead we found {}. Modify {}'.format(list(set(time_segments["repeats_on"])), time_segments_file) - raise ValueError(error_message) - - if not pd.api.types.is_integer_dtype(time_segments.dtypes['repeats_value']): - message = 'The column repeats_value in the PERIODIC time segments file in [TIME_SEGMENTS][FILE] must be integer but instead is ' \ - '{}. . This usually means that not all values in this column are formed by digits. Modify {}'.format(time_segments.dtypes['repeats_value'], time_segments_file) - raise ValueError(message) - - invalid_time_segments = time_segments.query("repeats_on == 'every_day' and repeats_value != 0") - if invalid_time_segments.shape[0] > 0: - message = 'Every row with repeats_on=every_day must have a repeats_value=0 in the PERIODIC time segments file in [TIME_SEGMENTS][FILE].' \ - ' Modify row(s) of segment(s) {} of {}'.format(invalid_time_segments["label"].to_numpy(), time_segments_file) - raise ValueError(message) - - invalid_time_segments = time_segments.query("repeats_on == 'wday' and (repeats_value < 1 | repeats_value > 7)") - if invalid_time_segments.shape[0] > 0: - message = 'Every row with repeats_on=wday must have a repeats_value=[1,7] in the PERIODIC time segments file in [TIME_SEGMENTS][FILE].' \ - ' Modify row(s) of segment(s) {} of {}'.format(invalid_time_segments["label"].to_numpy(), time_segments_file) - raise ValueError(message) - - invalid_time_segments = time_segments.query("repeats_on == 'mday' and (repeats_value < 1 | repeats_value > 31)") - if invalid_time_segments.shape[0] > 0: - message = 'Every row with repeats_on=mday must have a repeats_value=[1,31] in the PERIODIC time segments file in [TIME_SEGMENTS][FILE].' \ - ' Modify row(s) of segment(s) {} of {}'.format(invalid_time_segments["label"].to_numpy(), time_segments_file) - raise ValueError(message) - - invalid_time_segments = time_segments.query("repeats_on == 'qday' and (repeats_value < 1 | repeats_value > 92)") - if invalid_time_segments.shape[0] > 0: - message = 'Every row with repeats_on=qday must have a repeats_value=[1,92] in the PERIODIC time segments file in [TIME_SEGMENTS][FILE].' \ - ' Modify row(s) of segment(s) {} of {}'.format(invalid_time_segments["label"].to_numpy(), time_segments_file) - raise ValueError(message) - - invalid_time_segments = time_segments.query("repeats_on == 'yday' and (repeats_value < 1 | repeats_value > 366)") - if invalid_time_segments.shape[0] > 0: - message = 'Every row with repeats_on=yday must have a repeats_value=[1,366] in the PERIODIC time segments file in [TIME_SEGMENTS][FILE].' \ - ' Modify row(s) of segment(s) {} of {}'.format(invalid_time_segments["label"].to_numpy(), time_segments_file) - raise ValueError(message) - - try: - time_segments["start_time"] = pd.to_datetime(time_segments["start_time"]) - except ValueError as err: - raise ValueError("At least one start_time in the PERIODIC time segments file in [TIME_SEGMENTS][FILE] has an invalid format, it should be HH:MM:SS in 24hr clock({}). Modify {}".format(err, time_segments_file)) - - if(time_segments.shape[0] != time_segments.drop_duplicates().shape[0]): - error_message = 'The PERIODIC time segments file in [TIME_SEGMENTS][FILE] has two or more rows that are identical. ' \ - 'Modify {}'.format(time_segments_file) - raise ValueError(error_message) - - duplicated_labels = time_segments[time_segments["label"].duplicated()] - if(duplicated_labels.shape[0] > 0): - error_message = 'Segements labels must be unique. The PERIODIC time segments file in [TIME_SEGMENTS][FILE] has {} row(s) with the same label {}. ' \ - 'Modify {}'.format(duplicated_labels.shape[0], duplicated_labels["label"].to_numpy(), time_segments_file) - raise ValueError(error_message) - - # TODO Validate string format for lubridate - - return True - -def is_valid_event_segments(time_segments, time_segments_file): - time_segments = time_segments.copy(deep=True) - - valid_columns = ["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"] - if set(time_segments.columns) != set(valid_columns): - error_message = 'The EVENT time segments file in [TIME_SEGMENTS][FILE] must have six columns: label, event_timestamp, length, shift, shift_direction and device_id ' \ - 'but instead we found {}. Modify {}'.format(list(time_segments.columns), time_segments_file) - raise ValueError(error_message) - - if not pd.api.types.is_integer_dtype(time_segments.dtypes['event_timestamp']): - message = 'The column event_timestamp in the EVENT time segments file in [TIME_SEGMENTS][FILE] must be integer but instead is ' \ - '{}. This usually means that not all values in this column are formed by digits. Modify {}'.format(time_segments.dtypes['event_timestamp'], time_segments_file) - raise ValueError(message) - - valid_shift_direction_values = [1, -1, 0] - provided_values = time_segments["shift_direction"].unique() - if len(list(set(provided_values) - set(valid_shift_direction_values))) > 0: - error_message = 'The values of shift_direction column in the EVENT time segments file in [TIME_SEGMENTS][FILE] can only be 1, -1 or 0 ' \ - 'but instead we found {}. Modify {}'.format(provided_values, time_segments_file) - raise ValueError(error_message) - - if(time_segments.shape[0] != time_segments.drop_duplicates().shape[0]): - error_message = 'The EVENT time segments file in [TIME_SEGMENTS][FILE] has two or more rows that are identical. ' \ - 'Modify {}'.format(time_segments_file) - raise ValueError(error_message) - - # TODO Validate string format for lubridate of length and shift - # TODO validate unique labels per participant - return True - - -def parse_frequency_segments(time_segments: pd.DataFrame) -> pd.DataFrame: - """ - returns a table with rows identifying start and end of time slots with frequency freq (in minutes). For example, - for freq = 10 it outputs: - bin_id start end label - 0 00:00 00:10 epoch_0000 - 1 00:10 00:20 epoch_0001 - 2 00:20 00:30 epoch_0002 - ... - 143 23:50 00:00 epoch_0143 - time_segments argument is expected to have the following structure: - label length - epoch 10 - """ - freq = time_segments.iloc[0].loc['length'] - slots = pd.date_range(start='2020-01-01', end='2020-01-02', freq='{}min'.format(freq)) - slots = ['{:02d}:{:02d}'.format(x.hour, x.minute) for x in slots] - - table = pd.DataFrame(slots, columns=['start_time']) - table['length'] = time_segments.iloc[0].loc['length'] - table = table.iloc[:-1, :] - - label = time_segments.loc[0, 'label'] - table['label'] = range(0, table.shape[0]) - table['label'] = table['label'].apply(lambda x: '{}{:04}'.format(label, x)) - - return table[['start_time', 'length', 'label']] - -def parse_periodic_segments(time_segments): - time_segments.loc[time_segments["repeats_on"] == "every_day", "repeats_value"] = 0 - return time_segments - -def parse_event_segments(time_segments, device_ids): - return time_segments.query("device_id == @device_ids") - -def parse_time_segments(time_segments_file, segments_type, device_ids): - # Add code to validate and parse frequencies, intervals, and events - # Expected formats: - # Frequency: label, length columns (e.g. my_prefix, 5) length has to be in minutes (int) - # Interval: label, start, end columns (e.g. daily, 00:00, 23:59) start and end should be valid hours in 24 hour format - # Event: label, timestamp, length, shift (e.g., survey1, 1532313215463, 60, -30), timestamp is a UNIX timestamp in ms (we could take a date time string instead), length is in minutes (int), shift is in minutes (+/-int) and is added/substracted from timestamp - # Our output should have local_date, start_time, end_time, label. In the readable_datetime script, If local_date has the same value for all rows, every segment will be applied for all days, otherwise each segment will be applied only to its local_date - time_segments = pd.read_csv(time_segments_file) - - if time_segments is None: - message = 'The time segments file in [TIME_SEGMENTS][FILE] is None. Modify {}'.format(time_segments_file) - raise ValueError(message) - - if time_segments.shape[0] == 0: - message = 'The time segments file in [TIME_SEGMENTS][FILE] is empty. Modify {}'.format(time_segments_file) - raise ValueError(message) - - if(segments_type not in ["FREQUENCY", "PERIODIC", "EVENT"]): - raise ValueError("[TIME_SEGMENTS][TYPE] can only be FREQUENCY, PERIODIC, or EVENT") - - if(segments_type == "FREQUENCY" and is_valid_frequency_segments(time_segments, time_segments_file)): - time_segments = parse_frequency_segments(time_segments) - elif(segments_type == "PERIODIC" and is_valid_periodic_segments(time_segments, time_segments_file)): - time_segments = parse_periodic_segments(time_segments) - elif(segments_type == "EVENT" and is_valid_event_segments(time_segments, time_segments_file)): - time_segments = parse_event_segments(time_segments, device_ids) - else: - raise ValueError("{} does not have a format compatible with frequency, periodic or event time segments. Please refer to [LINK]".format(time_segments_file)) - return time_segments - -participant_file = yaml.load(open(snakemake.input[1], 'r'), Loader=yaml.FullLoader) -device_ids = [] -for key in participant_file.keys(): - if "DEVICE_IDS" in participant_file[key] and isinstance(participant_file[key]["DEVICE_IDS"], list): - device_ids = device_ids + participant_file[key]["DEVICE_IDS"] - -final_time_segments = parse_time_segments(snakemake.input[0], snakemake.params["time_segments_type"], device_ids) - -if snakemake.params["time_segments_type"] == "EVENT" and final_time_segments.shape[0] == 0: - warnings.warn("There are no event time segments for {}. Check your time segment file {}".format(snakemake.params["pid"], snakemake.input[0])) - -final_time_segments.to_csv(snakemake.output["segments_file"], index=False) -pd.DataFrame({"label" : final_time_segments["label"].unique()}).to_csv(snakemake.output["segments_labels_file"], index=False) \ No newline at end of file diff --git a/src/data/datetime/assign_to_time_segment.R b/src/data/datetime/assign_to_time_segment.R index 09f27b14..f31807b7 100644 --- a/src/data/datetime/assign_to_time_segment.R +++ b/src/data/datetime/assign_to_time_segment.R @@ -11,7 +11,7 @@ get_segment_dates <- function(data, local_timezone, day_type, delay){ dates <- data %>% distinct(local_date) %>% mutate(local_date_obj = date(lubridate::ymd(local_date, tz = local_timezone))) %>% - complete(local_date_obj = seq(date(min(local_date_obj) - delay), max(local_date_obj), by="days")) %>% + complete(local_date_obj = seq(date(min(local_date_obj) - delay), date(max(local_date_obj) + delay), by="days")) %>% mutate(local_date = replace_na(as.character(date(local_date_obj)))) if(day_type == "every_day") @@ -27,6 +27,27 @@ get_segment_dates <- function(data, local_timezone, day_type, delay){ return(dates) } +create_nonoverlapping_periodic_segments <- function(nested_inferred_time_segments){ + new_segments <- (data.frame(nested_inferred_time_segments %>% + group_by(original_label) %>% + mutate(max_groups = max(overlap_id) + 1) %>% + # select(label, segment_id_start, segment_id_end, overlap_id, max_groups) %>% + nest() %>% + mutate(data = map(data, function(nested_data){ + nested_data <- nested_data %>% arrange( segment_id_start, segment_id_end) %>% + group_by(segment_id_start) %>% + mutate(n_id = ((cur_group_id()-1) %% max_groups)) %>% + filter(overlap_id == n_id) %>% + # select(label, segment_id_start, overlap_id, n_id) %>% + ungroup() + })) %>% + unnest(cols = data) %>% + ungroup() + )) + return(new_segments) +} + + assign_rows_to_segments <- function(nested_data, nested_inferred_time_segments){ nested_data <- nested_data %>% mutate(assigned_segments = "") for(i in seq_len(nrow(nested_inferred_time_segments))) { @@ -113,10 +134,10 @@ assign_to_time_segment <- function(sensor_data, time_segments, time_segments_typ pivot_longer(cols = c(every_day,wday, mday, qday, yday), names_to = "day_type", values_to = "day_value") %>% filter(repeats_on == day_type & repeats_value == day_value) %>% # The segment ids (segment_id_start and segment_id_end) are computed in UTC to avoid having different labels for instances of a segment that happen in different timezones - mutate(segment_id_start = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM")), + mutate(segment_id_start = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM")) + period(overlap_duration), segment_id_end = segment_id_start + lubridate::duration(length), # The actual segments are computed using timestamps taking into account the timezone - segment_start_ts = as.numeric(lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = local_timezone)) * 1000, + segment_start_ts = as.numeric(lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = local_timezone) + period(overlap_duration)) * 1000, segment_end_ts = segment_start_ts + as.numeric(lubridate::duration(length)) * 1000 + 999, segment_id = paste0("[", paste0(label,"#", @@ -128,6 +149,7 @@ assign_to_time_segment <- function(sensor_data, time_segments, time_segments_typ "]")) %>% # drop time segments with an invalid start or end time (mostly due to daylight saving changes, e.g. 2020-03-08 02:00:00 EST does not exist, clock jumps from 01:59am to 03:00am) drop_na(segment_start_ts, segment_end_ts)), + inferred_time_segments = map(inferred_time_segments, create_nonoverlapping_periodic_segments), data = map2(data, inferred_time_segments, assign_rows_to_segments) ) %>% select(-existent_dates, -inferred_time_segments, -every_date, -week_dates, -month_dates, -quarter_dates, -year_dates) %>% diff --git a/src/data/datetime/process_time_segments.R b/src/data/datetime/process_time_segments.R new file mode 100644 index 00000000..92ebc249 --- /dev/null +++ b/src/data/datetime/process_time_segments.R @@ -0,0 +1,204 @@ +source("renv/activate.R") +library("lubridate") +library("readr") +library("dplyr") +library("tidyr") +library("stringr") +library("yaml") + +validate_periodic_segments <- function(segments){ + invalid_lengths <- segments %>% mutate(is_valid = str_detect(length, "^[[:space:]]*(\\d+?[d|D])??[[:space:]]*(\\d+?[h|H])??[[:space:]]*(\\d+?[m|M])??[[:space:]]*(\\d+?[s|S])??$")) + if(any(!(invalid_lengths$is_valid))) + stop("One or more rows in your periodic time segments file have an invalid length format (XXD XXH XXM XXS): ", + paste(invalid_lengths %>% filter(!is_valid) %>% pull(label), collapse = ", ")) + + if(any(is.na(segments$length_period))) + stop("One or more rows in your periodic time segments file have an invalid length value: ", + paste(segments %>% filter(is.na(length_period)) %>% pull(label), collapse = ",")) + + if(any(is.na(segments$start_time_format))) + stop("One or more rows in your periodic time segments file have an invalid start_time (HH:MM:SS): ", + paste(segments %>% filter(is.na(start_time_format)) %>% pull(label), collapse = ", ")) + + longer_start_time <- segments %>% mutate(is_longer = start_time_format > period("23H 59M 59S")) + if(any(longer_start_time$is_longer)) + stop("One or more rows in your periodic time segments file have a start_time longer than 23:59:59: ", + paste(longer_start_time %>% filter(is_longer) %>% pull(label), collapse = ", ")) + + invalid_repeats_on <- segments %>% filter(!repeats_on %in% c("every_day", "wday", "mday", "qday","yday")) %>% pull(label) + if(length(invalid_repeats_on) > 0) + stop("One or more rows in your periodic time segments file have an invalid repeats_on: ", + paste(invalid_repeats_on, collapse = ","), + ". Valid values include: ", + paste(c("every_day", "wday", "mday", "qday","yday"), collapse = ", ")) + + if(nrow(count(segments, label) %>% filter(n > 1)) > 0) + stop("The values in the column 'label' should be unique but they are not: ", + paste(count(segments, label) %>% filter(n > 1) %>% pull(label), collapse = ", "), + ". Valid values include: ", + paste(c("every_day", "wday", "mday", "qday","yday"), collapse = ", ")) + + if(nrow(filter(segments, length_period > repeats_on_period & repeats_on %in% c("mday", "qday", "yday")))) + stop("We do not support mday, qday, or yday segments that overlap yet. Get in touch with the RAPIDS team if you'd like to have this functionality. Overlapping segments: ", + paste((filter(segments, length_period > repeats_on_period)) %>% filter(repeats_on %in% c("mday", "qday", "yday")) %>% pull(label), collapse = ",")) + + distinct_segments <- segments %>% distinct(across(-label), .keep_all=TRUE) + if(nrow(segments) != nrow(distinct_segments)) + stop("Your periodic time segments file has ", nrow(segments) - nrow(distinct_segments), " duplicated row(s) (excluding label): ", + paste(setdiff(segments %>% pull(label), distinct_segments %>% pull(label)), collapse = ",")) + + invalid_repeats_value <- segments %>% + mutate(is_invalid = case_when(repeats_on == "every_day" ~ repeats_value != 0, + repeats_on == "wday" ~ repeats_value < 1 | repeats_value > 7, + repeats_on == "mday" ~ repeats_value < 1 | repeats_value > 31, + repeats_on == "qday" ~ repeats_value < 1 | repeats_value > 91, + repeats_on == "yday" ~ repeats_value < 1 | repeats_value > 365)) + if(any(invalid_repeats_value$is_invalid)) + stop("One or more rows in your periodic time segments file have an invalid repeats_value (0 for every_day, [1,7] for wday, [1,31] for mday, [1,91] for qday, [1,366] for yday): ", + paste(invalid_repeats_value %>% filter(is_invalid) %>% pull(label), collapse = ", ")) + return(segments) + +} + +validate_periodic_columns <- function(segments){ + if(nrow(segments) == 0) + stop("Your periodic time segments file is empty: ", segments_file) + + if(!identical(colnames(segments), c("label","start_time","length","repeats_on","repeats_value"))) + stop("Your periodic time segments file does not have the expected columns (label,start_time,length,repeats_on,repeats_value). Maybe you have a typo in the names?") + return(segments) +} + +prepare_periodic_segments <- function(segments){ + segments <- segments %>% + validate_periodic_columns() %>% + mutate(length_period = period(length), + start_time_format = hms(start_time, quiet = TRUE), + repeats_on_period = case_when(repeats_on == "every_day" ~ period("1D"), + repeats_on == "wday" ~ period("7D"), + repeats_on == "mday" ~ period("28D"), + repeats_on == "qday" ~ period("95D"), + repeats_on == "yday" ~ period("365D"))) %>% + validate_periodic_segments() %>% + mutate(new_segments = (length_period %/% repeats_on_period) + 1) %>% + uncount(weights = new_segments, .remove = FALSE, .id = "overlap_id") %>% + mutate(overlap_id = overlap_id -1, + original_label = label, + overlap_duration = paste0(overlap_id * repeats_on_period / days(1),"D"), + label = paste0(label, "_RR", overlap_id, "SS")) %>% + select(label,start_time,length,repeats_on,repeats_value,overlap_duration,overlap_id,original_label) + return(segments) +} + +validate_frequency_segments <- function(segments){ + if(nrow(segments) == 0) + stop("Your frequency time segments file is empty: ", segments_file) + if(!identical(colnames(segments), c("label","length"))) + stop("Your frequency time segments file does not have the expected columns (label, length). Maybe you have a typo in the names?") + if(nrow(segments) > 1) + stop("Your frequency time segments file cannot have more than one row") + if(any(is.na(segments$label))) + stop("Your frequency time segments file has an empty or invalid label") + if(nrow(segments %>% filter(!is.na(length) & length >= 1 & length <= 1440)) == 0) + stop("Your frequency time segments file has an empty or invalid length (only numbers between [1,1440] are accepted), you typed: ", segments$length) + return(segments) +} + +prepare_frequency_segments <- function(segments){ + validate_frequency_segments(segments) + stamp_fn <- stamp("23:10", orders = c("HM"), quiet = TRUE) + new_segments <- data.frame(start_time = seq.POSIXt(from = ymd_hms("2020-01-01 00:00:00"), + to=ymd_hms("2020-01-02 00:00:00"), + by=paste(segments$length, "min"))) + new_segments <- new_segments %>% + head(-1) %>% + mutate(start_time = stamp_fn(start_time), + length = segments$length, + label = paste0(segments$label, str_pad(row_number()-1, width = 4, pad = "0"))) + +} + +get_devices_ids <- function(participant_data){ + devices_ids = c() + for(device in participant_data) + for(attribute in names(device)) + if(attribute == "DEVICE_IDS") + devices_ids <- c(devices_ids, device[[attribute]]) + return(devices_ids) +} + +validate_event_segments <- function(segments){ + if(nrow(segments) == 0) + stop("The following time segments file is empty: ", segments_file) + + if(!identical(colnames(segments), c("label","event_timestamp","length","shift","shift_direction","device_id"))) + stop("Your periodic time segments file does not have the expected columns (label,event_timestamp,length,shift,shift_direction,device_id). Maybe you have a typo in the names?") + + invalid_lengths <- segments %>% mutate(is_valid = str_detect(length, "^[[:space:]]*(\\d+?[d|D])??[[:space:]]*(\\d+?[h|H])??[[:space:]]*(\\d+?[m|M])??[[:space:]]*(\\d+?[s|S])??$")) + if(any(!(invalid_lengths$is_valid))) + stop("One or more rows in your event time segments file have an invalid length format (XXD XXH XXM XXS): ", + paste(invalid_lengths %>% filter(!is_valid) %>% pull(label), collapse = ", ")) + + invalid_shifts <- segments %>% mutate(is_valid = str_detect(shift, "^[[:space:]]*(\\d+?[d|D])??[[:space:]]*(\\d+?[h|H])??[[:space:]]*(\\d+?[m|M])??[[:space:]]*(\\d+?[s|S])??$")) + if(any(!(invalid_shifts$is_valid))) + stop("One or more rows in your event time segments file have an invalid shift format (XXD XXH XXM XXS): ", + paste(invalid_shifts %>% filter(!is_valid) %>% pull(label), collapse = ", ")) + + invalid_shift_direction <- segments %>% filter(shift_direction < -1 | shift_direction > 1) + if(nrow(invalid_shift_direction) > 0) + stop("One or more rows in your event time segments file have an invalid shift direction (-1,0,1): ", + paste(invalid_shift_direction %>% pull(label), collapse = ", ")) + + invalid_timestamps <- segments %>% filter(is.na(event_timestamp)) + if(nrow(invalid_timestamps) > 0) + stop("One or more rows in your event time segments file have an empty timestamp: ", + paste(invalid_timestamps %>% pull(label), collapse = ", ")) + + invalid_timestamps <- segments %>% filter(event_timestamp <= 999999999999) + if(nrow(invalid_timestamps) > 0) + stop("One or more rows in your event time segments file is not in milliseconds: ", + paste(invalid_timestamps %>% pull(label), collapse = ", ")) + + distinct_segments <- segments %>% mutate(row_id = row_number()) %>% distinct(across(c(-label, -row_id)), .keep_all=TRUE) + if(nrow(segments) != nrow(distinct_segments)) + stop("Your event time segments file has ", nrow(segments) - nrow(distinct_segments), " duplicated row(s) (excluding label). Duplicated row number(s): ", + paste(setdiff(segments %>% mutate(row_id = row_number()) %>% pull(row_id), distinct_segments %>% pull(row_id)), collapse = ",")) + + return(segments) +} + +prepare_event_segments <- function(segments, participant_data){ + participant_devices <- get_devices_ids(participant_data) + if(length(participant_devices) == 0) + stop("There are no devices in the participant file.") + + new_segments <- segments%>% + validate_event_segments() %>% + filter(device_id %in% participant_devices) + return(new_segments) +} + +compute_time_segments <- function(){ + type = snakemake@params[["time_segments_type"]] + pid = snakemake@params[["pid"]] + segments_file <- snakemake@input[["segments_file"]] + participant_file <- snakemake@input[["participant_file"]] + message("Processing ",type, " time segments for ", pid,"'s ", participant_file) + + if(type == "FREQUENCY"){ + segments <- read_csv(segments_file, col_types = cols_only(label = "c", length = "i"), trim_ws = TRUE) + new_segments <- prepare_frequency_segments(segments) + } else if(type == "PERIODIC"){ + segments <- read_csv(segments_file, col_types = cols_only(label = "c", start_time = "c",length = "c",repeats_on = "c",repeats_value = "i"), trim_ws = TRUE) + new_segments <- prepare_periodic_segments(segments) + } else if(type == "EVENT"){ + participant_data <- yaml::read_yaml(participant_file) + segments <- read_csv(segments_file, col_types = cols_only(label = "c", event_timestamp = "d",length = "c",shift = "c",shift_direction = "i", device_id = "c"), trim_ws = TRUE) + new_segments <- prepare_event_segments(segments, participant_data) + } + + write.csv(new_segments %>% select(label) %>% distinct(label), snakemake@output[["segments_labels_file"]], row.names = FALSE, quote = FALSE) + write.csv(new_segments,snakemake@output[["segments_file"]], row.names = FALSE, quote = FALSE) +} + +compute_time_segments() \ No newline at end of file diff --git a/src/features/utils/join_features_from_providers.R b/src/features/utils/join_features_from_providers.R index bbbc18b3..bf1e97c7 100644 --- a/src/features/utils/join_features_from_providers.R +++ b/src/features/utils/join_features_from_providers.R @@ -11,4 +11,4 @@ for(location_features_file in location_features_files){ location_features <- merge(location_features, read.csv(location_features_file), all = TRUE) } -write.csv(location_features, snakemake@output[[1]], row.names = FALSE) \ No newline at end of file +write.csv(location_features %>% arrange(local_segment), snakemake@output[[1]], row.names = FALSE) \ No newline at end of file diff --git a/src/features/utils/merge_sensor_features_for_individual_participants.R b/src/features/utils/merge_sensor_features_for_individual_participants.R index e1264e1e..b9090adf 100644 --- a/src/features/utils/merge_sensor_features_for_individual_participants.R +++ b/src/features/utils/merge_sensor_features_for_individual_participants.R @@ -1,22 +1,13 @@ source("renv/activate.R") -library(tidyr) -library(purrr) library("dplyr", warn.conflicts = F) -library("methods") -library("mgm") -library("qgraph") -library("dplyr", warn.conflicts = F) -library("scales") -library("ggplot2") library("purrr") -library("tidyr") -library("reshape2") feature_files <- snakemake@input[["feature_files"]] features_for_individual_model <- feature_files %>% map(read.csv, stringsAsFactors = F, colClasses = c(local_segment = "character", local_segment_label = "character", local_segment_start_datetime="character", local_segment_end_datetime="character")) %>% - reduce(full_join, by=c("local_segment","local_segment_label","local_segment_start_datetime","local_segment_end_datetime")) + reduce(full_join, by=c("local_segment","local_segment_label","local_segment_start_datetime","local_segment_end_datetime")) %>% + arrange(local_segment) write.csv(features_for_individual_model, snakemake@output[[1]], row.names = FALSE) diff --git a/src/features/utils/utils.R b/src/features/utils/utils.R index 629e7eb3..5ffbe492 100644 --- a/src/features/utils/utils.R +++ b/src/features/utils/utils.R @@ -69,8 +69,8 @@ fetch_provider_features <- function(provider, provider_key, sensor_key, sensor_d for(feature in provider[["FEATURES"]]) sensor_features[,feature] <- NA } - - sensor_features <- sensor_features %>% extract(col = local_segment, + sensor_features <- sensor_features %>% mutate(local_segment = str_remove(local_segment, "_RR\\d+SS")) %>% + extract(col = local_segment, into = c("local_segment_label", "local_segment_start_datetime", "local_segment_end_datetime"), "(.*)#(.*),(.*)", remove = FALSE) diff --git a/src/features/utils/utils.py b/src/features/utils/utils.py index abb8cbd8..1e540016 100644 --- a/src/features/utils/utils.py +++ b/src/features/utils/utils.py @@ -113,6 +113,7 @@ def fetch_provider_features(provider, provider_key, sensor_key, sensor_data_file for feature in provider["FEATURES"]: sensor_features[feature] = None segment_colums = pd.DataFrame() + sensor_features['local_segment'] = sensor_features['local_segment'].str.replace(r'_RR\d+SS', '') split_segemnt_columns = sensor_features["local_segment"].str.split(pat="(.*)#(.*),(.*)", expand=True) new_segment_columns = split_segemnt_columns.iloc[:,1:4] if split_segemnt_columns.shape[1] == 5 else pd.DataFrame(columns=["local_segment_label", "local_segment_start_datetime","local_segment_end_datetime"]) segment_colums[["local_segment_label", "local_segment_start_datetime", "local_segment_end_datetime"]] = new_segment_columns From c48c1c8f242390ef85c21313f480c0f7a37ade1c Mon Sep 17 00:00:00 2001 From: JulioV Date: Tue, 23 Mar 2021 14:04:01 -0400 Subject: [PATCH 2/4] Optimize Barnett's computation multi-day segments --- Snakefile | 3 + docs/features/phone-locations.md | 14 +- rules/common.smk | 5 + rules/features.smk | 14 +- .../phone_locations/barnett/daily_features.R | 68 +++++++++ src/features/phone_locations/barnett/main.R | 141 +++++++----------- 6 files changed, 159 insertions(+), 86 deletions(-) create mode 100644 src/features/phone_locations/barnett/daily_features.R diff --git a/Snakefile b/Snakefile index eca70344..71a6fc5c 100644 --- a/Snakefile +++ b/Snakefile @@ -205,6 +205,9 @@ for provider in config["PHONE_LOCATIONS"]["PROVIDERS"].keys(): else: raise ValueError("Error: Add PHONE_LOCATIONS (and as many PHONE_SENSORS as you have) to [PHONE_DATA_YIELD][SENSORS] in config.yaml. This is necessary to compute phone_yielded_timestamps (time when the smartphone was sensing data) which is used to resample fused location data (ALL_RESAMPLED and RESAMPLED_FUSED)") + if provider == "BARNETT": + files_to_compute.extend(expand("data/interim/{pid}/phone_locations_barnett_daily.csv", pid=config["PIDS"])) + files_to_compute.extend(expand("data/raw/{pid}/phone_locations_raw.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/interim/{pid}/phone_locations_processed.csv", pid=config["PIDS"])) files_to_compute.extend(expand("data/interim/{pid}/phone_locations_processed_with_datetime.csv", pid=config["PIDS"])) diff --git a/docs/features/phone-locations.md b/docs/features/phone-locations.md index eeb34a82..d46ce1ef 100644 --- a/docs/features/phone-locations.md +++ b/docs/features/phone-locations.md @@ -33,7 +33,7 @@ These features are based on the original open-source implementation by [Barnett !!! info "Available time segments and platforms" - - Available only for segments that start at 00:00:00 and end at 23:59:59 of the same day (daily segments) + - Available only for segments that start at 00:00:00 and end at 23:59:59 of the same or a different day (daily, weekly, weekend, etc.) - Available for Android and iOS !!! info "File Sequence" @@ -78,7 +78,17 @@ Features description for `[PHONE_LOCATIONS][PROVIDERS][BARNETT]` adapted from [B |wkenddayrtn | - | Same as circdnrtn but computed separately for weekends and weekdays. !!! note "Assumptions/Observations" - **Barnett\'s et al features** + **Multi day segment features** + Barnett's features are only available on time segments that span entire days (00:00:00 to 23:59:59). Such segments can be one-day long (daily) or multi-day (weekly, for example). Multi-day segment features are computed based on daily features summarized the following way: + + - sum for `hometime`, `disttravelled`, `siglocsvisited`, and `minutes_data_used` + - max for `maxdiam`, and `maxhomedist` + - mean for `rog`, `avgflightlen`, `stdflightlen`, `avgflightdur`, `stdflightdur`, `probpause`, `siglocentropy`, `circdnrtn`, `wkenddayrtn`, and `minsmissing` + + **Computation speed** + The process to extract these features can be slow compared to other sensors and providers due to the required simulation. + + **How are these features computed?** These features are based on a Pause-Flight model. A pause is defined as a mobility trace (location pings) within a certain duration and distance (by default, 300 seconds and 60 meters). A flight is any mobility trace between two pauses. Data is resampled and imputed before the features are computed. See [Barnett et al](../../citation#barnett-locations) for more information. In RAPIDS, we only expose one parameter for these features (accuracy limit). You can change other parameters in `src/features/phone_locations/barnett/library/MobilityFeatures.R`. **Significant Locations** diff --git a/rules/common.smk b/rules/common.smk index 3e34a105..6651677a 100644 --- a/rules/common.smk +++ b/rules/common.smk @@ -11,6 +11,11 @@ def get_script_language(script_path): # Features.smk ######################################################################################################### +def get_barnett_daily(wildcards): + if wildcards.provider_key.upper() == "BARNETT": + return "data/interim/{pid}/phone_locations_barnett_daily.csv" + return [] + def find_features_files(wildcards): feature_files = [] for provider_key, provider in config[(wildcards.sensor_key).upper()]["PROVIDERS"].items(): diff --git a/rules/features.smk b/rules/features.smk index a2105145..7f471968 100644 --- a/rules/features.smk +++ b/rules/features.smk @@ -379,10 +379,22 @@ rule phone_locations_python_features: script: "../src/features/entry.py" +rule phone_locations_barnett_daily_features: + input: + sensor_data = "data/interim/{pid}/phone_locations_processed_with_datetime.csv", + time_segments_labels = "data/interim/time_segments/{pid}_time_segments_labels.csv", + params: + provider = lambda wildcards: config["PHONE_LOCATIONS"]["PROVIDERS"]["BARNETT"], + output: + "data/interim/{pid}/phone_locations_barnett_daily.csv" + script: + "../src/features/phone_locations/barnett/daily_features.R" + rule phone_locations_r_features: input: sensor_data = "data/interim/{pid}/phone_locations_processed_with_datetime.csv", - time_segments_labels = "data/interim/time_segments/{pid}_time_segments_labels.csv" + time_segments_labels = "data/interim/time_segments/{pid}_time_segments_labels.csv", + barnett_daily = get_barnett_daily params: provider = lambda wildcards: config["PHONE_LOCATIONS"]["PROVIDERS"][wildcards.provider_key.upper()], provider_key = "{provider_key}", diff --git a/src/features/phone_locations/barnett/daily_features.R b/src/features/phone_locations/barnett/daily_features.R new file mode 100644 index 00000000..4a9009c5 --- /dev/null +++ b/src/features/phone_locations/barnett/daily_features.R @@ -0,0 +1,68 @@ +source("renv/activate.R") +library("dplyr", warn.conflicts = F) +library("stringr") +library("lubridate") +library("purrr") + +# Load Ian Barnett's code. From https://scholar.harvard.edu/ibarnett/software/gpsmobility +file.sources = list.files(c("src/features/phone_locations/barnett/library"), pattern="*.R$", full.names=TRUE, ignore.case=TRUE) +output_apply <- sapply(file.sources,source,.GlobalEnv) + +create_empty_file <- function(){ + return(data.frame(local_date= character(), hometime= numeric(), disttravelled= numeric(), rog= numeric(), maxdiam= numeric(), + maxhomedist= numeric(), siglocsvisited= numeric(), avgflightlen= numeric(), stdflightlen= numeric(), + avgflightdur= numeric(), stdflightdur= numeric(), probpause= numeric(), siglocentropy= numeric(), minsmissing= numeric(), + circdnrtn= numeric(), wkenddayrtn= numeric(), minutes_data_used= numeric() + )) +} + +barnett_daily_features <- function(snakemake){ + location_features <- NULL + location <- read.csv(snakemake@input[["sensor_data"]], stringsAsFactors = FALSE) + segment_labels <- read.csv(snakemake@input[["time_segments_labels"]], stringsAsFactors = FALSE) + accuracy_limit <- snakemake@params[["provider"]][["ACCURACY_LIMIT"]] + datetime_start_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2} 00:00:00" + datetime_end_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2} 23:59:59" + location <- location %>% + filter(accuracy < accuracy_limit) %>% + mutate(is_daily = str_detect(assigned_segments, paste0(".*#", datetime_start_regex, ",", datetime_end_regex, ".*"))) + + + if(nrow(location) == 0 || all(location$is_daily == FALSE) || (max(location$timestamp) - min(location$timestamp) < 86400000)){ + warning("Barnett's location features cannot be computed for data or time segments that do not span one or more entire days (00:00:00 to 23:59:59). Values below point to the problem:", + "\nLocation data rows within accuracy: ", nrow(location %>% filter(accuracy < accuracy_limit)), + "\nLocation data rows within a daily time segment: ", nrow(filter(location, is_daily)), + "\nLocation data time span in days: ", round((max(location$timestamp) - min(location$timestamp)) / 86400000, 2) + ) + location_features <- create_empty_file() + } else{ + # Count how many minutes of data we use to get location features. Some minutes have multiple fused rows + location_minutes_used <- location %>% + group_by(local_date, local_hour) %>% + summarise(n_minutes = n_distinct(local_minute), .groups = 'drop_last') %>% + group_by(local_date) %>% + summarise(minutes_data_used = sum(n_minutes), .groups = 'drop_last') %>% + select(local_date, minutes_data_used) + + # Select only the columns that the algorithm needs + all_timezones <- table(location %>% pull(local_timezone)) + location <- location %>% select(timestamp, latitude = double_latitude, longitude = double_longitude, altitude = double_altitude, accuracy) + timezone <- names(all_timezones)[as.vector(all_timezones)==max(all_timezones)] + outputMobility <- MobilityFeatures(location, ACCURACY_LIM = accuracy_limit, tz = timezone) + + if(is.null(outputMobility)){ + location_features <- create_empty_file() + } else { + # Copy index (dates) as a column + features <- cbind(rownames(outputMobility$featavg), outputMobility$featavg) + features <- as.data.frame(features) + features[-1] <- lapply(lapply(features[-1], as.character), as.numeric) + colnames(features)=c("local_date",tolower(colnames(outputMobility$featavg))) + location_features <- left_join(features, location_minutes_used, by = "local_date") + } + + } + write.csv(location_features, snakemake@output[[1]], row.names =FALSE) +} + +barnett_daily_features(snakemake) \ No newline at end of file diff --git a/src/features/phone_locations/barnett/main.R b/src/features/phone_locations/barnett/main.R index 127cb68e..db58dfe8 100644 --- a/src/features/phone_locations/barnett/main.R +++ b/src/features/phone_locations/barnett/main.R @@ -1,108 +1,83 @@ source("renv/activate.R") library("dplyr", warn.conflicts = F) library("stringr") - -# Load Ian Barnett's code. Taken from https://scholar.harvard.edu/ibarnett/software/gpsmobility -file.sources = list.files(c("src/features/phone_locations/barnett/library"), pattern="*.R$", full.names=TRUE, ignore.case=TRUE) -sapply(file.sources,source,.GlobalEnv) +library("lubridate") +library("purrr") create_empty_file <- function(requested_features){ return(data.frame(local_segment= character(), - hometime= numeric(), - disttravelled= numeric(), - rog= numeric(), - maxdiam= numeric(), - maxhomedist= numeric(), - siglocsvisited= numeric(), - avgflightlen= numeric(), - stdflightlen= numeric(), - avgflightdur= numeric(), - stdflightdur= numeric(), - probpause= numeric(), - siglocentropy= numeric(), - minsmissing= numeric(), - circdnrtn= numeric(), - wkenddayrtn= numeric(), - minutes_data_used= numeric() - ) %>% select(all_of(requested_features))) + hometime= numeric(), + disttravelled= numeric(), + rog= numeric(), + maxdiam= numeric(), + maxhomedist= numeric(), + siglocsvisited= numeric(), + avgflightlen= numeric(), + stdflightlen= numeric(), + avgflightdur= numeric(), + stdflightdur= numeric(), + probpause= numeric(), + siglocentropy= numeric(), + minsmissing= numeric(), + circdnrtn= numeric(), + wkenddayrtn= numeric(), + minutes_data_used= numeric() + ) %>% select(all_of(requested_features))) +} + +summarise_multiday_segments <- function(segments, features){ + features <- features %>% mutate(local_date=ymd(local_date)) + segments <- segments %>% extract(col = local_segment, + into = c ("local_segment_start_datetime", "local_segment_end_datetime"), + ".*#(.*) .*,(.*) .*", + remove = FALSE) %>% + mutate(local_segment_start_datetime = ymd(local_segment_start_datetime), + local_segment_end_datetime = ymd(local_segment_end_datetime)) %>% + group_by(local_segment) %>% + nest() %>% + mutate(data = map(data, function(nested_data, nested_features){ + + summary <- nested_features %>% filter(local_date >= nested_data$local_segment_start_datetime & + local_date <= nested_data$local_segment_end_datetime) + if(nrow(summary) > 0) + summary <- summary %>% + summarise(across(c(hometime, disttravelled, siglocsvisited, minutes_data_used), sum), + across(c(maxdiam, maxhomedist), max), + across(c(rog, avgflightlen, stdflightlen, avgflightdur, stdflightdur, probpause, siglocentropy, circdnrtn, wkenddayrtn, minsmissing), mean)) + return(summary) + + }, features)) %>% + unnest(cols = everything()) %>% + ungroup() + return(segments) } barnett_features <- function(sensor_data_files, time_segment, params){ - - location_data <- read.csv(sensor_data_files[["sensor_data"]], stringsAsFactors = FALSE) location_features <- NULL - - location <- location_data - accuracy_limit <- params[["ACCURACY_LIMIT"]] + daily_features <- read.csv(sensor_data_files[["barnett_daily"]], stringsAsFactors = FALSE) + location <- read.csv(sensor_data_files[["sensor_data"]], stringsAsFactors = FALSE) minutes_data_used <- params[["MINUTES_DATA_USED"]] - - # Compute what features were requested available_features <- c("hometime","disttravelled","rog","maxdiam", "maxhomedist","siglocsvisited","avgflightlen", "stdflightlen", - "avgflightdur","stdflightdur", "probpause","siglocentropy","minsmissing", "circdnrtn","wkenddayrtn") + "avgflightdur","stdflightdur", "probpause","siglocentropy", "circdnrtn","wkenddayrtn") requested_features <- intersect(unlist(params["FEATURES"], use.names = F), available_features) requested_features <- c("local_segment", requested_features) if(minutes_data_used) requested_features <- c(requested_features, "minutes_data_used") - - # Excludes datasets with less than 24 hours of data - if(max(location$timestamp) - min(location$timestamp) < 86400000) - location <- head(location, 0) - - if (nrow(location) > 1){ - # Filter by segment and skipping any non-daily segment + + if (nrow(location) > 0 & nrow(daily_features) > 0){ location <- location %>% filter_data_by_segment(time_segment) - datetime_start_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2} 00:00:00" datetime_end_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2} 23:59:59" location <- location %>% mutate(is_daily = str_detect(local_segment, paste0(time_segment, "#", datetime_start_regex, ",", datetime_end_regex))) - - if(!all(location$is_daily)){ - message(paste("Barnett's location features cannot be computed for time segmentes that are not daily (cover 00:00:00 to 23:59:59 of every day). Skipping ", time_segment)) + if(nrow(location) == 0 || !all(location$is_daily)){ + message(paste("Barnett's location features cannot be computed for data or time segmentes that do not span entire days (00:00:00 to 23:59:59). Skipping ", time_segment)) location_features <- create_empty_file(requested_features) } else { - # Count how many minutes of data we use to get location features - # Some minutes have multiple fused rows - location_minutes_used <- location %>% - group_by(local_date, local_hour) %>% - summarise(n_minutes = n_distinct(local_minute), .groups = 'drop_last') %>% - group_by(local_date) %>% - summarise(minutes_data_used = sum(n_minutes), .groups = 'drop_last') %>% - select(local_date, minutes_data_used) - - # Save time segment to attach it later - location_dates_segments <- location %>% select(local_date, local_segment) %>% distinct(local_date, .keep_all = TRUE) - - # Select only the columns that the algorithm needs - all_timezones <- table(location %>% pull(local_timezone)) - location <- location %>% select(timestamp, latitude = double_latitude, longitude = double_longitude, altitude = double_altitude, accuracy) - if(nrow(location %>% filter(accuracy < accuracy_limit)) > 1){ - timezone <- names(all_timezones)[as.vector(all_timezones)==max(all_timezones)] - outputMobility <- MobilityFeatures(location, ACCURACY_LIM = accuracy_limit, tz = timezone) - } else { - print(paste("Cannot compute Barnett location features because there are no rows with an accuracy value lower than ACCURACY_LIMIT", accuracy_limit)) - outputMobility <- NULL - } - - if(is.null(outputMobility)){ - location_features <- create_empty_file(requested_features) - } else{ - # Copy index (dates) as a column - features <- cbind(rownames(outputMobility$featavg), outputMobility$featavg) - features <- as.data.frame(features) - features[-1] <- lapply(lapply(features[-1], as.character), as.numeric) - colnames(features)=c("local_date",tolower(colnames(outputMobility$featavg))) - # Add the minute count column - features <- left_join(features, location_minutes_used, by = "local_date") - # Add the time segment column for consistency - features <- left_join(features, location_dates_segments, by = "local_date") - location_features <- features %>% select(all_of(requested_features)) - } + location_dates_segments <- location %>% select(local_segment) %>% distinct(local_segment, .keep_all = TRUE) + features <- summarise_multiday_segments(location_dates_segments, daily_features) + location_features <- features %>% select(all_of(requested_features)) } - } else { - location_features <- create_empty_file(requested_features) - } - - if(ncol(location_features) != length(requested_features)) - stop(paste0("The number of features in the output dataframe (=", ncol(location_features),") does not match the expected value (=", length(requested_features),"). Verify your barnett location features")) + } else + location_features <- create_empty_file(requested_features) return(location_features) } \ No newline at end of file From 87fbbbe402092ebba2cf8e078d52fd643ff1438f Mon Sep 17 00:00:00 2001 From: JulioV Date: Sun, 28 Mar 2021 14:31:02 -0400 Subject: [PATCH 3/4] Refactor and simplify time segments --- src/data/datetime/assign_to_event_segments.R | 42 ++++ .../datetime/assign_to_periodic_segments.R | 110 ++++++++++ src/data/datetime/assign_to_time_segment.R | 198 ++---------------- src/data/datetime/process_time_segments.R | 14 +- 4 files changed, 179 insertions(+), 185 deletions(-) create mode 100644 src/data/datetime/assign_to_event_segments.R create mode 100644 src/data/datetime/assign_to_periodic_segments.R diff --git a/src/data/datetime/assign_to_event_segments.R b/src/data/datetime/assign_to_event_segments.R new file mode 100644 index 00000000..3b94ab05 --- /dev/null +++ b/src/data/datetime/assign_to_event_segments.R @@ -0,0 +1,42 @@ +validate_overlapping_event_segments <- function(segments){ + # Check for overlapping segments (not allowed because our resampling episode algorithm would have to have a second instead of minute granularity that increases storage and computation time) + overlapping <- segments %>% + group_by(label) %>% + arrange(segment_start_ts) %>% + mutate(overlaps = if_else(segment_start_ts <= lag(segment_end_ts), TRUE, FALSE), + overlapping_segments = glue("a) [{lag(label)},\t{lag(event_timestamp)},\t{lag(length)},\t{lag(shift)},\t{lag(shift_direction)},\t{lag(device_id)}] \n", + "b) [{label},\t{event_timestamp},\t{length},\t{shift},\t{shift_direction},\t{device_id}]")) + if(any(overlapping$overlaps, na.rm = TRUE)) + stop("One or more event time segments overlap for ",overlapping$device_id[[1]], + ", modify their lengths so they don't:\n", paste0(overlapping %>% filter(overlaps == TRUE) %>% pull(overlapping_segments), collapse = "\n")) +} + +infer_event_segments <- function(tz, segments){ + time_format_fn <- stamp("23:51:15", orders="HMS", quiet = TRUE) + inferred <- segments %>% + mutate(shift = ifelse(shift == "0", "0seconds", shift), + segment_start_ts = event_timestamp + (as.integer(seconds(lubridate::duration(shift))) * ifelse(shift_direction >= 0, 1, -1) * 1000), + segment_end_ts = segment_start_ts + (as.integer(seconds(lubridate::duration(length))) * 1000), + segment_id_start = lubridate::as_datetime(segment_start_ts/1000, tz = tz), + segment_id_end = lubridate::as_datetime(segment_end_ts/1000, tz = tz), + segment_end_ts = segment_end_ts + 999, + segment_id = glue("[{label}#{start_date} {start_time},{end_date} {end_time};{segment_start_ts},{segment_end_ts}]", + start_date=lubridate::date(segment_id_start), + start_time=time_format_fn(segment_id_start), + end_date=lubridate::date(segment_id_end), + end_time=time_format_fn(segment_id_end))) + validate_overlapping_event_segments(inferred) + return(inferred) +} + +assign_to_event_segments <- function(sensor_data, time_segments){ + sensor_data <- sensor_data %>% + group_by(local_timezone) %>% + nest() %>% + mutate(inferred_time_segments = map(local_timezone, infer_event_segments, time_segments), + data = map2(data, inferred_time_segments, assign_rows_to_segments)) %>% + select(-inferred_time_segments) %>% + unnest(data) %>% + arrange(timestamp) %>% + ungroup() +} \ No newline at end of file diff --git a/src/data/datetime/assign_to_periodic_segments.R b/src/data/datetime/assign_to_periodic_segments.R new file mode 100644 index 00000000..e2c13262 --- /dev/null +++ b/src/data/datetime/assign_to_periodic_segments.R @@ -0,0 +1,110 @@ +day_type_delay <- function(time_segments, day_type, include_past_periodic_segments){ + # Return a delay in days to consider or not the first row of data + delay <- time_segments %>% + mutate(length_duration = duration(length)) %>% + filter(repeats_on == day_type) %>% arrange(-length_duration) %>% + pull(length_duration) %>% + first() + return(if_else(is.na(delay) | include_past_periodic_segments == FALSE, duration("0days"), delay)) +} + +get_segment_dates <- function(data, local_timezone, day_type, delay){ + # Based on the data we are processing we extract unique dates to build segments + dates <- data %>% + distinct(local_date) %>% + mutate(local_date_obj = date(lubridate::ymd(local_date, tz = local_timezone))) %>% + complete(local_date_obj = seq(date(min(local_date_obj) - delay), date(max(local_date_obj) + delay), by="days")) %>% + mutate(local_date = replace_na(as.character(date(local_date_obj)))) + + if(day_type == "every_day") + dates <- dates %>% mutate(every_day = 0) + else if (day_type == "wday") + dates <- dates %>% mutate(wday = wday(local_date_obj, week_start = 1)) + else if (day_type == "mday") + dates <- dates %>% mutate(mday = mday(local_date_obj)) + else if (day_type == "qday") + dates <- dates %>% mutate(qday = qday(local_date_obj)) + else if (day_type == "yday") + dates <- dates %>% mutate(yday = yday(local_date_obj)) + return(dates) +} + +infer_existent_periodic_segments <- function(existent_dates, segments){ + # build the actual time segments taking into account the data and users' requested length and repeat schedule + # segment datetime labels are computed on UTC + crossing(segments, existent_dates) %>% + pivot_longer(cols = c(every_day,wday, mday, qday, yday), names_to = "day_type", values_to = "day_value") %>% + filter(repeats_on == day_type & repeats_value == day_value) %>% + mutate(segment_id_start = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM")) + period(overlap_duration), + segment_id_end = segment_id_start + lubridate::duration(length)) +} + +dedup_nonoverlapping_periodic_segments <- function(nested_inferred_time_segments){ + # Overlapping segments exist when their length is longer than their repeating frequency, e.g. twoday segements starting on every day + # In process_time_segments we decompose those segments into non-overlapping ones, e.g. twodayA +0days and twodayB +1days + # This means that any date will have more than one non-overlapping instances, that we need to dedup + # We choose alternating non-overlapping instances to guarantee any data row is only neeeded in one instance at a time + # d1,r1,twoday0 + # d2,r2,twoday0 twoday1 + # d3,r3,twoday1 twoday0 + # d4,r4,twoday0 twoday1 + new_segments <- data.frame(nested_inferred_time_segments %>% + group_by(original_label) %>% + mutate(max_groups = max(overlap_id) + 1) %>% + # select(label, segment_id_start, segment_id_end, overlap_id, max_groups) %>% + nest() %>% + mutate(data = map(data, function(nested_data){ + nested_data <- nested_data %>% arrange( segment_id_start, segment_id_end) %>% + group_by(segment_id_start) %>% + mutate(n_id = ((cur_group_id()-1) %% max_groups)) %>% + filter(overlap_id == n_id) %>% + # select(label, segment_id_start, overlap_id, n_id) %>% + ungroup() + })) %>% + unnest(cols = data) %>% + ungroup()) +} + + + +add_periodic_segment_timestamps_and_id <- function(segments, local_timezone){ + # segment timestamps are computed on the data's timezone(s) + time_format_fn <- stamp("23:51:15", orders="HMS", quiet = TRUE) + segments %>% mutate(segment_start_ts = as.numeric(lubridate::force_tz(segment_id_start, tzone = local_timezone)) * 1000, + segment_end_ts = segment_start_ts + as.numeric(lubridate::duration(length)) * 1000 + 999, + segment_id = glue("[{label}#{start_date} {start_time},{end_date} {end_time};{segment_start_ts},{segment_end_ts}]", + start_date=lubridate::date(segment_id_start), + start_time=time_format_fn(segment_id_start), + end_date=lubridate::date(segment_id_end), + end_time=time_format_fn(segment_id_end) )) %>% + drop_na(segment_start_ts, segment_end_ts) +} + +assign_to_periodic_segments <- function(sensor_data, time_segments, include_past_periodic_segments){ + time_segments <- time_segments %>% mutate(length_duration = duration(length)) + every_day_delay <- duration("0days") + wday_delay <- day_type_delay(time_segments, "wday", include_past_periodic_segments) + mday_delay <- day_type_delay(time_segments, "mday", include_past_periodic_segments) + qday_delay <- day_type_delay(time_segments, "qday", include_past_periodic_segments) + yday_delay <- day_type_delay(time_segments, "yday", include_past_periodic_segments) + + sensor_data <- sensor_data %>% + group_by(local_timezone) %>% + nest() %>% + mutate(every_date = map2(data, local_timezone, get_segment_dates, "every_day", every_day_delay), + week_dates = map2(data, local_timezone, get_segment_dates, "wday", wday_delay), + month_dates = map2(data, local_timezone, get_segment_dates, "mday", mday_delay), + quarter_dates = map2(data, local_timezone, get_segment_dates, "qday", qday_delay), + year_dates = map2(data, local_timezone, get_segment_dates, "yday", yday_delay), + existent_dates = pmap(list(every_date, week_dates, month_dates, quarter_dates, year_dates), function(every_date, week_dates, month_dates, quarter_dates, year_dates) reduce(list(every_date, week_dates,month_dates, quarter_dates, year_dates), .f=full_join)), + inferred_time_segments = map(existent_dates, infer_existent_periodic_segments, time_segments), + inferred_time_segments = map(inferred_time_segments, dedup_nonoverlapping_periodic_segments), + inferred_time_segments = map(inferred_time_segments, add_periodic_segment_timestamps_and_id, local_timezone), + data = map2(data, inferred_time_segments, assign_rows_to_segments)) %>% + select(-existent_dates, -inferred_time_segments, -every_date, -week_dates, -month_dates, -quarter_dates, -year_dates) %>% + unnest(cols = data) %>% + arrange(timestamp) %>% + ungroup() + + return(sensor_data) +} \ No newline at end of file diff --git a/src/data/datetime/assign_to_time_segment.R b/src/data/datetime/assign_to_time_segment.R index f31807b7..bf2eb551 100644 --- a/src/data/datetime/assign_to_time_segment.R +++ b/src/data/datetime/assign_to_time_segment.R @@ -1,81 +1,19 @@ library("tidyverse") +library("glue") library("lubridate", warn.conflicts = F) options(scipen=999) -day_type_delay <- function(time_segments, day_type, include_past_periodic_segments){ - delay <- time_segments %>% mutate(length_duration = duration(length)) %>% filter(repeats_on == day_type) %>% arrange(-length_duration) %>% pull(length_duration) %>% first() - return(if_else(is.na(delay) | include_past_periodic_segments == FALSE, duration("0days"), delay)) -} - -get_segment_dates <- function(data, local_timezone, day_type, delay){ - dates <- data %>% - distinct(local_date) %>% - mutate(local_date_obj = date(lubridate::ymd(local_date, tz = local_timezone))) %>% - complete(local_date_obj = seq(date(min(local_date_obj) - delay), date(max(local_date_obj) + delay), by="days")) %>% - mutate(local_date = replace_na(as.character(date(local_date_obj)))) - - if(day_type == "every_day") - dates <- dates %>% mutate(every_day = 0) - else if (day_type == "wday") - dates <- dates %>% mutate(wday = wday(local_date_obj, week_start = 1)) - else if (day_type == "mday") - dates <- dates %>% mutate(mday = mday(local_date_obj)) - else if (day_type == "qday") - dates <- dates %>% mutate(qday = qday(local_date_obj)) - else if (day_type == "yday") - dates <- dates %>% mutate(yday = yday(local_date_obj)) - return(dates) -} - -create_nonoverlapping_periodic_segments <- function(nested_inferred_time_segments){ - new_segments <- (data.frame(nested_inferred_time_segments %>% - group_by(original_label) %>% - mutate(max_groups = max(overlap_id) + 1) %>% - # select(label, segment_id_start, segment_id_end, overlap_id, max_groups) %>% - nest() %>% - mutate(data = map(data, function(nested_data){ - nested_data <- nested_data %>% arrange( segment_id_start, segment_id_end) %>% - group_by(segment_id_start) %>% - mutate(n_id = ((cur_group_id()-1) %% max_groups)) %>% - filter(overlap_id == n_id) %>% - # select(label, segment_id_start, overlap_id, n_id) %>% - ungroup() - })) %>% - unnest(cols = data) %>% - ungroup() - )) - return(new_segments) -} - - -assign_rows_to_segments <- function(nested_data, nested_inferred_time_segments){ - nested_data <- nested_data %>% mutate(assigned_segments = "") - for(i in seq_len(nrow(nested_inferred_time_segments))) { - segment <- nested_inferred_time_segments[i,] - nested_data$assigned_segments <- ifelse(segment$segment_start_ts<= nested_data$timestamp & segment$segment_end_ts >= nested_data$timestamp, - stringi::stri_c(nested_data$assigned_segments, segment$segment_id, sep = "|"), nested_data$assigned_segments) +assign_rows_to_segments <- function(data, segments){ + # This function is used by all segment types, we use data.tables because they are fast + data <- data.table::as.data.table(data) + data[, assigned_segments := ""] + for(i in seq_len(nrow(segments))) { + segment <- segments[i,] + data[segment$segment_start_ts<= timestamp & segment$segment_end_ts >= timestamp, + assigned_segments := stringi::stri_c(assigned_segments, segment$segment_id, sep = "|")] } - nested_data$assigned_segments <- substring(nested_data$assigned_segments, 2) - return(nested_data) -} - -assign_rows_to_segments_frequency <- function(nested_data, nested_timezone, time_segments){ - for(i in 1:nrow(time_segments)) { - segment <- time_segments[i,] - nested_data$assigned_segments <- ifelse(segment$segment_start_ts<= nested_data$local_time_obj & segment$segment_end_ts >= nested_data$local_time_obj, - # The segment_id is assambled on the fly because it depends on each row's local_date and timezone - stringi::stri_c("[", - segment[["label"]], "#", - nested_data$local_date, " ", - segment[["segment_id_start_time"]], ",", - nested_data$local_date, " ", - segment[["segment_id_end_time"]], ";", - as.numeric(lubridate::as_datetime(stringi::stri_c(nested_data$local_date, segment$segment_id_start_time), tz = nested_timezone)) * 1000, ",", - as.numeric(lubridate::as_datetime(stringi::stri_c(nested_data$local_date, segment$segment_id_end_time), tz = nested_timezone)) * 1000 + 999, - "]"), - nested_data$assigned_segments) - } - return(nested_data) + data[,assigned_segments:=substring(assigned_segments, 2)] + data } assign_to_time_segment <- function(sensor_data, time_segments, time_segments_type, include_past_periodic_segments){ @@ -83,116 +21,14 @@ assign_to_time_segment <- function(sensor_data, time_segments, time_segments_typ if(nrow(sensor_data) == 0 || nrow(time_segments) == 0) return(sensor_data %>% mutate(assigned_segments = NA)) - if(time_segments_type == "FREQUENCY"){ - - time_segments <- time_segments %>% mutate(start_time = lubridate::hm(start_time), - end_time = start_time + minutes(length) - seconds(1), - segment_id_start_time = paste(str_pad(hour(start_time),2, pad="0"), str_pad(minute(start_time),2, pad="0"), str_pad(second(start_time),2, pad="0"),sep =":"), - segment_id_end_time = paste(str_pad(hour(ymd("1970-01-01") + end_time),2, pad="0"), str_pad(minute(ymd("1970-01-01") + end_time),2, pad="0"), str_pad(second(ymd("1970-01-01") + end_time),2, pad="0"),sep =":"), # add ymd("1970-01-01") to get a real time instead of duration - segment_start_ts = as.numeric(start_time), - segment_end_ts = as.numeric(end_time)) - - sensor_data <- sensor_data %>% mutate(local_time_obj = as.numeric(lubridate::hms(local_time)), - assigned_segments = "") - - sensor_data <- sensor_data %>% - group_by(local_timezone) %>% - nest() %>% - mutate(data = map2(data, local_timezone, assign_rows_to_segments_frequency, time_segments)) %>% - unnest(cols = data) %>% - arrange(timestamp) %>% - select(-local_time_obj) %>% - ungroup() - + if (time_segments_type == "FREQUENCY" || time_segments_type == "PERIODIC"){ #FREQUENCY segments are just syntactic sugar for PERIODIC + source("src/data/datetime/assign_to_periodic_segments.R") + sensor_data <- assign_to_periodic_segments(sensor_data, time_segments, include_past_periodic_segments) return(sensor_data) - - } else if (time_segments_type == "PERIODIC"){ - - # We need to take into account segment start dates that could include the first day of data - time_segments <- time_segments %>% mutate(length_duration = duration(length)) - every_day_delay <- duration("0days") - wday_delay <- day_type_delay(time_segments, "wday", include_past_periodic_segments) - mday_delay <- day_type_delay(time_segments, "mday", include_past_periodic_segments) - qday_delay <- day_type_delay(time_segments, "qday", include_past_periodic_segments) - yday_delay <- day_type_delay(time_segments, "yday", include_past_periodic_segments) - - sensor_data <- sensor_data %>% - group_by(local_timezone) %>% - nest() %>% - # get existent days that we need to start segments from - mutate(every_date = map2(data, local_timezone, get_segment_dates, "every_day", every_day_delay), - week_dates = map2(data, local_timezone, get_segment_dates, "wday", wday_delay), - month_dates = map2(data, local_timezone, get_segment_dates, "mday", mday_delay), - quarter_dates = map2(data, local_timezone, get_segment_dates, "qday", qday_delay), - year_dates = map2(data, local_timezone, get_segment_dates, "yday", yday_delay), - existent_dates = pmap(list(every_date, week_dates, month_dates, quarter_dates, year_dates), - function(every_date, week_dates, month_dates, quarter_dates, year_dates) reduce(list(every_date, week_dates,month_dates, quarter_dates, year_dates), .f=full_join)), - # build the actual time segments taking into account the users requested length and repeat schedule - inferred_time_segments = map(existent_dates, - ~ crossing(time_segments, .x) %>% - pivot_longer(cols = c(every_day,wday, mday, qday, yday), names_to = "day_type", values_to = "day_value") %>% - filter(repeats_on == day_type & repeats_value == day_value) %>% - # The segment ids (segment_id_start and segment_id_end) are computed in UTC to avoid having different labels for instances of a segment that happen in different timezones - mutate(segment_id_start = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM")) + period(overlap_duration), - segment_id_end = segment_id_start + lubridate::duration(length), - # The actual segments are computed using timestamps taking into account the timezone - segment_start_ts = as.numeric(lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = local_timezone) + period(overlap_duration)) * 1000, - segment_end_ts = segment_start_ts + as.numeric(lubridate::duration(length)) * 1000 + 999, - segment_id = paste0("[", - paste0(label,"#", - paste0(lubridate::date(segment_id_start), " ", - paste(str_pad(hour(segment_id_start),2, pad="0"), str_pad(minute(segment_id_start),2, pad="0"), str_pad(second(segment_id_start),2, pad="0"),sep =":"), ",", - lubridate::date(segment_id_end), " ", - paste(str_pad(hour(segment_id_end),2, pad="0"), str_pad(minute(segment_id_end),2, pad="0"), str_pad(second(segment_id_end),2, pad="0"),sep =":")),";", - paste0(segment_start_ts, ",", segment_end_ts)), - "]")) %>% - # drop time segments with an invalid start or end time (mostly due to daylight saving changes, e.g. 2020-03-08 02:00:00 EST does not exist, clock jumps from 01:59am to 03:00am) - drop_na(segment_start_ts, segment_end_ts)), - inferred_time_segments = map(inferred_time_segments, create_nonoverlapping_periodic_segments), - data = map2(data, inferred_time_segments, assign_rows_to_segments) - ) %>% - select(-existent_dates, -inferred_time_segments, -every_date, -week_dates, -month_dates, -quarter_dates, -year_dates) %>% - unnest(cols = data) %>% - arrange(timestamp) - } else if ( time_segments_type == "EVENT"){ - - sensor_data <- sensor_data %>% - group_by(local_timezone) %>% - nest() %>% - mutate(inferred_time_segments = map(local_timezone, function(tz){ - inferred <- time_segments %>% - mutate(shift = ifelse(shift == "0", "0seconds", shift), - segment_start_ts = event_timestamp + (as.integer(seconds(lubridate::duration(shift))) * ifelse(shift_direction >= 0, 1, -1) * 1000), - segment_end_ts = segment_start_ts + (as.integer(seconds(lubridate::duration(length))) * 1000), - # these start and end datetime objects are for labeling only - segment_id_start = lubridate::as_datetime(segment_start_ts/1000, tz = tz), - segment_id_end = lubridate::as_datetime(segment_end_ts/1000, tz = tz), - segment_end_ts = segment_end_ts + 999, - segment_id = paste0("[", - paste0(label,"#", - paste0(lubridate::date(segment_id_start), " ", - paste(str_pad(hour(segment_id_start),2, pad="0"), str_pad(minute(segment_id_start),2, pad="0"), str_pad(second(segment_id_start),2, pad="0"),sep =":"), ",", - lubridate::date(segment_id_end), " ", - paste(str_pad(hour(segment_id_end),2, pad="0"), str_pad(minute(segment_id_end),2, pad="0"), str_pad(second(segment_id_end),2, pad="0"),sep =":")),";", - paste0(segment_start_ts, ",", segment_end_ts)), - "]")) - # Check that for overlapping segments (not allowed because our resampling episode algorithm would have to have a second instead of minute granularity that increases storage and computation time) - overlapping <- inferred %>% group_by(label) %>% arrange(segment_start_ts) %>% - mutate(overlaps = if_else(segment_start_ts <= lag(segment_end_ts), TRUE, FALSE), - overlapping_segments = paste(paste(lag(label), lag(event_timestamp), lag(length), lag(shift), lag(shift_direction), lag(device_id), sep = ","),"and", - paste(label, event_timestamp, length, shift, shift_direction, device_id, sep = ","))) - if(any(overlapping$overlaps, na.rm = TRUE)){ - stop(paste0("\n\nOne or more event time segments overlap for ",overlapping$device_id[[1]],", modify their lengths so they don't:\n", paste0(overlapping %>% filter(overlaps == TRUE) %>% pull(overlapping_segments), collapse = "\n"), "\n\n")) - } else{ - return(inferred) - }}), - data = map2(data, inferred_time_segments, assign_rows_to_segments)) %>% - select(-inferred_time_segments) %>% - unnest(data) %>% - arrange(timestamp) + source("src/data/datetime/assign_to_event_segments.R") + sensor_data <- assign_to_event_segments(sensor_data, time_segments) + return(sensor_data) } - - return(sensor_data %>% ungroup()) } \ No newline at end of file diff --git a/src/data/datetime/process_time_segments.R b/src/data/datetime/process_time_segments.R index 92ebc249..96d6d634 100644 --- a/src/data/datetime/process_time_segments.R +++ b/src/data/datetime/process_time_segments.R @@ -105,16 +105,22 @@ validate_frequency_segments <- function(segments){ } prepare_frequency_segments <- function(segments){ + #FREQUENCY segments are just syntactic sugar for PERIODIC validate_frequency_segments(segments) - stamp_fn <- stamp("23:10", orders = c("HM"), quiet = TRUE) + stamp_fn <- stamp("23:10:00", orders = c("HMS"), quiet = TRUE) new_segments <- data.frame(start_time = seq.POSIXt(from = ymd_hms("2020-01-01 00:00:00"), to=ymd_hms("2020-01-02 00:00:00"), by=paste(segments$length, "min"))) new_segments <- new_segments %>% head(-1) %>% - mutate(start_time = stamp_fn(start_time), - length = segments$length, - label = paste0(segments$label, str_pad(row_number()-1, width = 4, pad = "0"))) + mutate(label = paste0(segments$label, str_pad(row_number()-1, width = 4, pad = "0")), + start_time = stamp_fn(start_time), + length = paste0((segments$length * 60)-1, "S"), + repeats_on = "every_day", + repeats_value=0, + overlap_id = 0, + original_label = label, + overlap_duration = "0D") } From 30ad3cd586b4402950b6e9d15ee1955e95183a97 Mon Sep 17 00:00:00 2001 From: JulioV Date: Sun, 28 Mar 2021 14:31:44 -0400 Subject: [PATCH 4/4] Validate participant files without device ids --- src/data/datetime/process_time_segments.R | 15 +++++++-------- src/data/streams/pull_phone_data.R | 17 +++++++++++++++++ src/data/streams/pull_wearable_data.R | 17 +++++++++++++++++ 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/src/data/datetime/process_time_segments.R b/src/data/datetime/process_time_segments.R index 96d6d634..2989043c 100644 --- a/src/data/datetime/process_time_segments.R +++ b/src/data/datetime/process_time_segments.R @@ -173,15 +173,10 @@ validate_event_segments <- function(segments){ return(segments) } -prepare_event_segments <- function(segments, participant_data){ - participant_devices <- get_devices_ids(participant_data) - if(length(participant_devices) == 0) - stop("There are no devices in the participant file.") - +prepare_event_segments <- function(segments, participant_devices){ new_segments <- segments%>% validate_event_segments() %>% filter(device_id %in% participant_devices) - return(new_segments) } compute_time_segments <- function(){ @@ -191,6 +186,11 @@ compute_time_segments <- function(){ participant_file <- snakemake@input[["participant_file"]] message("Processing ",type, " time segments for ", pid,"'s ", participant_file) + participant_data <- yaml::read_yaml(participant_file) + participant_devices <- get_devices_ids(participant_data) + if(length(participant_devices) == 0) + stop("There are no device ids in this participant file for smartphones or wearables: ", participant_file) + if(type == "FREQUENCY"){ segments <- read_csv(segments_file, col_types = cols_only(label = "c", length = "i"), trim_ws = TRUE) new_segments <- prepare_frequency_segments(segments) @@ -198,9 +198,8 @@ compute_time_segments <- function(){ segments <- read_csv(segments_file, col_types = cols_only(label = "c", start_time = "c",length = "c",repeats_on = "c",repeats_value = "i"), trim_ws = TRUE) new_segments <- prepare_periodic_segments(segments) } else if(type == "EVENT"){ - participant_data <- yaml::read_yaml(participant_file) segments <- read_csv(segments_file, col_types = cols_only(label = "c", event_timestamp = "d",length = "c",shift = "c",shift_direction = "i", device_id = "c"), trim_ws = TRUE) - new_segments <- prepare_event_segments(segments, participant_data) + new_segments <- prepare_event_segments(segments, participant_devices) } write.csv(new_segments %>% select(label) %>% distinct(label), snakemake@output[["segments_labels_file"]], row.names = FALSE, quote = FALSE) diff --git a/src/data/streams/pull_phone_data.R b/src/data/streams/pull_phone_data.R index 3755119d..a0e4e036 100644 --- a/src/data/streams/pull_phone_data.R +++ b/src/data/streams/pull_phone_data.R @@ -100,6 +100,22 @@ load_container_script <- function(stream_container){ } } +get_devices_ids <- function(participant_data){ + devices_ids = c() + for(device in participant_data) + for(attribute in names(device)) + if(attribute == "DEVICE_IDS") + devices_ids <- c(devices_ids, device[[attribute]]) + return(devices_ids) +} + +validate_participant_file_without_device_ids <- function(participant_file){ + participant_data <- yaml::read_yaml(participant_file) + participant_devices <- get_devices_ids(participant_data) + if(length(participant_devices) == 0) + stop("There are no device ids in this participant file for smartphones or wearables: ", participant_file) +} + pull_phone_data <- function(){ participant_file <- snakemake@input[["participant_file"]] stream_format <- snakemake@input[["stream_format"]] @@ -111,6 +127,7 @@ pull_phone_data <- function(){ device_type <- "phone" output_data_file <- snakemake@output[[1]] + validate_participant_file_without_device_ids(participant_file) participant_data <- read_yaml(participant_file) stream_schema <- read_yaml(stream_format) rapids_schema <- read_yaml(rapids_schema_file) diff --git a/src/data/streams/pull_wearable_data.R b/src/data/streams/pull_wearable_data.R index 9a0b1647..247e0147 100644 --- a/src/data/streams/pull_wearable_data.R +++ b/src/data/streams/pull_wearable_data.R @@ -68,6 +68,22 @@ load_container_script <- function(stream_container){ } } +get_devices_ids <- function(participant_data){ + devices_ids = c() + for(device in participant_data) + for(attribute in names(device)) + if(attribute == "DEVICE_IDS") + devices_ids <- c(devices_ids, device[[attribute]]) + return(devices_ids) +} + +validate_participant_file_without_device_ids <- function(participant_file){ + participant_data <- yaml::read_yaml(participant_file) + participant_devices <- get_devices_ids(participant_data) + if(length(participant_devices) == 0) + stop("There are no device ids in this participant file for smartphones or wearables: ", participant_file) +} + pull_wearable_data_main <- function(){ participant_file <- snakemake@input[["participant_file"]] stream_format <- snakemake@input[["stream_format"]] @@ -81,6 +97,7 @@ pull_wearable_data_main <- function(){ output_data_file <- snakemake@output[[1]] + validate_participant_file_without_device_ids(participant_file) participant_data <- read_yaml(participant_file) stream_schema <- read_yaml(stream_format) rapids_schema <- read_yaml(rapids_schema_file)