Refactor assign to day segment

pull/103/head
JulioV 2020-09-14 14:21:36 -04:00
parent e7db230426
commit 3c45f6b152
10 changed files with 238 additions and 138 deletions

View File

@ -4,8 +4,8 @@ PIDS: [test01]
# Global var with common day segments
DAY_SEGMENTS: &day_segments
TYPE: INTERVAL_EVERY_DAY # FREQUENCY_EVERY_DAY, INTERVAL_EVERY_DAY, INTERVAL_FLEXIBLE_DAY
FILE: "data/external/daysegments_interval.csv"
TYPE: PERIODIC # FREQUENCY, PERIODIC, EVENT
FILE: "data/external/daysegments_periodic.csv"
# Global timezone
# Use codes from https://en.wikipedia.org/wiki/List_of_tz_database_time_zones

View File

@ -1,8 +1,9 @@
label,start_date_time,length,shift,shift_direction
stress,2020-05-04 11:30:00,1hours,30minutes,-1
stress,2020-05-04 13:30:00,1hours,30minutes,-1
stress1,2020-05-04 11:30:00,1hours,30minutes,-1
stress2,2020-05-04 13:30:00,1hours,30minutes,-1
weekly,2020-04-21 00:00:00,7days,0,0
weekly,2020-04-28 00:00:00,7days,0,0
weekly,2020-05-05 00:00:00,7days,0,0
label,event_timestamp,length,shift,shift_direction,pid
stress,1587661220000,1hours,0minutes,1,test01
stress,1587747620000,4hours,4hours,-1,test01
stress,1587906020000,3hours,0minutes,1,test01
stress,1588003220000,7hours,4hours,-1,test01
stress,1588172420000,9hours,0,-1,test01
mood,1587661220000,7days,0,0,p02
mood,1587747620000,7days,0,0,p02
mood,1587906020000,7days,0,0,p02

1 label start_date_time event_timestamp length shift shift_direction pid
2 stress 2020-05-04 11:30:00 1587661220000 1hours 30minutes 0minutes -1 1 test01
3 stress 2020-05-04 13:30:00 1587747620000 1hours 4hours 30minutes 4hours -1 test01
4 stress1 stress 2020-05-04 11:30:00 1587906020000 1hours 3hours 30minutes 0minutes -1 1 test01
5 stress2 stress 2020-05-04 13:30:00 1588003220000 1hours 7hours 30minutes 4hours -1 test01
6 weekly stress 2020-04-21 00:00:00 1588172420000 7days 9hours 0 0 -1 test01
7 weekly mood 2020-04-28 00:00:00 1587661220000 7days 0 0 p02
8 weekly mood 2020-05-05 00:00:00 1587747620000 7days 0 0 p02
9 mood 1587906020000 7days 0 0 p02

View File

@ -1,6 +0,0 @@
label,start_time,length
daily,00:00:00,23H 59M 59S
morning,06:00:00,5H 59M 59S
afternoon,12:00:00,5H 59M 59S
evening,18:00:00,5H 59M 59S
night,00:00:00,5H 59M 59S
1 label start_time length
2 daily 00:00:00 23H 59M 59S
3 morning 06:00:00 5H 59M 59S
4 afternoon 12:00:00 5H 59M 59S
5 evening 18:00:00 5H 59M 59S
6 night 00:00:00 5H 59M 59S

View File

@ -0,0 +1,7 @@
label,start_time,length,repeats_on,repeats_value
daily,00:00:00,23H 59M 59S,every_day,0
weekly,00:00:00,6D 23H 59M 59S,mday,31
morning,06:00:00,5H 59M 59S,every_day,0
afternoon,12:00:00,5H 59M 59S,every_day,0
evening,18:00:00,5H 59M 59S,wday,2
night,00:00:00,5H 59M 59S,qday,5
1 label start_time length repeats_on repeats_value
2 daily 00:00:00 23H 59M 59S every_day 0
3 weekly 00:00:00 6D 23H 59M 59S mday 31
4 morning 06:00:00 5H 59M 59S every_day 0
5 afternoon 12:00:00 5H 59M 59S every_day 0
6 evening 18:00:00 5H 59M 59S wday 2
7 night 00:00:00 5H 59M 59S qday 5

View File

@ -9,7 +9,7 @@ rule join_features_from_providers:
rule messages_r_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["MESSAGES"]["DB_TABLE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["MESSAGES"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -21,7 +21,7 @@ rule messages_r_features:
rule messages_python_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["MESSAGES"]["DB_TABLE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["MESSAGES"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -33,7 +33,7 @@ rule messages_python_features:
rule calls_python_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime_unified.csv", sensor=config["CALLS"]["DB_TABLE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["CALLS"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -45,7 +45,7 @@ rule calls_python_features:
rule calls_r_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime_unified.csv", sensor=config["CALLS"]["DB_TABLE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["CALLS"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -89,7 +89,7 @@ rule ios_activity_recognition_deltas:
rule locations_python_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["LOCATIONS"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}",
@ -101,7 +101,7 @@ rule locations_python_features:
rule locations_r_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["LOCATIONS"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -113,7 +113,7 @@ rule locations_r_features:
rule bluetooth_r_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["BLUETOOTH"]["DB_TABLE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["BLUETOOTH"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -125,7 +125,7 @@ rule bluetooth_r_features:
rule bluetooth_python_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["BLUETOOTH"]["DB_TABLE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["BLUETOOTH"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -176,7 +176,7 @@ rule screen_features:
rule light_r_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["LIGHT"]["DB_TABLE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["LIGHT"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -188,7 +188,7 @@ rule light_r_features:
rule light_python_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["LIGHT"]["DB_TABLE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["LIGHT"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -200,7 +200,7 @@ rule light_python_features:
rule conversation_r_features:
input:
sensor_data = optional_conversation_input,
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["CONVERSATION"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -212,7 +212,7 @@ rule conversation_r_features:
rule conversation_python_features:
input:
sensor_data = optional_conversation_input,
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["CONVERSATION"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -238,7 +238,7 @@ rule accelerometer_features:
rule applications_foreground_r_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime_with_genre.csv", sensor=config["APPLICATIONS_FOREGROUND"]["DB_TABLE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["APPLICATIONS_FOREGROUND"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -250,7 +250,7 @@ rule applications_foreground_r_features:
rule applications_foreground_python_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime_with_genre.csv", sensor=config["APPLICATIONS_FOREGROUND"]["DB_TABLE"]),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["APPLICATIONS_FOREGROUND"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -262,7 +262,7 @@ rule applications_foreground_python_features:
rule wifi_r_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor_key}_with_datetime_visibleandconnected.csv", sensor_key="WIFI".lower()),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["WIFI"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
@ -274,7 +274,7 @@ rule wifi_r_features:
rule wifi_python_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor_key}_with_datetime_visibleandconnected.csv", sensor_key="WIFI".lower()),
day_segments_labels = "data/interim/day_segments_labels.csv"
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["WIFI"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"

View File

@ -42,10 +42,11 @@ rule compute_day_segments:
input:
config["DAY_SEGMENTS"]["FILE"]
params:
day_segments_type = config["DAY_SEGMENTS"]["TYPE"]
day_segments_type = config["DAY_SEGMENTS"]["TYPE"],
pid = "{pid}"
output:
segments_file = "data/interim/day_segments.csv",
segments_labels_file = "data/interim/day_segments_labels.csv",
segments_file = "data/interim/day_segments/{pid}_day_segments.csv",
segments_labels_file = "data/interim/day_segments/{pid}_day_segments_labels.csv",
script:
"../src/data/compute_day_segments.py"
@ -62,7 +63,7 @@ if len(config["WIFI"]["DB_TABLE"]["CONNECTED_ACCESS_POINTS"]) > 0:
rule readable_datetime:
input:
sensor_input = "data/raw/{pid}/{sensor}_raw.csv",
day_segments = "data/interim/day_segments.csv"
day_segments = "data/interim/day_segments/{pid}_day_segments.csv"
params:
timezones = None,
fixed_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"],
@ -112,7 +113,7 @@ rule process_location_types:
input:
locations = "data/raw/{pid}/{sensor}_with_datetime.csv",
phone_sensed_bins = rules.phone_sensed_bins.output,
day_segments = "data/interim/day_segments.csv"
day_segments = "data/interim/day_segments/{pid}_day_segments.csv"
params:
bin_size = config["PHONE_VALID_SENSED_BINS"]["BIN_SIZE"],
timezone = config["LOCATIONS"]["TIMEZONE"],

View File

@ -1,85 +1,131 @@
library("tidyverse")
library("lubridate")
assign_to_day_segment <- function(data, day_segments, day_segments_type, fixed_timezone){
assign_to_day_segment <- function(sensor_data, day_segments, day_segments_type){
if(day_segments_type == "FREQUENCY_EVERY_DAY"){
data <- data %>% mutate(local_date_time_obj = lubridate::parse_date_time(local_time, orders = c("HMS", "HM")))
if(day_segments_type == "FREQUENCY"){ #FREQUENCY
sensor_data <- sensor_data %>% mutate(local_date_time_obj = lubridate::parse_date_time(local_time, orders = c("HMS", "HM")))
day_segments <- day_segments %>% mutate(start_time = lubridate::parse_date_time(start_time, orders = c("HMS", "HM")),
end_time = start_time + minutes(length))
# Create a new column for each day_segment
for(row_id in 1:nrow(day_segments)){
row = day_segments[row_id,]
data <- data %>% mutate(!!paste("local_day_segment", row_id, sep = "_") := ifelse(local_date_time_obj >= row$start_time & local_date_time_obj < row$end_time,
sensor_data <- sensor_data %>% mutate(!!paste("local_day_segment", row_id, sep = "_") := ifelse(local_date_time_obj >= row$start_time & local_date_time_obj < row$end_time,
paste0("[",
row$label, "_",
local_date, "_",
paste(str_pad(hour(row$start_time),2, pad="0"), str_pad(minute(row$start_time),2, pad="0"), str_pad(second(row$start_time),2, pad="0"),sep =":"),
row$label, "#",
local_date, "#",
paste(str_pad(hour(row$start_time),2, pad="0"), str_pad(minute(row$start_time),2, pad="0"), str_pad(second(row$start_time),2, pad="0"),sep =":"), "#",
local_date, "#",
paste(str_pad(hour(row$end_time),2, pad="0"), str_pad(minute(row$end_time),2, pad="0"), str_pad(second(row$end_time),2, pad="0"),sep =":"),
"]"), NA))
}
} else if (day_segments_type == "INTERVAL_EVERY_DAY"){
data_dates <- data %>% select(local_date) %>% distinct(local_date)
inferred_day_segments <- crossing(day_segments, data_dates) %>%
mutate(start_local_date_time_obj = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = fixed_timezone),
end_local_date_time_obj = start_local_date_time_obj + lubridate::period(length),
date_time_interval = lubridate::interval(start_local_date_time_obj, end_local_date_time_obj)) %>%
group_by(label, local_date) %>%
mutate(group_start_datetime = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = fixed_timezone),
group_end_datetime = group_start_datetime + lubridate::period(length),
group_start_datetime = min(group_start_datetime),
group_end_datetime = max(group_end_datetime)) %>%
ungroup()
data <- data %>% mutate(local_date_time_obj = lubridate::ymd_hms(local_date_time, tz = fixed_timezone))
# Create a new column for each day_segment
for(row_id in 1:nrow(inferred_day_segments)){
row = inferred_day_segments[row_id,]
data <- data %>% mutate(!!paste("local_day_segment", row_id, sep = "_") := ifelse(local_date_time_obj %within% row$date_time_interval,
paste0("[",
paste(sep= "#",
row$label,
lubridate::date(row$group_start_datetime),
paste(str_pad(hour(row$group_start_datetime),2, pad="0"), str_pad(minute(row$group_start_datetime),2, pad="0"), str_pad(second(row$group_start_datetime),2, pad="0"),sep =":"),
lubridate::date(row$group_end_datetime),
paste(str_pad(hour(row$group_end_datetime),2, pad="0"), str_pad(minute(row$group_end_datetime),2, pad="0"), str_pad(second(row$group_end_datetime),2, pad="0"),sep =":")
),
"]"), NA))
}
} else if ( day_segments_type == "INTERVAL_FLEXIBLE_DAY"){
data <- data %>% mutate(local_date_time_obj = lubridate::ymd_hms(local_date_time, tz = fixed_timezone))
day_segments <- day_segments %>% mutate(shift = ifelse(shift == "0", "0seconds", shift),
start_local_date_time_obj = lubridate::ymd_hms(start_date_time, tz = fixed_timezone) + (lubridate::period(shift) * ifelse(shift_direction >= 0, 1, -1)),
end_local_date_time_obj = start_local_date_time_obj + lubridate::period(length),
date_time_interval = lubridate::interval(start_local_date_time_obj, end_local_date_time_obj))
# Create a new column for each day_segment
for(row_id in 1:nrow(day_segments)){
row = day_segments[row_id,]
print(row$length)
data <- data %>% mutate(!!paste("local_day_segment", row_id, sep = "_") := ifelse(local_date_time_obj %within% row$date_time_interval,
paste0("[",
paste(sep= "#",
row$label,
lubridate::date(row$start_local_date_time_obj),
paste(str_pad(hour(row$start_local_date_time_obj),2, pad="0"), str_pad(minute(row$start_local_date_time_obj),2, pad="0"), str_pad(second(row$start_local_date_time_obj),2, pad="0"),sep =":"),
lubridate::date(row$end_local_date_time_obj),
paste(str_pad(hour(row$end_local_date_time_obj),2, pad="0"), str_pad(minute(row$end_local_date_time_obj),2, pad="0"), str_pad(second(row$end_local_date_time_obj),2, pad="0"),sep =":")
),
"]"), NA))
}
}
# Join all day_segments in a single column
data <- data %>%
sensor_data <- sensor_data %>%
unite("assigned_segments", starts_with("local_day_segment"), sep = "|", na.rm = TRUE) %>%
select(-local_date_time_obj)
return(data)
}
} else if (day_segments_type == "PERIODIC"){ #PERIODIC
sensor_data <- sensor_data %>%
mutate(row_n = row_number()) %>%
group_by(local_timezone) %>%
nest() %>%
# get existent days that we need to start segments from
mutate(existent_dates = map(data, ~.x %>%
distinct(local_date) %>%
mutate(local_date_obj = lubridate::ymd(local_date, tz = local_timezone),
every_day = 0,
wday = wday(local_date_obj, week_start = 1),
mday = mday(local_date_obj),
qday = qday(local_date_obj),
yday = yday(local_date_obj)
) %>% select(local_date, every_day, wday, mday, qday, yday)),
# build the actual day segments taking into account the users requested leangth and repeat schedule
inferred_day_segments = map(existent_dates,
~ crossing(day_segments, .x) %>%
pivot_longer(cols = c(every_day,wday, mday, qday, yday), names_to = "day_type", values_to = "day_value") %>%
filter(repeats_on == day_type & repeats_value == day_value) %>%
mutate(segment_start = (lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = local_timezone)),
segment_end = segment_start + lubridate::period(length),
segment_interval = lubridate::interval(segment_start, segment_end),
segment_id = paste0("[",
paste(sep= "#",
label,
lubridate::date(int_start(segment_interval)),
paste(str_pad(hour(int_start(segment_interval)),2, pad="0"),
str_pad(minute(int_start(segment_interval)),2, pad="0"),
str_pad(second(int_start(segment_interval)),2, pad="0"),sep =":"),
lubridate::date(int_end(segment_interval)),
paste(str_pad(hour(int_end(segment_interval)),2, pad="0"),
str_pad(minute(int_end(segment_interval)),2, pad="0"),
str_pad(second(int_end(segment_interval)),2, pad="0"),sep =":")
),
"]")) %>%
select(segment_interval, label, segment_id)),
# loop thorugh every day segment and assigned it to the rows that fall within its start and end
data = map2(data, inferred_day_segments, function(nested_data, segments){
nested_data <- nested_data %>% mutate(assigned_segments = NA_character_, row_date_time = lubridate::ymd_hms(local_date_time, tz = local_timezone))
for(row_id in 1:nrow(segments)){
row = segments[row_id,]
nested_data <- nested_data %>%
mutate(assigned_segments_temp = if_else(row_date_time %within% row$segment_interval, row$segment_id, NA_character_)) %>%
unite(col = "assigned_segments", c(assigned_segments, assigned_segments_temp), na.rm = TRUE, sep = "") %>%
mutate(assigned_segments = str_replace(assigned_segments, pattern = "\\]\\[", replacement = "\\]\\|\\[")) # this replaces ][ with ]|[
}
return(nested_data %>% select(-row_date_time))
})
) %>%
unnest(cols = data) %>%
arrange(row_n) %>%
select(-row_n, -existent_dates, -inferred_day_segments)
} else if ( day_segments_type == "EVENT"){
most_common_tz <- sensor_data %>% count(local_timezone) %>% slice(which.max(n)) %>% pull(local_timezone)
day_segments <- day_segments %>% mutate(shift = ifelse(shift == "0", "0seconds", shift),
segment_start = event_timestamp + (as.integer(seconds(lubridate::duration(shift))) * ifelse(shift_direction >= 0, 1, -1) * 1000),
segment_end = segment_start + (as.integer(seconds(lubridate::duration(length))) * 1000),
segment_start_datetime = lubridate::as_datetime(segment_start/1000, tz = most_common_tz), # these start and end datetime objects are for labeling only
segment_end_datetime = lubridate::as_datetime(segment_end/1000, tz = most_common_tz),
segment_id = paste0("[",
paste(sep= "#",
label,
lubridate::date(segment_start_datetime),
paste(str_pad(hour(segment_start_datetime),2, pad="0"),
str_pad(minute(segment_start_datetime),2, pad="0"),
str_pad(second(segment_start_datetime),2, pad="0"),sep =":"),
lubridate::date(segment_end_datetime),
paste(str_pad(hour(segment_end_datetime),2, pad="0"),
str_pad(minute(segment_end_datetime),2, pad="0"),
str_pad(second(segment_end_datetime),2, pad="0"),sep =":")
),
"]")) %>%
select(-segment_start_datetime, -segment_end_datetime)
sensor_data <- sensor_data %>%
mutate(row_n = row_number()) %>%
group_by(local_timezone) %>%
nest() %>%
mutate(data = map(data, function(nested_data){
nested_data <- nested_data %>% mutate(assigned_segments = NA_character_)
for(row_id in 1:nrow(day_segments)){
row = day_segments[row_id,]
nested_data <- nested_data %>%
mutate(assigned_segments_temp = if_else(timestamp >= row$segment_start & timestamp <= row$segment_end, row$segment_id, NA_character_)) %>%
unite(col = "assigned_segments", c(assigned_segments, assigned_segments_temp), na.rm = TRUE, sep = "") %>%
mutate(assigned_segments = str_replace(assigned_segments, pattern = "\\]\\[", replacement = "\\]\\|\\[")) #replace ][ with ]|[
}
return(nested_data)
})) %>%
unnest(cols = data) %>%
arrange(row_n) %>%
select(-row_n)
}

View File

@ -8,50 +8,97 @@ def is_valid_frequency_segments(day_segments, day_segments_file):
valid_columns = ["label", "length"]
if len(list(set(day_segments.columns) - set(valid_columns))) > 0:
error_message = 'The FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must have two columns: label, and length ' \
error_message = 'The FREQUENCY day segments file in [DAY_SEGMENTS][FILE] must have two columns: label, and length ' \
'but instead we found {}. Modify {}'.format(list(day_segments.columns), day_segments_file)
raise ValueError(error_message)
if day_segments.shape[0] > 1:
message = 'The FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] can only have 1 row.' \
message = 'The FREQUENCY day segments file in [DAY_SEGMENTS][FILE] can only have 1 row.' \
'Modify {}'.format(day_segments_file)
raise ValueError(message)
if not pd.api.types.is_integer_dtype(day_segments.dtypes['length']):
message = 'The column length in the FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must be integer but instead is ' \
'{}. Modify {}'.format(day_segments.dtypes['length'], day_segments_file)
message = 'The column length in the FREQUENCY day segments file in [DAY_SEGMENTS][FILE] must be integer but instead is ' \
'{}. . This usually means that not all values in this column are formed by digits. Modify {}'.format(day_segments.dtypes['length'], day_segments_file)
raise ValueError(message)
if day_segments.iloc[0].loc['length'] < 0:
message = 'The value in column length in the FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must be positive but instead is ' \
message = 'The value in column length in the FREQUENCY day segments file in [DAY_SEGMENTS][FILE] must be positive but instead is ' \
'{}. Modify {}'.format(day_segments.iloc[0].loc['length'], day_segments_file)
raise ValueError(message)
if day_segments.iloc[0].loc['length'] >= 1440:
message = 'The column length in the FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must be shorter than a day in minutes (1440) but instead is ' \
message = 'The column length in the FREQUENCY day segments file in [DAY_SEGMENTS][FILE] must be shorter than a day in minutes (1440) but instead is ' \
'{}. Modify {}'.format(day_segments.iloc[0].loc['length'], day_segments_file)
raise ValueError(message)
return True
def is_valid_interval_segments(day_segments, day_segments_file):
def is_valid_periodic_segments(day_segments, day_segments_file):
day_segments = day_segments.copy(deep=True)
valid_columns = ["label", "start_time", "length"]
valid_columns = ["label", "start_time", "length", "repeats_on", "repeats_value"]
if len(list(set(day_segments.columns) - set(valid_columns))) > 0:
error_message = 'The INTERVAL_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must have three columns: label, start_time and length ' \
error_message = 'The PERIODIC day segments file in [DAY_SEGMENTS][FILE] must have five columns: label, start_time, length, repeats_on, repeats_value ' \
'but instead we found {}. Modify {}'.format(list(day_segments.columns), day_segments_file)
raise ValueError(error_message)
valid_repeats_on = ["every_day", "wday", "mday", "qday", "yday"]
if len(list(set(day_segments["repeats_on"]) - set(valid_repeats_on))) > 0:
error_message = 'The column repeats_on in the PERIODIC day segments file in [DAY_SEGMENTS][FILE] can only accept: "every_day", "wday", "mday", "qday", or "yday" ' \
'but instead we found {}. Modify {}'.format(list(set(day_segments["repeats_on"])), day_segments_file)
raise ValueError(error_message)
if not pd.api.types.is_integer_dtype(day_segments.dtypes['repeats_value']):
message = 'The column repeats_value in the PERIODIC day segments file in [DAY_SEGMENTS][FILE] must be integer but instead is ' \
'{}. . This usually means that not all values in this column are formed by digits. Modify {}'.format(day_segments.dtypes['repeats_value'], day_segments_file)
raise ValueError(message)
invalid_day_segments = day_segments.query("repeats_on == 'every_day' and repeats_value != 0")
if invalid_day_segments.shape[0] > 0:
message = 'Every row with repeats_on=every_day must have a repeats_value=0 in the PERIODIC day segments file in [DAY_SEGMENTS][FILE].' \
' Modify row(s) of segment(s) {} of {}'.format(invalid_day_segments["label"].to_numpy(), day_segments_file)
raise ValueError(message)
invalid_day_segments = day_segments.query("repeats_on == 'wday' and (repeats_value < 1 | repeats_value > 7)")
if invalid_day_segments.shape[0] > 0:
message = 'Every row with repeats_on=wday must have a repeats_value=[1,7] in the PERIODIC day segments file in [DAY_SEGMENTS][FILE].' \
' Modify row(s) of segment(s) {} of {}'.format(invalid_day_segments["label"].to_numpy(), day_segments_file)
raise ValueError(message)
invalid_day_segments = day_segments.query("repeats_on == 'mday' and (repeats_value < 1 | repeats_value > 31)")
if invalid_day_segments.shape[0] > 0:
message = 'Every row with repeats_on=mday must have a repeats_value=[1,31] in the PERIODIC day segments file in [DAY_SEGMENTS][FILE].' \
' Modify row(s) of segment(s) {} of {}'.format(invalid_day_segments["label"].to_numpy(), day_segments_file)
raise ValueError(message)
invalid_day_segments = day_segments.query("repeats_on == 'qday' and (repeats_value < 1 | repeats_value > 92)")
if invalid_day_segments.shape[0] > 0:
message = 'Every row with repeats_on=qday must have a repeats_value=[1,92] in the PERIODIC day segments file in [DAY_SEGMENTS][FILE].' \
' Modify row(s) of segment(s) {} of {}'.format(invalid_day_segments["label"].to_numpy(), day_segments_file)
raise ValueError(message)
invalid_day_segments = day_segments.query("repeats_on == 'yday' and (repeats_value < 1 | repeats_value > 366)")
if invalid_day_segments.shape[0] > 0:
message = 'Every row with repeats_on=yday must have a repeats_value=[1,366] in the PERIODIC day segments file in [DAY_SEGMENTS][FILE].' \
' Modify row(s) of segment(s) {} of {}'.format(invalid_day_segments["label"].to_numpy(), day_segments_file)
raise ValueError(message)
try:
day_segments["start_time"] = pd.to_datetime(day_segments["start_time"])
except ValueError as err:
raise ValueError("At least one start_time in the INTERVAL_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] has an invalid format, it should be HH:MM in 24hr clock({}). Modify {}".format(err, day_segments_file))
raise ValueError("At least one start_time in the PERIODIC day segments file in [DAY_SEGMENTS][FILE] has an invalid format, it should be HH:MM:SS in 24hr clock({}). Modify {}".format(err, day_segments_file))
if(day_segments.shape[0] != day_segments.drop_duplicates().shape[0]):
error_message = 'The INTERVAL_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] has two or more rows that are identical. ' \
error_message = 'The PERIODIC day segments file in [DAY_SEGMENTS][FILE] has two or more rows that are identical. ' \
'Modify {}'.format(day_segments_file)
raise ValueError(error_message)
duplicated_labels = day_segments[day_segments["label"].duplicated()]
if(duplicated_labels.shape[0] > 0):
error_message = 'Segements labels must be unique. The PERIODIC day segments file in [DAY_SEGMENTS][FILE] has {} row(s) with the same label {}. ' \
'Modify {}'.format(duplicated_labels.shape[0], duplicated_labels["label"].to_numpy(), day_segments_file)
raise ValueError(error_message)
# TODO Validate string format for lubridate
return True
@ -59,30 +106,31 @@ def is_valid_interval_segments(day_segments, day_segments_file):
def is_valid_event_segments(day_segments, day_segments_file):
day_segments = day_segments.copy(deep=True)
valid_columns = ["label", "start_date_time", "length", "shift", "shift_direction"]
valid_columns = ["label", "event_timestamp", "length", "shift", "shift_direction", "pid"]
if len(list(set(day_segments.columns) - set(valid_columns))) > 0:
error_message = 'The INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] must have five columns: label, start_date_time, length, shift and shift_direction ' \
error_message = 'The EVENT day segments file in [DAY_SEGMENTS][FILE] must have six columns: label, event_timestamp, length, shift, shift_direction and pid ' \
'but instead we found {}. Modify {}'.format(list(day_segments.columns), day_segments_file)
raise ValueError(error_message)
try:
day_segments["start_date_time"] = pd.to_datetime(day_segments["start_date_time"], format='%Y-%m-%d %H:%M:%S', errors='raise')
except ValueError as err:
raise ValueError("At least one start_date_time has an invalid format, it should be YYYY-MM-DD HH:MM:SS in 24hr clock({}). Modify {}".format(err, day_segments_file))
if not pd.api.types.is_integer_dtype(day_segments.dtypes['event_timestamp']):
message = 'The column event_timestamp in the EVENT day segments file in [DAY_SEGMENTS][FILE] must be integer but instead is ' \
'{}. This usually means that not all values in this column are formed by digits. Modify {}'.format(day_segments.dtypes['event_timestamp'], day_segments_file)
raise ValueError(message)
valid_shift_direction_values = [1, -1, 0]
provided_values = day_segments["shift_direction"].unique()
if len(list(set(provided_values) - set(valid_shift_direction_values))) > 0:
error_message = 'The values of shift_direction column in the INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] can only be 1, -1 or 0 ' \
error_message = 'The values of shift_direction column in the EVENT day segments file in [DAY_SEGMENTS][FILE] can only be 1, -1 or 0 ' \
'but instead we found {}. Modify {}'.format(provided_values, day_segments_file)
raise ValueError(error_message)
if(day_segments.shape[0] != day_segments.drop_duplicates().shape[0]):
error_message = 'The INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] has two or more rows that are identical. ' \
error_message = 'The EVENT day segments file in [DAY_SEGMENTS][FILE] has two or more rows that are identical. ' \
'Modify {}'.format(day_segments_file)
raise ValueError(error_message)
# TODO Validate string format for lubridate of length and shift
# TODO validate unique labels per participant
return True
@ -114,13 +162,14 @@ def parse_frequency_segments(day_segments: pd.DataFrame) -> pd.DataFrame:
return table[['start_time', 'length', 'label']]
def parse_interval_segments(day_segments):
def parse_periodic_segments(day_segments):
day_segments.loc[day_segments["repeats_on"] == "every_day", "repeats_value"] = 0
return day_segments
def parse_event_segments(day_segments):
return day_segments
def parse_event_segments(day_segments, pid):
return day_segments.query("pid == @pid")
def parse_day_segments(day_segments_file, segments_type):
def parse_day_segments(day_segments_file, segments_type, pid):
# Add code to validate and parse frequencies, intervals, and events
# Expected formats:
# Frequency: label, length columns (e.g. my_prefix, 5) length has to be in minutes (int)
@ -130,26 +179,26 @@ def parse_day_segments(day_segments_file, segments_type):
day_segments = pd.read_csv(day_segments_file)
if day_segments is None:
message = 'The day segments file in [DAY_SEGMENTS][FILE] is None. Modify {}'.format(local_date)
message = 'The day segments file in [DAY_SEGMENTS][FILE] is None. Modify {}'.format(day_segments_file)
raise ValueError(message)
if day_segments.shape[0] == 0:
message = 'The day segments file in [DAY_SEGMENTS][FILE] is empty. Modify {}'.format(local_date)
message = 'The day segments file in [DAY_SEGMENTS][FILE] is empty. Modify {}'.format(day_segments_file)
raise ValueError(message)
if(segments_type not in ["FREQUENCY_EVERY_DAY", "INTERVAL_EVERY_DAY", "INTERVAL_FLEXIBLE_DAY"]):
raise ValueError("[DAY_SEGMENTS][TYPE] can only be FREQUENCY_EVERY_DAY, INTERVAL_EVERY_DAY, or INTERVAL_FLEXIBLE_DAY")
if(segments_type not in ["FREQUENCY", "PERIODIC", "EVENT"]):
raise ValueError("[DAY_SEGMENTS][TYPE] can only be FREQUENCY, PERIODIC, or EVENT")
if(segments_type == "FREQUENCY_EVERY_DAY" and is_valid_frequency_segments(day_segments, day_segments_file)):
if(segments_type == "FREQUENCY" and is_valid_frequency_segments(day_segments, day_segments_file)):
day_segments = parse_frequency_segments(day_segments)
elif(segments_type == "INTERVAL_EVERY_DAY" and is_valid_interval_segments(day_segments, day_segments_file)):
day_segments = parse_interval_segments(day_segments)
elif(segments_type == "INTERVAL_FLEXIBLE_DAY" and is_valid_event_segments(day_segments, day_segments_file)):
day_segments = parse_event_segments(day_segments)
elif(segments_type == "PERIODIC" and is_valid_periodic_segments(day_segments, day_segments_file)):
day_segments = parse_periodic_segments(day_segments)
elif(segments_type == "EVENT" and is_valid_event_segments(day_segments, day_segments_file)):
day_segments = parse_event_segments(day_segments, pid)
else:
raise ValueError("{} does not have a format compatible with frequency, interval or event day segments. Please refer to [LINK]".format(day_segments_file))
raise ValueError("{} does not have a format compatible with frequency, periodic or event day segments. Please refer to [LINK]".format(day_segments_file))
return day_segments
final_day_segments = parse_day_segments(snakemake.input[0], snakemake.params["day_segments_type"])
final_day_segments = parse_day_segments(snakemake.input[0], snakemake.params["day_segments_type"], snakemake.params["pid"])
final_day_segments.to_csv(snakemake.output["segments_file"], index=False)
pd.DataFrame({"label" : final_day_segments["label"].unique()}).to_csv(snakemake.output["segments_labels_file"], index=False)

View File

@ -40,7 +40,8 @@ if(!is.null(timezone_periods)){
} else if(!is.null(fixed_timezone)){
output <- input %>%
mutate(utc_date_time = as.POSIXct(timestamp/1000, origin="1970-01-01", tz="UTC"),
local_date_time = format(utc_date_time, tz = fixed_timezone, usetz = F, "%Y-%m-%d %H:%M:%S"))
local_timezone = fixed_timezone,
local_date_time = format(utc_date_time, tz = fixed_timezone, "%Y-%m-%d %H:%M:%S"))
output <- split_local_date_time(output, day_segments)
output <- assign_to_day_segment(output, day_segments, day_segments_type, fixed_timezone)
write_csv(output, sensor_output)

View File

@ -54,6 +54,7 @@ unify_ios_calls <- function(ios_calls){
local_time = first(local_time),
local_hour = first(local_hour),
local_minute = first(local_minute),
local_timezone = first(local_timezone),
assigned_segments = first(assigned_segments))
}
else {