Fix overlapping periodic time segments
parent
1314f1d1cf
commit
d0858f8833
|
@ -3,7 +3,7 @@ stress,1587661220000,1H,0M,1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
|||
stress,1587747620000,4H,4H,-1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
||||
stress,1587906020000,3H,0M,1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
||||
stress,1588003220000,7H,4H,-1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
||||
stress,1588172420000,9H,0,-1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
||||
mood,1587661220000,1H,0,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
||||
mood,1587747620000,1D,0,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
||||
mood,1587906020000,7D,0,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
||||
stress,1588172420000,9H,0M,-1,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
||||
mood,1587661220000,1H,0M,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
||||
mood,1587747620000,1D,0M,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
||||
mood,1587906020000,7D,0M,0,a748ee1a-1d0b-4ae9-9074-279a2b6ba524
|
||||
|
|
|
|
@ -3,4 +3,6 @@ daily,00:00:00,23H 59M 59S,every_day,0
|
|||
morning,06:00:00,5H 59M 59S,every_day,0
|
||||
afternoon,12:00:00,5H 59M 59S,every_day,0
|
||||
evening,18:00:00,5H 59M 59S,every_day,0
|
||||
night,00:00:00,5H 59M 59S,every_day,0
|
||||
night,00:00:00,5H 59M 59S,every_day,0
|
||||
two_weeks_overlapping,00:00:00,13D 23H 59M 59S,every_day,0
|
||||
weekends,00:00:00,1D 23H 59M 59S,wday,6
|
||||
|
|
|
|
@ -23,10 +23,10 @@ rule pull_phone_data:
|
|||
script:
|
||||
"../src/data/streams/pull_phone_data.R"
|
||||
|
||||
rule compute_time_segments:
|
||||
rule process_time_segments:
|
||||
input:
|
||||
config["TIME_SEGMENTS"]["FILE"],
|
||||
"data/external/participant_files/{pid}.yaml"
|
||||
segments_file = config["TIME_SEGMENTS"]["FILE"],
|
||||
participant_file = "data/external/participant_files/{pid}.yaml"
|
||||
params:
|
||||
time_segments_type = config["TIME_SEGMENTS"]["TYPE"],
|
||||
pid = "{pid}"
|
||||
|
@ -34,7 +34,7 @@ rule compute_time_segments:
|
|||
segments_file = "data/interim/time_segments/{pid}_time_segments.csv",
|
||||
segments_labels_file = "data/interim/time_segments/{pid}_time_segments_labels.csv",
|
||||
script:
|
||||
"../src/data/compute_time_segments.py"
|
||||
"../src/data/datetime/process_time_segments.R"
|
||||
|
||||
rule phone_readable_datetime:
|
||||
input:
|
||||
|
|
|
@ -1,216 +0,0 @@
|
|||
import pandas as pd
|
||||
import warnings
|
||||
import yaml
|
||||
|
||||
def is_valid_frequency_segments(time_segments, time_segments_file):
|
||||
"""
|
||||
returns true if time_segment has the expected structure for generating frequency segments;
|
||||
raises ValueError exception otherwise.
|
||||
"""
|
||||
|
||||
valid_columns = ["label", "length"]
|
||||
if set(time_segments.columns) != set(valid_columns):
|
||||
error_message = 'The FREQUENCY time segments file in [TIME_SEGMENTS][FILE] must have two columns: label, and length ' \
|
||||
'but instead we found {}. Modify {}'.format(list(time_segments.columns), time_segments_file)
|
||||
raise ValueError(error_message)
|
||||
|
||||
if time_segments.shape[0] > 1:
|
||||
message = 'The FREQUENCY time segments file in [TIME_SEGMENTS][FILE] can only have 1 row.' \
|
||||
'Modify {}'.format(time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
if not pd.api.types.is_integer_dtype(time_segments.dtypes['length']):
|
||||
message = 'The column length in the FREQUENCY time segments file in [TIME_SEGMENTS][FILE] must be integer but instead is ' \
|
||||
'{}. . This usually means that not all values in this column are formed by digits. Modify {}'.format(time_segments.dtypes['length'], time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
if time_segments.iloc[0].loc['length'] < 0:
|
||||
message = 'The value in column length in the FREQUENCY time segments file in [TIME_SEGMENTS][FILE] must be positive but instead is ' \
|
||||
'{}. Modify {}'.format(time_segments.iloc[0].loc['length'], time_segments_file)
|
||||
raise ValueError(message)
|
||||
if time_segments.iloc[0].loc['length'] >= 1440:
|
||||
message = 'The column length in the FREQUENCY time segments file in [TIME_SEGMENTS][FILE] must be shorter than a day in minutes (1440) but instead is ' \
|
||||
'{}. Modify {}'.format(time_segments.iloc[0].loc['length'], time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
return True
|
||||
|
||||
def is_valid_periodic_segments(time_segments, time_segments_file):
|
||||
time_segments = time_segments.copy(deep=True)
|
||||
|
||||
valid_columns = ["label", "start_time", "length", "repeats_on", "repeats_value"]
|
||||
if set(time_segments.columns) != set(valid_columns):
|
||||
error_message = 'The PERIODIC time segments file in [TIME_SEGMENTS][FILE] must have five columns: label, start_time, length, repeats_on, repeats_value ' \
|
||||
'but instead we found {}. Modify {}'.format(list(time_segments.columns), time_segments_file)
|
||||
raise ValueError(error_message)
|
||||
|
||||
valid_repeats_on = ["every_day", "wday", "mday", "qday", "yday"]
|
||||
if len(list(set(time_segments["repeats_on"]) - set(valid_repeats_on))) > 0:
|
||||
error_message = 'The column repeats_on in the PERIODIC time segments file in [TIME_SEGMENTS][FILE] can only accept: "every_day", "wday", "mday", "qday", or "yday" ' \
|
||||
'but instead we found {}. Modify {}'.format(list(set(time_segments["repeats_on"])), time_segments_file)
|
||||
raise ValueError(error_message)
|
||||
|
||||
if not pd.api.types.is_integer_dtype(time_segments.dtypes['repeats_value']):
|
||||
message = 'The column repeats_value in the PERIODIC time segments file in [TIME_SEGMENTS][FILE] must be integer but instead is ' \
|
||||
'{}. . This usually means that not all values in this column are formed by digits. Modify {}'.format(time_segments.dtypes['repeats_value'], time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
invalid_time_segments = time_segments.query("repeats_on == 'every_day' and repeats_value != 0")
|
||||
if invalid_time_segments.shape[0] > 0:
|
||||
message = 'Every row with repeats_on=every_day must have a repeats_value=0 in the PERIODIC time segments file in [TIME_SEGMENTS][FILE].' \
|
||||
' Modify row(s) of segment(s) {} of {}'.format(invalid_time_segments["label"].to_numpy(), time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
invalid_time_segments = time_segments.query("repeats_on == 'wday' and (repeats_value < 1 | repeats_value > 7)")
|
||||
if invalid_time_segments.shape[0] > 0:
|
||||
message = 'Every row with repeats_on=wday must have a repeats_value=[1,7] in the PERIODIC time segments file in [TIME_SEGMENTS][FILE].' \
|
||||
' Modify row(s) of segment(s) {} of {}'.format(invalid_time_segments["label"].to_numpy(), time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
invalid_time_segments = time_segments.query("repeats_on == 'mday' and (repeats_value < 1 | repeats_value > 31)")
|
||||
if invalid_time_segments.shape[0] > 0:
|
||||
message = 'Every row with repeats_on=mday must have a repeats_value=[1,31] in the PERIODIC time segments file in [TIME_SEGMENTS][FILE].' \
|
||||
' Modify row(s) of segment(s) {} of {}'.format(invalid_time_segments["label"].to_numpy(), time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
invalid_time_segments = time_segments.query("repeats_on == 'qday' and (repeats_value < 1 | repeats_value > 92)")
|
||||
if invalid_time_segments.shape[0] > 0:
|
||||
message = 'Every row with repeats_on=qday must have a repeats_value=[1,92] in the PERIODIC time segments file in [TIME_SEGMENTS][FILE].' \
|
||||
' Modify row(s) of segment(s) {} of {}'.format(invalid_time_segments["label"].to_numpy(), time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
invalid_time_segments = time_segments.query("repeats_on == 'yday' and (repeats_value < 1 | repeats_value > 366)")
|
||||
if invalid_time_segments.shape[0] > 0:
|
||||
message = 'Every row with repeats_on=yday must have a repeats_value=[1,366] in the PERIODIC time segments file in [TIME_SEGMENTS][FILE].' \
|
||||
' Modify row(s) of segment(s) {} of {}'.format(invalid_time_segments["label"].to_numpy(), time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
try:
|
||||
time_segments["start_time"] = pd.to_datetime(time_segments["start_time"])
|
||||
except ValueError as err:
|
||||
raise ValueError("At least one start_time in the PERIODIC time segments file in [TIME_SEGMENTS][FILE] has an invalid format, it should be HH:MM:SS in 24hr clock({}). Modify {}".format(err, time_segments_file))
|
||||
|
||||
if(time_segments.shape[0] != time_segments.drop_duplicates().shape[0]):
|
||||
error_message = 'The PERIODIC time segments file in [TIME_SEGMENTS][FILE] has two or more rows that are identical. ' \
|
||||
'Modify {}'.format(time_segments_file)
|
||||
raise ValueError(error_message)
|
||||
|
||||
duplicated_labels = time_segments[time_segments["label"].duplicated()]
|
||||
if(duplicated_labels.shape[0] > 0):
|
||||
error_message = 'Segements labels must be unique. The PERIODIC time segments file in [TIME_SEGMENTS][FILE] has {} row(s) with the same label {}. ' \
|
||||
'Modify {}'.format(duplicated_labels.shape[0], duplicated_labels["label"].to_numpy(), time_segments_file)
|
||||
raise ValueError(error_message)
|
||||
|
||||
# TODO Validate string format for lubridate
|
||||
|
||||
return True
|
||||
|
||||
def is_valid_event_segments(time_segments, time_segments_file):
|
||||
time_segments = time_segments.copy(deep=True)
|
||||
|
||||
valid_columns = ["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]
|
||||
if set(time_segments.columns) != set(valid_columns):
|
||||
error_message = 'The EVENT time segments file in [TIME_SEGMENTS][FILE] must have six columns: label, event_timestamp, length, shift, shift_direction and device_id ' \
|
||||
'but instead we found {}. Modify {}'.format(list(time_segments.columns), time_segments_file)
|
||||
raise ValueError(error_message)
|
||||
|
||||
if not pd.api.types.is_integer_dtype(time_segments.dtypes['event_timestamp']):
|
||||
message = 'The column event_timestamp in the EVENT time segments file in [TIME_SEGMENTS][FILE] must be integer but instead is ' \
|
||||
'{}. This usually means that not all values in this column are formed by digits. Modify {}'.format(time_segments.dtypes['event_timestamp'], time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
valid_shift_direction_values = [1, -1, 0]
|
||||
provided_values = time_segments["shift_direction"].unique()
|
||||
if len(list(set(provided_values) - set(valid_shift_direction_values))) > 0:
|
||||
error_message = 'The values of shift_direction column in the EVENT time segments file in [TIME_SEGMENTS][FILE] can only be 1, -1 or 0 ' \
|
||||
'but instead we found {}. Modify {}'.format(provided_values, time_segments_file)
|
||||
raise ValueError(error_message)
|
||||
|
||||
if(time_segments.shape[0] != time_segments.drop_duplicates().shape[0]):
|
||||
error_message = 'The EVENT time segments file in [TIME_SEGMENTS][FILE] has two or more rows that are identical. ' \
|
||||
'Modify {}'.format(time_segments_file)
|
||||
raise ValueError(error_message)
|
||||
|
||||
# TODO Validate string format for lubridate of length and shift
|
||||
# TODO validate unique labels per participant
|
||||
return True
|
||||
|
||||
|
||||
def parse_frequency_segments(time_segments: pd.DataFrame) -> pd.DataFrame:
|
||||
"""
|
||||
returns a table with rows identifying start and end of time slots with frequency freq (in minutes). For example,
|
||||
for freq = 10 it outputs:
|
||||
bin_id start end label
|
||||
0 00:00 00:10 epoch_0000
|
||||
1 00:10 00:20 epoch_0001
|
||||
2 00:20 00:30 epoch_0002
|
||||
...
|
||||
143 23:50 00:00 epoch_0143
|
||||
time_segments argument is expected to have the following structure:
|
||||
label length
|
||||
epoch 10
|
||||
"""
|
||||
freq = time_segments.iloc[0].loc['length']
|
||||
slots = pd.date_range(start='2020-01-01', end='2020-01-02', freq='{}min'.format(freq))
|
||||
slots = ['{:02d}:{:02d}'.format(x.hour, x.minute) for x in slots]
|
||||
|
||||
table = pd.DataFrame(slots, columns=['start_time'])
|
||||
table['length'] = time_segments.iloc[0].loc['length']
|
||||
table = table.iloc[:-1, :]
|
||||
|
||||
label = time_segments.loc[0, 'label']
|
||||
table['label'] = range(0, table.shape[0])
|
||||
table['label'] = table['label'].apply(lambda x: '{}{:04}'.format(label, x))
|
||||
|
||||
return table[['start_time', 'length', 'label']]
|
||||
|
||||
def parse_periodic_segments(time_segments):
|
||||
time_segments.loc[time_segments["repeats_on"] == "every_day", "repeats_value"] = 0
|
||||
return time_segments
|
||||
|
||||
def parse_event_segments(time_segments, device_ids):
|
||||
return time_segments.query("device_id == @device_ids")
|
||||
|
||||
def parse_time_segments(time_segments_file, segments_type, device_ids):
|
||||
# Add code to validate and parse frequencies, intervals, and events
|
||||
# Expected formats:
|
||||
# Frequency: label, length columns (e.g. my_prefix, 5) length has to be in minutes (int)
|
||||
# Interval: label, start, end columns (e.g. daily, 00:00, 23:59) start and end should be valid hours in 24 hour format
|
||||
# Event: label, timestamp, length, shift (e.g., survey1, 1532313215463, 60, -30), timestamp is a UNIX timestamp in ms (we could take a date time string instead), length is in minutes (int), shift is in minutes (+/-int) and is added/substracted from timestamp
|
||||
# Our output should have local_date, start_time, end_time, label. In the readable_datetime script, If local_date has the same value for all rows, every segment will be applied for all days, otherwise each segment will be applied only to its local_date
|
||||
time_segments = pd.read_csv(time_segments_file)
|
||||
|
||||
if time_segments is None:
|
||||
message = 'The time segments file in [TIME_SEGMENTS][FILE] is None. Modify {}'.format(time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
if time_segments.shape[0] == 0:
|
||||
message = 'The time segments file in [TIME_SEGMENTS][FILE] is empty. Modify {}'.format(time_segments_file)
|
||||
raise ValueError(message)
|
||||
|
||||
if(segments_type not in ["FREQUENCY", "PERIODIC", "EVENT"]):
|
||||
raise ValueError("[TIME_SEGMENTS][TYPE] can only be FREQUENCY, PERIODIC, or EVENT")
|
||||
|
||||
if(segments_type == "FREQUENCY" and is_valid_frequency_segments(time_segments, time_segments_file)):
|
||||
time_segments = parse_frequency_segments(time_segments)
|
||||
elif(segments_type == "PERIODIC" and is_valid_periodic_segments(time_segments, time_segments_file)):
|
||||
time_segments = parse_periodic_segments(time_segments)
|
||||
elif(segments_type == "EVENT" and is_valid_event_segments(time_segments, time_segments_file)):
|
||||
time_segments = parse_event_segments(time_segments, device_ids)
|
||||
else:
|
||||
raise ValueError("{} does not have a format compatible with frequency, periodic or event time segments. Please refer to [LINK]".format(time_segments_file))
|
||||
return time_segments
|
||||
|
||||
participant_file = yaml.load(open(snakemake.input[1], 'r'), Loader=yaml.FullLoader)
|
||||
device_ids = []
|
||||
for key in participant_file.keys():
|
||||
if "DEVICE_IDS" in participant_file[key] and isinstance(participant_file[key]["DEVICE_IDS"], list):
|
||||
device_ids = device_ids + participant_file[key]["DEVICE_IDS"]
|
||||
|
||||
final_time_segments = parse_time_segments(snakemake.input[0], snakemake.params["time_segments_type"], device_ids)
|
||||
|
||||
if snakemake.params["time_segments_type"] == "EVENT" and final_time_segments.shape[0] == 0:
|
||||
warnings.warn("There are no event time segments for {}. Check your time segment file {}".format(snakemake.params["pid"], snakemake.input[0]))
|
||||
|
||||
final_time_segments.to_csv(snakemake.output["segments_file"], index=False)
|
||||
pd.DataFrame({"label" : final_time_segments["label"].unique()}).to_csv(snakemake.output["segments_labels_file"], index=False)
|
|
@ -11,7 +11,7 @@ get_segment_dates <- function(data, local_timezone, day_type, delay){
|
|||
dates <- data %>%
|
||||
distinct(local_date) %>%
|
||||
mutate(local_date_obj = date(lubridate::ymd(local_date, tz = local_timezone))) %>%
|
||||
complete(local_date_obj = seq(date(min(local_date_obj) - delay), max(local_date_obj), by="days")) %>%
|
||||
complete(local_date_obj = seq(date(min(local_date_obj) - delay), date(max(local_date_obj) + delay), by="days")) %>%
|
||||
mutate(local_date = replace_na(as.character(date(local_date_obj))))
|
||||
|
||||
if(day_type == "every_day")
|
||||
|
@ -27,6 +27,27 @@ get_segment_dates <- function(data, local_timezone, day_type, delay){
|
|||
return(dates)
|
||||
}
|
||||
|
||||
create_nonoverlapping_periodic_segments <- function(nested_inferred_time_segments){
|
||||
new_segments <- (data.frame(nested_inferred_time_segments %>%
|
||||
group_by(original_label) %>%
|
||||
mutate(max_groups = max(overlap_id) + 1) %>%
|
||||
# select(label, segment_id_start, segment_id_end, overlap_id, max_groups) %>%
|
||||
nest() %>%
|
||||
mutate(data = map(data, function(nested_data){
|
||||
nested_data <- nested_data %>% arrange( segment_id_start, segment_id_end) %>%
|
||||
group_by(segment_id_start) %>%
|
||||
mutate(n_id = ((cur_group_id()-1) %% max_groups)) %>%
|
||||
filter(overlap_id == n_id) %>%
|
||||
# select(label, segment_id_start, overlap_id, n_id) %>%
|
||||
ungroup()
|
||||
})) %>%
|
||||
unnest(cols = data) %>%
|
||||
ungroup()
|
||||
))
|
||||
return(new_segments)
|
||||
}
|
||||
|
||||
|
||||
assign_rows_to_segments <- function(nested_data, nested_inferred_time_segments){
|
||||
nested_data <- nested_data %>% mutate(assigned_segments = "")
|
||||
for(i in seq_len(nrow(nested_inferred_time_segments))) {
|
||||
|
@ -113,10 +134,10 @@ assign_to_time_segment <- function(sensor_data, time_segments, time_segments_typ
|
|||
pivot_longer(cols = c(every_day,wday, mday, qday, yday), names_to = "day_type", values_to = "day_value") %>%
|
||||
filter(repeats_on == day_type & repeats_value == day_value) %>%
|
||||
# The segment ids (segment_id_start and segment_id_end) are computed in UTC to avoid having different labels for instances of a segment that happen in different timezones
|
||||
mutate(segment_id_start = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM")),
|
||||
mutate(segment_id_start = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM")) + period(overlap_duration),
|
||||
segment_id_end = segment_id_start + lubridate::duration(length),
|
||||
# The actual segments are computed using timestamps taking into account the timezone
|
||||
segment_start_ts = as.numeric(lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = local_timezone)) * 1000,
|
||||
segment_start_ts = as.numeric(lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = local_timezone) + period(overlap_duration)) * 1000,
|
||||
segment_end_ts = segment_start_ts + as.numeric(lubridate::duration(length)) * 1000 + 999,
|
||||
segment_id = paste0("[",
|
||||
paste0(label,"#",
|
||||
|
@ -128,6 +149,7 @@ assign_to_time_segment <- function(sensor_data, time_segments, time_segments_typ
|
|||
"]")) %>%
|
||||
# drop time segments with an invalid start or end time (mostly due to daylight saving changes, e.g. 2020-03-08 02:00:00 EST does not exist, clock jumps from 01:59am to 03:00am)
|
||||
drop_na(segment_start_ts, segment_end_ts)),
|
||||
inferred_time_segments = map(inferred_time_segments, create_nonoverlapping_periodic_segments),
|
||||
data = map2(data, inferred_time_segments, assign_rows_to_segments)
|
||||
) %>%
|
||||
select(-existent_dates, -inferred_time_segments, -every_date, -week_dates, -month_dates, -quarter_dates, -year_dates) %>%
|
||||
|
|
|
@ -0,0 +1,204 @@
|
|||
source("renv/activate.R")
|
||||
library("lubridate")
|
||||
library("readr")
|
||||
library("dplyr")
|
||||
library("tidyr")
|
||||
library("stringr")
|
||||
library("yaml")
|
||||
|
||||
validate_periodic_segments <- function(segments){
|
||||
invalid_lengths <- segments %>% mutate(is_valid = str_detect(length, "^[[:space:]]*(\\d+?[d|D])??[[:space:]]*(\\d+?[h|H])??[[:space:]]*(\\d+?[m|M])??[[:space:]]*(\\d+?[s|S])??$"))
|
||||
if(any(!(invalid_lengths$is_valid)))
|
||||
stop("One or more rows in your periodic time segments file have an invalid length format (XXD XXH XXM XXS): ",
|
||||
paste(invalid_lengths %>% filter(!is_valid) %>% pull(label), collapse = ", "))
|
||||
|
||||
if(any(is.na(segments$length_period)))
|
||||
stop("One or more rows in your periodic time segments file have an invalid length value: ",
|
||||
paste(segments %>% filter(is.na(length_period)) %>% pull(label), collapse = ","))
|
||||
|
||||
if(any(is.na(segments$start_time_format)))
|
||||
stop("One or more rows in your periodic time segments file have an invalid start_time (HH:MM:SS): ",
|
||||
paste(segments %>% filter(is.na(start_time_format)) %>% pull(label), collapse = ", "))
|
||||
|
||||
longer_start_time <- segments %>% mutate(is_longer = start_time_format > period("23H 59M 59S"))
|
||||
if(any(longer_start_time$is_longer))
|
||||
stop("One or more rows in your periodic time segments file have a start_time longer than 23:59:59: ",
|
||||
paste(longer_start_time %>% filter(is_longer) %>% pull(label), collapse = ", "))
|
||||
|
||||
invalid_repeats_on <- segments %>% filter(!repeats_on %in% c("every_day", "wday", "mday", "qday","yday")) %>% pull(label)
|
||||
if(length(invalid_repeats_on) > 0)
|
||||
stop("One or more rows in your periodic time segments file have an invalid repeats_on: ",
|
||||
paste(invalid_repeats_on, collapse = ","),
|
||||
". Valid values include: ",
|
||||
paste(c("every_day", "wday", "mday", "qday","yday"), collapse = ", "))
|
||||
|
||||
if(nrow(count(segments, label) %>% filter(n > 1)) > 0)
|
||||
stop("The values in the column 'label' should be unique but they are not: ",
|
||||
paste(count(segments, label) %>% filter(n > 1) %>% pull(label), collapse = ", "),
|
||||
". Valid values include: ",
|
||||
paste(c("every_day", "wday", "mday", "qday","yday"), collapse = ", "))
|
||||
|
||||
if(nrow(filter(segments, length_period > repeats_on_period & repeats_on %in% c("mday", "qday", "yday"))))
|
||||
stop("We do not support mday, qday, or yday segments that overlap yet. Get in touch with the RAPIDS team if you'd like to have this functionality. Overlapping segments: ",
|
||||
paste((filter(segments, length_period > repeats_on_period)) %>% filter(repeats_on %in% c("mday", "qday", "yday")) %>% pull(label), collapse = ","))
|
||||
|
||||
distinct_segments <- segments %>% distinct(across(-label), .keep_all=TRUE)
|
||||
if(nrow(segments) != nrow(distinct_segments))
|
||||
stop("Your periodic time segments file has ", nrow(segments) - nrow(distinct_segments), " duplicated row(s) (excluding label): ",
|
||||
paste(setdiff(segments %>% pull(label), distinct_segments %>% pull(label)), collapse = ","))
|
||||
|
||||
invalid_repeats_value <- segments %>%
|
||||
mutate(is_invalid = case_when(repeats_on == "every_day" ~ repeats_value != 0,
|
||||
repeats_on == "wday" ~ repeats_value < 1 | repeats_value > 7,
|
||||
repeats_on == "mday" ~ repeats_value < 1 | repeats_value > 31,
|
||||
repeats_on == "qday" ~ repeats_value < 1 | repeats_value > 91,
|
||||
repeats_on == "yday" ~ repeats_value < 1 | repeats_value > 365))
|
||||
if(any(invalid_repeats_value$is_invalid))
|
||||
stop("One or more rows in your periodic time segments file have an invalid repeats_value (0 for every_day, [1,7] for wday, [1,31] for mday, [1,91] for qday, [1,366] for yday): ",
|
||||
paste(invalid_repeats_value %>% filter(is_invalid) %>% pull(label), collapse = ", "))
|
||||
return(segments)
|
||||
|
||||
}
|
||||
|
||||
validate_periodic_columns <- function(segments){
|
||||
if(nrow(segments) == 0)
|
||||
stop("Your periodic time segments file is empty: ", segments_file)
|
||||
|
||||
if(!identical(colnames(segments), c("label","start_time","length","repeats_on","repeats_value")))
|
||||
stop("Your periodic time segments file does not have the expected columns (label,start_time,length,repeats_on,repeats_value). Maybe you have a typo in the names?")
|
||||
return(segments)
|
||||
}
|
||||
|
||||
prepare_periodic_segments <- function(segments){
|
||||
segments <- segments %>%
|
||||
validate_periodic_columns() %>%
|
||||
mutate(length_period = period(length),
|
||||
start_time_format = hms(start_time, quiet = TRUE),
|
||||
repeats_on_period = case_when(repeats_on == "every_day" ~ period("1D"),
|
||||
repeats_on == "wday" ~ period("7D"),
|
||||
repeats_on == "mday" ~ period("28D"),
|
||||
repeats_on == "qday" ~ period("95D"),
|
||||
repeats_on == "yday" ~ period("365D"))) %>%
|
||||
validate_periodic_segments() %>%
|
||||
mutate(new_segments = (length_period %/% repeats_on_period) + 1) %>%
|
||||
uncount(weights = new_segments, .remove = FALSE, .id = "overlap_id") %>%
|
||||
mutate(overlap_id = overlap_id -1,
|
||||
original_label = label,
|
||||
overlap_duration = paste0(overlap_id * repeats_on_period / days(1),"D"),
|
||||
label = paste0(label, "_RR", overlap_id, "SS")) %>%
|
||||
select(label,start_time,length,repeats_on,repeats_value,overlap_duration,overlap_id,original_label)
|
||||
return(segments)
|
||||
}
|
||||
|
||||
validate_frequency_segments <- function(segments){
|
||||
if(nrow(segments) == 0)
|
||||
stop("Your frequency time segments file is empty: ", segments_file)
|
||||
if(!identical(colnames(segments), c("label","length")))
|
||||
stop("Your frequency time segments file does not have the expected columns (label, length). Maybe you have a typo in the names?")
|
||||
if(nrow(segments) > 1)
|
||||
stop("Your frequency time segments file cannot have more than one row")
|
||||
if(any(is.na(segments$label)))
|
||||
stop("Your frequency time segments file has an empty or invalid label")
|
||||
if(nrow(segments %>% filter(!is.na(length) & length >= 1 & length <= 1440)) == 0)
|
||||
stop("Your frequency time segments file has an empty or invalid length (only numbers between [1,1440] are accepted), you typed: ", segments$length)
|
||||
return(segments)
|
||||
}
|
||||
|
||||
prepare_frequency_segments <- function(segments){
|
||||
validate_frequency_segments(segments)
|
||||
stamp_fn <- stamp("23:10", orders = c("HM"), quiet = TRUE)
|
||||
new_segments <- data.frame(start_time = seq.POSIXt(from = ymd_hms("2020-01-01 00:00:00"),
|
||||
to=ymd_hms("2020-01-02 00:00:00"),
|
||||
by=paste(segments$length, "min")))
|
||||
new_segments <- new_segments %>%
|
||||
head(-1) %>%
|
||||
mutate(start_time = stamp_fn(start_time),
|
||||
length = segments$length,
|
||||
label = paste0(segments$label, str_pad(row_number()-1, width = 4, pad = "0")))
|
||||
|
||||
}
|
||||
|
||||
get_devices_ids <- function(participant_data){
|
||||
devices_ids = c()
|
||||
for(device in participant_data)
|
||||
for(attribute in names(device))
|
||||
if(attribute == "DEVICE_IDS")
|
||||
devices_ids <- c(devices_ids, device[[attribute]])
|
||||
return(devices_ids)
|
||||
}
|
||||
|
||||
validate_event_segments <- function(segments){
|
||||
if(nrow(segments) == 0)
|
||||
stop("The following time segments file is empty: ", segments_file)
|
||||
|
||||
if(!identical(colnames(segments), c("label","event_timestamp","length","shift","shift_direction","device_id")))
|
||||
stop("Your periodic time segments file does not have the expected columns (label,event_timestamp,length,shift,shift_direction,device_id). Maybe you have a typo in the names?")
|
||||
|
||||
invalid_lengths <- segments %>% mutate(is_valid = str_detect(length, "^[[:space:]]*(\\d+?[d|D])??[[:space:]]*(\\d+?[h|H])??[[:space:]]*(\\d+?[m|M])??[[:space:]]*(\\d+?[s|S])??$"))
|
||||
if(any(!(invalid_lengths$is_valid)))
|
||||
stop("One or more rows in your event time segments file have an invalid length format (XXD XXH XXM XXS): ",
|
||||
paste(invalid_lengths %>% filter(!is_valid) %>% pull(label), collapse = ", "))
|
||||
|
||||
invalid_shifts <- segments %>% mutate(is_valid = str_detect(shift, "^[[:space:]]*(\\d+?[d|D])??[[:space:]]*(\\d+?[h|H])??[[:space:]]*(\\d+?[m|M])??[[:space:]]*(\\d+?[s|S])??$"))
|
||||
if(any(!(invalid_shifts$is_valid)))
|
||||
stop("One or more rows in your event time segments file have an invalid shift format (XXD XXH XXM XXS): ",
|
||||
paste(invalid_shifts %>% filter(!is_valid) %>% pull(label), collapse = ", "))
|
||||
|
||||
invalid_shift_direction <- segments %>% filter(shift_direction < -1 | shift_direction > 1)
|
||||
if(nrow(invalid_shift_direction) > 0)
|
||||
stop("One or more rows in your event time segments file have an invalid shift direction (-1,0,1): ",
|
||||
paste(invalid_shift_direction %>% pull(label), collapse = ", "))
|
||||
|
||||
invalid_timestamps <- segments %>% filter(is.na(event_timestamp))
|
||||
if(nrow(invalid_timestamps) > 0)
|
||||
stop("One or more rows in your event time segments file have an empty timestamp: ",
|
||||
paste(invalid_timestamps %>% pull(label), collapse = ", "))
|
||||
|
||||
invalid_timestamps <- segments %>% filter(event_timestamp <= 999999999999)
|
||||
if(nrow(invalid_timestamps) > 0)
|
||||
stop("One or more rows in your event time segments file is not in milliseconds: ",
|
||||
paste(invalid_timestamps %>% pull(label), collapse = ", "))
|
||||
|
||||
distinct_segments <- segments %>% mutate(row_id = row_number()) %>% distinct(across(c(-label, -row_id)), .keep_all=TRUE)
|
||||
if(nrow(segments) != nrow(distinct_segments))
|
||||
stop("Your event time segments file has ", nrow(segments) - nrow(distinct_segments), " duplicated row(s) (excluding label). Duplicated row number(s): ",
|
||||
paste(setdiff(segments %>% mutate(row_id = row_number()) %>% pull(row_id), distinct_segments %>% pull(row_id)), collapse = ","))
|
||||
|
||||
return(segments)
|
||||
}
|
||||
|
||||
prepare_event_segments <- function(segments, participant_data){
|
||||
participant_devices <- get_devices_ids(participant_data)
|
||||
if(length(participant_devices) == 0)
|
||||
stop("There are no devices in the participant file.")
|
||||
|
||||
new_segments <- segments%>%
|
||||
validate_event_segments() %>%
|
||||
filter(device_id %in% participant_devices)
|
||||
return(new_segments)
|
||||
}
|
||||
|
||||
compute_time_segments <- function(){
|
||||
type = snakemake@params[["time_segments_type"]]
|
||||
pid = snakemake@params[["pid"]]
|
||||
segments_file <- snakemake@input[["segments_file"]]
|
||||
participant_file <- snakemake@input[["participant_file"]]
|
||||
message("Processing ",type, " time segments for ", pid,"'s ", participant_file)
|
||||
|
||||
if(type == "FREQUENCY"){
|
||||
segments <- read_csv(segments_file, col_types = cols_only(label = "c", length = "i"), trim_ws = TRUE)
|
||||
new_segments <- prepare_frequency_segments(segments)
|
||||
} else if(type == "PERIODIC"){
|
||||
segments <- read_csv(segments_file, col_types = cols_only(label = "c", start_time = "c",length = "c",repeats_on = "c",repeats_value = "i"), trim_ws = TRUE)
|
||||
new_segments <- prepare_periodic_segments(segments)
|
||||
} else if(type == "EVENT"){
|
||||
participant_data <- yaml::read_yaml(participant_file)
|
||||
segments <- read_csv(segments_file, col_types = cols_only(label = "c", event_timestamp = "d",length = "c",shift = "c",shift_direction = "i", device_id = "c"), trim_ws = TRUE)
|
||||
new_segments <- prepare_event_segments(segments, participant_data)
|
||||
}
|
||||
|
||||
write.csv(new_segments %>% select(label) %>% distinct(label), snakemake@output[["segments_labels_file"]], row.names = FALSE, quote = FALSE)
|
||||
write.csv(new_segments,snakemake@output[["segments_file"]], row.names = FALSE, quote = FALSE)
|
||||
}
|
||||
|
||||
compute_time_segments()
|
|
@ -11,4 +11,4 @@ for(location_features_file in location_features_files){
|
|||
location_features <- merge(location_features, read.csv(location_features_file), all = TRUE)
|
||||
}
|
||||
|
||||
write.csv(location_features, snakemake@output[[1]], row.names = FALSE)
|
||||
write.csv(location_features %>% arrange(local_segment), snakemake@output[[1]], row.names = FALSE)
|
|
@ -1,22 +1,13 @@
|
|||
source("renv/activate.R")
|
||||
|
||||
library(tidyr)
|
||||
library(purrr)
|
||||
library("dplyr", warn.conflicts = F)
|
||||
library("methods")
|
||||
library("mgm")
|
||||
library("qgraph")
|
||||
library("dplyr", warn.conflicts = F)
|
||||
library("scales")
|
||||
library("ggplot2")
|
||||
library("purrr")
|
||||
library("tidyr")
|
||||
library("reshape2")
|
||||
|
||||
feature_files <- snakemake@input[["feature_files"]]
|
||||
|
||||
features_for_individual_model <- feature_files %>%
|
||||
map(read.csv, stringsAsFactors = F, colClasses = c(local_segment = "character", local_segment_label = "character", local_segment_start_datetime="character", local_segment_end_datetime="character")) %>%
|
||||
reduce(full_join, by=c("local_segment","local_segment_label","local_segment_start_datetime","local_segment_end_datetime"))
|
||||
reduce(full_join, by=c("local_segment","local_segment_label","local_segment_start_datetime","local_segment_end_datetime")) %>%
|
||||
arrange(local_segment)
|
||||
|
||||
write.csv(features_for_individual_model, snakemake@output[[1]], row.names = FALSE)
|
||||
|
|
|
@ -69,8 +69,8 @@ fetch_provider_features <- function(provider, provider_key, sensor_key, sensor_d
|
|||
for(feature in provider[["FEATURES"]])
|
||||
sensor_features[,feature] <- NA
|
||||
}
|
||||
|
||||
sensor_features <- sensor_features %>% extract(col = local_segment,
|
||||
sensor_features <- sensor_features %>% mutate(local_segment = str_remove(local_segment, "_RR\\d+SS")) %>%
|
||||
extract(col = local_segment,
|
||||
into = c("local_segment_label", "local_segment_start_datetime", "local_segment_end_datetime"),
|
||||
"(.*)#(.*),(.*)",
|
||||
remove = FALSE)
|
||||
|
|
|
@ -113,6 +113,7 @@ def fetch_provider_features(provider, provider_key, sensor_key, sensor_data_file
|
|||
for feature in provider["FEATURES"]:
|
||||
sensor_features[feature] = None
|
||||
segment_colums = pd.DataFrame()
|
||||
sensor_features['local_segment'] = sensor_features['local_segment'].str.replace(r'_RR\d+SS', '')
|
||||
split_segemnt_columns = sensor_features["local_segment"].str.split(pat="(.*)#(.*),(.*)", expand=True)
|
||||
new_segment_columns = split_segemnt_columns.iloc[:,1:4] if split_segemnt_columns.shape[1] == 5 else pd.DataFrame(columns=["local_segment_label", "local_segment_start_datetime","local_segment_end_datetime"])
|
||||
segment_colums[["local_segment_label", "local_segment_start_datetime", "local_segment_end_datetime"]] = new_segment_columns
|
||||
|
|
Loading…
Reference in New Issue