Finalise new day segment structure with calls as example

pull/103/head
JulioV 2020-08-26 12:09:53 -04:00
parent 4b6ba12e62
commit 31ec5b0da4
15 changed files with 263 additions and 134 deletions

View File

@@ -41,7 +41,7 @@ if config["CALLS"]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime_unified.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/calls_{call_type}_{day_segment}.csv", pid=config["PIDS"], call_type=config["CALLS"]["TYPES"], day_segment = config["CALLS"]["DAY_SEGMENTS"]))
files_to_compute.extend(expand("data/processed/{pid}/calls_{call_type}.csv", pid=config["PIDS"], call_type=config["CALLS"]["TYPES"], day_segment = config["CALLS"]["DAY_SEGMENTS"]))
if config["BARNETT_LOCATION"]["COMPUTE"]:
if config["BARNETT_LOCATION"]["LOCATIONS_TO_USE"] == "RESAMPLE_FUSED":

View File

@@ -4,7 +4,8 @@ PIDS: [t01]
# Global var with common day segments
DAY_SEGMENTS: &day_segments
"data/external/daysegments_default.csv"
TYPE: INTERVAL_EVERY_DAY # FREQUENCY_EVERY_DAY, INTERVAL_EVERY_DAY, INTERVAL_FLEXIBLE_DAY
FILE: "data/external/daysegments_interval.csv"
# Global timezone
# Use codes from https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
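
The global day segments anchor now pairs a TYPE with a FILE instead of pointing at a single CSV. A hedged sketch of reading the new pair back, assuming PyYAML and that sensor sections reuse the &day_segments anchor:

import yaml

with open("config.yaml") as f:
    config = yaml.safe_load(f)

day_segments = config["DAY_SEGMENTS"]
assert day_segments["TYPE"] in ("FREQUENCY_EVERY_DAY", "INTERVAL_EVERY_DAY", "INTERVAL_FLEXIBLE_DAY")
print(day_segments["FILE"])  # data/external/daysegments_interval.csv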

View File

@@ -1,4 +0,0 @@
label,start,end
daily,00:00, 23:59
morning,06:00, 11:59
eveningblue,18:00, 21:59

View File

@@ -1,4 +1,2 @@
label,start,end
daily,00:00, 23:59
morning,06:00, 11:59
evening,18:00, 23:59
label,start_time,length
daily,00:00:00,"23H 59M 59S"


View File

@@ -0,0 +1,8 @@
label,start_date_time,length,shift,shift_direction
stress,2020-05-04 11:30:00,1hours,30minutes,-1
stress,2020-05-04 13:30:00,1hours,30minutes,-1
stress1,2020-05-04 11:30:00,1hours,30minutes,-1
stress2,2020-05-04 13:30:00,1hours,30minutes,-1
weekly,2020-04-21 00:00:00,7days,0,0
weekly,2020-04-28 00:00:00,7days,0,0
weekly,2020-05-05 00:00:00,7days,0,0
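
Each event row anchors a segment at start_date_time, optionally shifted by shift in the direction of shift_direction before length is applied. A sketch of the first stress row under those rules, with the lubridate period strings ("1hours", "30minutes") replaced by explicit pandas timedeltas:

import pandas as pd

# stress,2020-05-04 11:30:00,1hours,30minutes,-1
start = pd.Timestamp("2020-05-04 11:30:00") + (-1) * pd.Timedelta(minutes=30)
end = start + pd.Timedelta(hours=1)
print(start, end)  # 2020-05-04 11:00:00 2020-05-04 12:00:00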

View File

@@ -0,0 +1,2 @@
label,length
tenminutes,10
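
A frequency file holds a single label and a length in minutes; compute_day_segments.py (below) expands it into back-to-back slots covering the day, suffixing a zero-padded counter to the label. A sketch of the resulting table for the 10-minute row above, assuming pandas:

import pandas as pd

length = 10  # minutes, from the single row above
slots = pd.date_range("2020-01-01", periods=24 * 60 // length, freq="{}min".format(length))
table = pd.DataFrame({"start_time": slots.strftime("%H:%M"),
                      "length": length,
                      "label": ["tenminutes{:04}".format(i) for i in range(len(slots))]})
print(table.head(2))
#   start_time  length           label
# 0      00:00      10  tenminutes0000
# 1      00:10      10  tenminutes0001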

View File

@@ -0,0 +1,6 @@
label,start_time,length
daily,00:00:00,23H 59M 59S
morning,06:00:00,5H 59M 59S
afternoon,12:00:00,5H 59M 59S
evening,18:00:00,5H 59M 59S
night,00:00:00,5H 59M 59S

View File

@@ -26,6 +26,22 @@ def optional_phone_sensed_bins_input(wildcards):
return expand("data/raw/{{pid}}/{table}_with_datetime.csv", table = tables_platform)
def find_day_segments_input_file(wildcards):
for key, values in config.items():
if "DB_TABLE" in config[key] and config[key]["DB_TABLE"] == wildcards.sensor:
if "DAY_SEGMENTS" in config[key]:
return config[key]["DAY_SEGMENTS"]["FILE"]
else:
raise ValueError("{} should have a [DAY_SEGMENTS][FILE] parameter containing the path to its day segments file".format(wildcards.sensor))
def find_day_segments_input_type(wildcards):
for key, values in config.items():
if "DB_TABLE" in config[key] and config[key]["DB_TABLE"] == wildcards.sensor:
if "DAY_SEGMENTS" in config[key]:
return config[key]["DAY_SEGMENTS"]["TYPE"]
else:
raise ValueError("{} should have a [DAY_SEGMENTS][TYPE] parameter containing INTERVAL, FREQUENCY, or EVENT".format(wildcards.sensor))
# Features.smk #########################################################################################################
def optional_ar_input(wildcards):
@@ -111,11 +127,3 @@ def optional_heatmap_days_by_sensors_input(wildcards):
tables_platform = [table for table in config["HEATMAP_DAYS_BY_SENSORS"]["DB_TABLES"] if table not in [config["CONVERSATION"]["DB_TABLE"]["ANDROID"], config["ACTIVITY_RECOGNITION"]["DB_TABLE"]["ANDROID"]]] # for ios, discard any android tables that may exist
return expand("data/raw/{{pid}}/{table}_with_datetime.csv", table = tables_platform)
def find_day_segments_input_file(wildcards):
for key, values in config.items():
if "DB_TABLE" in config[key] and config[key]["DB_TABLE"] == wildcards.sensor:
if "DAY_SEGMENTS" in config[key]:
return config[key]["DAY_SEGMENTS"]
else:
raise ValueError("{} should have a DAY_SEGMENTS parameter containing the path to its day segments file".format(wildcards.sensor))

View File

@@ -12,13 +12,13 @@ rule messages_features:
rule call_features:
input:
expand("data/raw/{{pid}}/{sensor}_with_datetime_unified.csv", sensor=config["CALLS"]["DB_TABLE"])
expand("data/raw/{{pid}}/{sensor}_with_datetime_unified.csv", sensor=config["CALLS"]["DB_TABLE"]),
day_segments_labels = expand("data/interim/{sensor}_day_segments_labels.csv", sensor=config["CALLS"]["DB_TABLE"])
params:
call_type = "{call_type}",
day_segment = "{day_segment}",
features = lambda wildcards: config["CALLS"]["FEATURES"][wildcards.call_type]
output:
"data/processed/{pid}/calls_{call_type}_{day_segment}.csv"
"data/processed/{pid}/calls_{call_type}.csv"
script:
"../src/features/call_features.R"

View File

@@ -41,8 +41,11 @@ rule download_dataset:
rule compute_day_segments:
input:
find_day_segments_input_file
params:
day_segments_type = find_day_segments_input_type
output:
segments_file = "data/interim/{sensor}_day_segments.csv",
segments_labels_file = "data/interim/{sensor}_day_segments_labels.csv",
script:
"../src/data/compute_day_segments.py"
@@ -62,7 +65,8 @@ rule readable_datetime:
day_segments = "data/interim/{sensor}_day_segments.csv"
params:
timezones = None,
fixed_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"]
fixed_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"],
day_segments_type = find_day_segments_input_type
wildcard_constraints:
sensor = '.*(' + '|'.join([re.escape(x) for x in PHONE_SENSORS]) + ').*' # only process smartphone sensors, not fitbit
output:

View File

@@ -1,54 +1,89 @@
import pandas as pd
def is_valid_frequency_segments(day_segments):
def is_valid_frequency_segments(day_segments, day_segments_file):
"""
Returns True if day_segments has the expected structure for generating frequency segments;
raises a ValueError otherwise.
"""
if day_segments is None:
message = 'Table of frequency segmentation info is None. ' \
'Check the file under DAY_SEGMENTS in config.yaml'
raise ValueError(message)
if day_segments.shape[0] == 0:
message = 'Table of frequency segmentation info is empty. ' \
'Check the file under DAY_SEGMENTS in config.yaml'
raise ValueError(message)
valid_columns = ["label", "length"]
if len(list(set(day_segments.columns) - set(valid_columns))) > 0:
error_message = 'The FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must have two columns: label, and length ' \
'but instead we found {}. Modify {}'.format(list(day_segments.columns), day_segments_file)
raise ValueError(error_message)
if day_segments.shape[0] > 1:
message = 'Table of frequency segmentation info provides multiple specification but only one is allowed. ' \
'Check the file under DAY_SEGMENTS in config.yaml'
raise ValueError(message)
if 'length' not in day_segments.columns:
message = 'Table of frequency segmentation info must provide segment length. ' \
'Check the file under DAY_SEGMENTS in config.yaml'
raise ValueError(message)
if 'label' not in day_segments.columns:
message = 'Table of frequency segmentation info must provide segment label. ' \
'Check the file under DAY_SEGMENTS in config.yaml'
message = 'The FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] can only have 1 row. ' \
'Modify {}'.format(day_segments_file)
raise ValueError(message)
if not pd.api.types.is_integer_dtype(day_segments.dtypes['length']):
message = 'Only integer segment length is allowed in the table of frequency segmentation; ' \
'found {}. Check the file under DAY_SEGMENTS in config.yaml'.format(day_segments.dtypes['length'])
message = 'The column length in the FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must be integer but instead is ' \
'{}. Modify {}'.format(day_segments.dtypes['length'], day_segments_file)
raise ValueError(message)
if day_segments.iloc[0].loc['length'] < 0:
message = 'Only positive integer segment length is allowed in the table of frequency segmentation; ' \
'found {}. Check the file under DAY_SEGMENTS in config.yaml'.format(day_segments.iloc[0].loc['length'])
message = 'The value in column length in the FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must be positive but instead is ' \
'{}. Modify {}'.format(day_segments.iloc[0].loc['length'], day_segments_file)
raise ValueError(message)
if day_segments.iloc[0].loc['length'] >= 1440:
message = 'Segment length in the table of frequency segmentation should be shorter than a day (in minutes); ' \
'found {}. Check the file under DAY_SEGMENTS in config.yaml'.format(day_segments.iloc[0].loc['length'])
message = 'The column length in the FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must be shorter than a day in minutes (1440) but instead is ' \
'{}. Modify {}'.format(day_segments.iloc[0].loc['length'], day_segments_file)
raise ValueError(message)
return True
def is_valid_interval_segments(day_segments):
def is_valid_interval_segments(day_segments, day_segments_file):
day_segments = day_segments.copy(deep=True)
valid_columns = ["label", "start_time", "length"]
if len(list(set(day_segments.columns) - set(valid_columns))) > 0:
error_message = 'The INTERVAL_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must have three columns: label, start_time and length ' \
'but instead we found {}. Modify {}'.format(list(day_segments.columns), day_segments_file)
raise ValueError(error_message)
try:
day_segments["start_time"] = pd.to_datetime(day_segments["start_time"])
except ValueError as err:
raise ValueError("At least one start_time in the INTERVAL_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] has an invalid format, it should be HH:MM in 24hr clock({}). Modify {}".format(err, day_segments_file))
if(day_segments.shape[0] != day_segments.drop_duplicates().shape[0]):
error_message = 'The INTERVAL_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] has two or more rows that are identical. ' \
'Modify {}'.format(day_segments_file)
raise ValueError(error_message)
# TODO Validate string format for lubridate
return True
def is_valid_event_segments(day_segments):
return False
def is_valid_event_segments(day_segments, day_segments_file):
day_segments = day_segments.copy(deep=True)
valid_columns = ["label", "start_date_time", "length", "shift", "shift_direction"]
if len(list(set(day_segments.columns) - set(valid_columns))) > 0:
error_message = 'The INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] must have five columns: label, start_date_time, length, shift and shift_direction ' \
'but instead we found {}. Modify {}'.format(list(day_segments.columns), day_segments_file)
raise ValueError(error_message)
try:
day_segments["start_date_time"] = pd.to_datetime(day_segments["start_date_time"], format='%Y-%m-%d %H:%M:%S', errors='raise')
except ValueError as err:
raise ValueError("At least one start_date_time has an invalid format, it should be YYYY-MM-DD HH:MM:SS in 24hr clock({}). Modify {}".format(err, day_segments_file))
valid_shift_direction_values = [1, -1, 0]
provided_values = day_segments["shift_direction"].unique()
if len(list(set(provided_values) - set(valid_shift_direction_values))) > 0:
error_message = 'The values of shift_direction column in the INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] can only be 1, -1 or 0 ' \
'but instead we found {}. Modify {}'.format(provided_values, day_segments_file)
raise ValueError(error_message)
if(day_segments.shape[0] != day_segments.drop_duplicates().shape[0]):
error_message = 'The INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] has two or more rows that are identical. ' \
'Modify {}'.format(day_segments_file)
raise ValueError(error_message)
# TODO Validate string format for lubridate of length and shift
return True
def parse_frequency_segments(day_segments: pd.DataFrame) -> pd.DataFrame:
@@ -70,26 +105,22 @@ def parse_frequency_segments(day_segments: pd.DataFrame) -> pd.DataFrame:
slots = ['{:02d}:{:02d}'.format(x.hour, x.minute) for x in slots]
table = pd.DataFrame(slots, columns=['start_time'])
table['end_time'] = table['start_time'].shift(-1)
table['length'] = day_segments.iloc[0].loc['length']
table = table.iloc[:-1, :]
label = day_segments.loc[0, 'label']
table['label'] = range(0, table.shape[0])
table['label'] = table['label'].apply(lambda x: '{}_{:04}'.format(label, x))
table['label'] = table['label'].apply(lambda x: '{}{:04}'.format(label, x))
table['local_date'] = None
return table[['local_date', 'start_time', 'end_time', 'label']]
return table[['start_time', 'length', 'label']]
def parse_interval_segments(day_segments):
day_segments["local_date"] = 1
day_segments = day_segments.rename(columns={"start": "start_time", "end":"end_time"})
return day_segments
def parse_event_segments(day_segments):
return day_segments
def parse_day_segments(day_segments_file):
def parse_day_segments(day_segments_file, segments_type):
# Add code to validate and parse frequencies, intervals, and events
# Expected formats:
# Frequency: label, length columns (e.g. my_prefix, 5) length has to be in minutes (int)
@@ -98,15 +129,27 @@ def parse_day_segments(day_segments_file):
# Our output should have local_date, start_time, end_time, label. In the readable_datetime script, If local_date has the same value for all rows, every segment will be applied for all days, otherwise each segment will be applied only to its local_date
day_segments = pd.read_csv(day_segments_file)
if(is_valid_frequency_segments(day_segments)):
if day_segments is None:
message = 'The day segments file in [DAY_SEGMENTS][FILE] is None. Modify {}'.format(day_segments_file)
raise ValueError(message)
if day_segments.shape[0] == 0:
message = 'The day segments file in [DAY_SEGMENTS][FILE] is empty. Modify {}'.format(day_segments_file)
raise ValueError(message)
if(segments_type not in ["FREQUENCY_EVERY_DAY", "INTERVAL_EVERY_DAY", "INTERVAL_FLEXIBLE_DAY"]):
raise ValueError("[DAY_SEGMENTS][TYPE] can only be FREQUENCY_EVERY_DAY, INTERVAL_EVERY_DAY, or INTERVAL_FLEXIBLE_DAY")
if(segments_type == "FREQUENCY_EVERY_DAY" and is_valid_frequency_segments(day_segments, day_segments_file)):
day_segments = parse_frequency_segments(day_segments)
elif(is_valid_interval_segments(day_segments)):
elif(segments_type == "INTERVAL_EVERY_DAY" and is_valid_interval_segments(day_segments, day_segments_file)):
day_segments = parse_interval_segments(day_segments)
elif(is_valid_event_segments(day_segments)):
elif(segments_type == "INTERVAL_FLEXIBLE_DAY" and is_valid_event_segments(day_segments, day_segments_file)):
day_segments = parse_event_segments(day_segments)
else:
raise ValueError("{} does not have a format compatible with frequency, interval or event day segments. Please refer to [LINK]".format(day_segments_file))
return day_segments
day_segments = parse_day_segments(snakemake.input[0])
day_segments.to_csv(snakemake.output["segments_file"], index=False)
final_day_segments = parse_day_segments(snakemake.input[0], snakemake.params["day_segments_type"])
final_day_segments.to_csv(snakemake.output["segments_file"], index=False)
pd.DataFrame({"label" : final_day_segments["label"].unique()}).to_csv(snakemake.output["segments_labels_file"], index=False)

View File

@@ -5,37 +5,92 @@ library("readr")
library("lubridate")
input <- read.csv(snakemake@input[["sensor_input"]]) %>% arrange(timestamp)
day_segments <- read.csv(snakemake@input[["day_segments"]]) %>% filter(label != "daily") #daily is done by default by all scripts
day_segments <- read.csv(snakemake@input[["day_segments"]])
day_segments_type <- snakemake@params[["day_segments_type"]]
sensor_output <- snakemake@output[[1]]
timezone_periods <- snakemake@params[["timezone_periods"]]
fixed_timezone <- snakemake@params[["fixed_timezone"]]
assign_to_day_segment <- function(data, day_segments){
data <- data %>% mutate(local_day_segment = NA)
assign_to_day_segment <- function(data, day_segments, day_segments_type, fixed_timezone){
# All segments belong to the same date, so we assume all days have the same segments
if(length(unique(day_segments$local_date)) == 1){
data <- data %>% mutate(local_time_obj = lubridate::hms(local_time))
day_segments <- day_segments %>% mutate(start_time = lubridate::hm(start_time),
end_time = lubridate::hm(end_time))
if(day_segments_type == "FREQUENCY_EVERY_DAY"){
data <- data %>% mutate(local_date_time_obj = lubridate::parse_date_time(local_time, orders = c("HMS", "HM")))
day_segments <- day_segments %>% mutate(start_time = lubridate::parse_date_time(start_time, orders = c("HMS", "HM")),
end_time = start_time + minutes(length))
# Create a new column for each day_segment
for(row_id in 1:nrow(day_segments)){
row = day_segments[row_id,]
data <- data %>% mutate(local_day_segment = ifelse(local_time_obj >= row$start_time & local_time_obj <= row$end_time, row$label, local_day_segment))
data <- data %>% mutate(!!paste("local_day_segment", row_id, sep = "_") := ifelse(local_date_time_obj >= row$start_time & local_date_time_obj < row$end_time,
paste0("[",
row$label, "_",
local_date, "_",
paste(str_pad(hour(row$start_time),2, pad="0"), str_pad(minute(row$start_time),2, pad="0"), str_pad(second(row$start_time),2, pad="0"),sep =":"),
"]"), NA))
}
data <- data %>% select(-local_time_obj)
# Segments belong to different dates, so each day can have different segments
}else{
data <- data %>% mutate(local_date_time_obj = lubridate::ymd_hms(local_date_time))
day_segments <- day_segments %>% mutate(start_local_date_time_obj = lubridate::ymd_hm(paste(local_date, start_time)),
end_local_date_time_obj = lubridate::ymd_hm(paste(local_date, end_time)),
} else if (day_segments_type == "INTERVAL_EVERY_DAY"){
data_dates <- data %>% select(local_date) %>% distinct(local_date)
inferred_day_segments <- crossing(day_segments, data_dates) %>%
mutate(start_local_date_time_obj = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = fixed_timezone),
end_local_date_time_obj = start_local_date_time_obj + lubridate::period(length),
date_time_interval = lubridate::interval(start_local_date_time_obj, end_local_date_time_obj)) %>%
group_by(label, local_date) %>%
mutate(group_start_datetime = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = fixed_timezone),
group_end_datetime = group_start_datetime + lubridate::period(length),
group_start_datetime = min(group_start_datetime),
group_end_datetime = max(group_end_datetime)) %>%
ungroup()
data <- data %>% mutate(local_date_time_obj = lubridate::ymd_hms(local_date_time, tz = fixed_timezone))
# Create a new column for each day_segment
for(row_id in 1:nrow(inferred_day_segments)){
row = inferred_day_segments[row_id,]
data <- data %>% mutate(!!paste("local_day_segment", row_id, sep = "_") := ifelse(local_date_time_obj %within% row$date_time_interval,
paste0("[",
paste(sep= "#",
row$label,
lubridate::date(row$group_start_datetime),
paste(str_pad(hour(row$group_start_datetime),2, pad="0"), str_pad(minute(row$group_start_datetime),2, pad="0"), str_pad(second(row$group_start_datetime),2, pad="0"),sep =":"),
lubridate::date(row$group_end_datetime),
paste(str_pad(hour(row$group_end_datetime),2, pad="0"), str_pad(minute(row$group_end_datetime),2, pad="0"), str_pad(second(row$group_end_datetime),2, pad="0"),sep =":")
),
"]"), NA))
}
} else if ( day_segments_type == "INTERVAL_FLEXIBLE_DAY"){
data <- data %>% mutate(local_date_time_obj = lubridate::ymd_hms(local_date_time, tz = fixed_timezone))
day_segments <- day_segments %>% mutate(shift = ifelse(shift == "0", "0seconds", shift),
start_local_date_time_obj = lubridate::ymd_hms(start_date_time, tz = fixed_timezone) + (lubridate::period(shift) * ifelse(shift_direction >= 0, 1, -1)),
end_local_date_time_obj = start_local_date_time_obj + lubridate::period(length),
date_time_interval = lubridate::interval(start_local_date_time_obj, end_local_date_time_obj))
# Create a new column for each day_segment
for(row_id in 1:nrow(day_segments)){
row = day_segments[row_id,]
data <- data %>% mutate(local_day_segment = ifelse(local_date_time_obj %within% row$date_time_interval, row$label, local_day_segment))
data <- data %>% mutate(!!paste("local_day_segment", row_id, sep = "_") := ifelse(local_date_time_obj %within% row$date_time_interval,
paste0("[",
paste(sep= "#",
row$label,
lubridate::date(row$start_local_date_time_obj),
paste(str_pad(hour(row$start_local_date_time_obj),2, pad="0"), str_pad(minute(row$start_local_date_time_obj),2, pad="0"), str_pad(second(row$start_local_date_time_obj),2, pad="0"),sep =":"),
lubridate::date(row$end_local_date_time_obj),
paste(str_pad(hour(row$end_local_date_time_obj),2, pad="0"), str_pad(minute(row$end_local_date_time_obj),2, pad="0"), str_pad(second(row$end_local_date_time_obj),2, pad="0"),sep =":")
),
"]"), NA))
}
data <- data %>% select(-local_date_time_obj)
}
# Join all day_segments in a single column
data <- data %>%
unite("assigned_segments", starts_with("local_day_segment"), sep = "|", na.rm = TRUE) %>%
select(-local_date_time_obj)
return(data)
}
@@ -46,27 +101,30 @@ split_local_date_time <- function(data, day_segments){
mutate(local_hour = as.numeric(local_hour),
local_minute = as.numeric(local_minute))
split_data <- assign_to_day_segment(split_data, day_segments)
return(split_data)
}
if(!is.null(timezone_periods)){
timezones <- read_csv(timezone_periods)
tz_starts <- timezones$start
output <- input %>%
mutate(timezone = findInterval(timestamp / 1000, tz_starts), # Set an interval ID based on timezones' start column
timezone = ifelse(timezone == 0, 1, timezone), # Correct the first timezone ID
timezone = recode(timezone, !!! timezones$timezone), # Swap IDs for text labels
timezone = as.character(timezone)) %>%
rowwise() %>%
mutate(utc_date_time = as.POSIXct(timestamp/1000, origin="1970-01-01", tz="UTC"),
local_date_time = format(utc_date_time, tz = timezone, usetz = T))
output <- split_local_date_time(output, day_segments)
write.csv(output, sensor_output)
# TODO: Not active yet
# timezones <- read_csv(timezone_periods)
# tz_starts <- timezones$start
# output <- input %>%
# mutate(timezone = findInterval(timestamp / 1000, tz_starts), # Set an interval ID based on timezones' start column
# timezone = ifelse(timezone == 0, 1, timezone), # Correct the first timezone ID
# timezone = recode(timezone, !!! timezones$timezone), # Swap IDs for text labels
# timezone = as.character(timezone)) %>%
# rowwise() %>%
# mutate(utc_date_time = as.POSIXct(timestamp/1000, origin="1970-01-01", tz="UTC"),
# local_date_time = format(utc_date_time, tz = timezone, usetz = T, "%Y-%m-%d %H:%M:%S"))
# output <- split_local_date_time(output, day_segments)
# TODO: Implement day segment assignment with support for multiple timezones
# output <- assign_to_day_segment(output, day_segments, day_segments_type, fixed_timezone)
# write.csv(output, sensor_output)
} else if(!is.null(fixed_timezone)){
output <- input %>%
mutate(utc_date_time = as.POSIXct(timestamp/1000, origin="1970-01-01", tz="UTC"),
local_date_time = format(utc_date_time, tz = fixed_timezone, usetz = F))
local_date_time = format(utc_date_time, tz = fixed_timezone, usetz = F, "%Y-%m-%d %H:%M:%S"))
output <- split_local_date_time(output, day_segments)
output <- assign_to_day_segment(output, day_segments, day_segments_type, fixed_timezone)
write_csv(output, sensor_output)
}
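
For the interval and event branches, each matching row gets a bracketed segment id of the form [label#start_date#start_time#end_date#end_time], and the per-segment columns are then united into a single assigned_segments string with "|" between overlapping segments. A sketch of the resulting value for a reading taken at 06:30 on 2020-05-04 under the default interval file:

assigned_segments = "|".join([
    "[daily#2020-05-04#00:00:00#2020-05-04#23:59:59]",
    "[morning#2020-05-04#06:00:00#2020-05-04#11:59:59]",
])
print(assigned_segments)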

View File

@@ -54,7 +54,7 @@ unify_ios_calls <- function(ios_calls){
local_time = first(local_time),
local_hour = first(local_hour),
local_minute = first(local_minute),
local_day_segment = first(local_day_segment))
assigned_segments = first(assigned_segments))
}
else {
ios_calls <- ios_calls %>% summarise(call_type_sequence = paste(call_type, collapse = ","), call_duration = sum(call_duration), timestamp = first(timestamp))

View File

@@ -1,13 +1,5 @@
library('tidyr')
filter_by_day_segment <- function(data, day_segment) {
if(day_segment %in% c("morning", "afternoon", "evening", "night"))
data <- data %>% filter(local_day_segment == day_segment)
else if(day_segment == "daily")
return(data)
else
return(data %>% head(0))
}
library('stringr')
Mode <- function(v) {
uniqv <- unique(v)
@@ -16,7 +8,7 @@ Mode <- function(v) {
base_call_features <- function(calls, call_type, day_segment, requested_features){
# Output dataframe
features = data.frame(local_date = character(), stringsAsFactors = FALSE)
features = data.frame(local_segment = character(), stringsAsFactors = FALSE)
# The name of the features this function can compute
base_features_names <- c("count", "distinctcontacts", "meanduration", "sumduration", "minduration", "maxduration", "stdduration", "modeduration", "entropyduration", "timefirstcall", "timelastcall", "countmostfrequentcontact")
@@ -28,13 +20,21 @@ base_call_features <- function(calls, call_type, day_segment, requested_features
call_type_label = ifelse(call_type == "incoming", "1", ifelse(call_type == "outgoing", "2", ifelse(call_type == "missed", "3", NA)))
if(is.na(call_type_label))
stop(paste("Call type can online be incoming, outgoing or missed but instead you typed: ", call_type))
calls <- calls %>% filter(call_type == call_type_label) %>% filter_by_day_segment(day_segment)
# Filter the rows that belong to day_segment, and put the full segment name in a new column for grouping
date_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2}"
hour_regex = "[0-9]{2}:[0-9]{2}:[0-9]{2}"
calls <- calls %>%
filter(call_type == call_type_label) %>%
filter(grepl(paste0("\\[", day_segment, "#"),assigned_segments)) %>%
mutate(local_segment = str_extract(assigned_segments, paste0("\\[", day_segment, "#", date_regex, "#", hour_regex, "#", date_regex, "#", hour_regex, "\\]")),
local_segment = str_sub(local_segment, 2, -2)) # get rid of first and last character([])
# If there are no features or data to work with, return an empty df with appropriate column names
if(length(features_to_compute) == 0)
return(features)
if(nrow(calls) < 1)
return(cbind(features, read.csv(text = paste(paste("call", call_type, day_segment, features_to_compute, sep = "_"), collapse = ","), stringsAsFactors = FALSE)))
return(cbind(features, read.csv(text = paste(paste("call", call_type, features_to_compute, sep = "_"), collapse = ","), stringsAsFactors = FALSE)))
for(feature_name in features_to_compute){
if(feature_name == "countmostfrequentcontact"){
@@ -48,28 +48,28 @@ base_call_features <- function(calls, call_type, day_segment, requested_features
pull(trace)
feature <- calls %>%
filter(trace == mostfrequentcontact) %>%
group_by(local_date) %>%
summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := n()) %>%
group_by(local_segment) %>%
summarise(!!paste("call", call_type, feature_name, sep = "_") := n()) %>%
replace(is.na(.), 0)
features <- merge(features, feature, by="local_date", all = TRUE)
features <- merge(features, feature, by="local_segment", all = TRUE)
} else {
feature <- calls %>%
group_by(local_date)
group_by(local_segment)
feature <- switch(feature_name,
"count" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := n()),
"distinctcontacts" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := n_distinct(trace)),
"meanduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := mean(call_duration)),
"sumduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := sum(call_duration)),
"minduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := min(call_duration)),
"maxduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := max(call_duration)),
"stdduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := sd(call_duration)),
"modeduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := Mode(call_duration)),
"entropyduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := entropy.MillerMadow(call_duration)),
"timefirstcall" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := first(local_hour) * 60 + first(local_minute)),
"timelastcall" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := last(local_hour) * 60 + last(local_minute)))
"count" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := n()),
"distinctcontacts" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := n_distinct(trace)),
"meanduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := mean(call_duration)),
"sumduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := sum(call_duration)),
"minduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := min(call_duration)),
"maxduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := max(call_duration)),
"stdduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := sd(call_duration)),
"modeduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := Mode(call_duration)),
"entropyduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := entropy.MillerMadow(call_duration)),
"timefirstcall" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := first(local_hour) * 60 + first(local_minute)),
"timelastcall" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := last(local_hour) * 60 + last(local_minute)))
features <- merge(features, feature, by="local_date", all = TRUE)
features <- merge(features, feature, by="local_segment", all = TRUE)
}
}
features <- features %>% mutate_at(vars(contains("countmostfrequentcontact")), list( ~ replace_na(., 0)))
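
The regex-based filtering above can be mirrored in Python; this sketch extracts the "morning" segment id from an assigned_segments value, including the final strip of the surrounding brackets (str_sub(..., 2, -2) in the R code):

import re

date_regex = r"[0-9]{4}[-|/][0-9]{2}[-|/][0-9]{2}"
hour_regex = r"[0-9]{2}:[0-9]{2}:[0-9]{2}"
day_segment = "morning"
assigned = ("[daily#2020-05-04#00:00:00#2020-05-04#23:59:59]|"
            "[morning#2020-05-04#06:00:00#2020-05-04#11:59:59]")
pattern = r"\[" + day_segment + "#" + date_regex + "#" + hour_regex + "#" + date_regex + "#" + hour_regex + r"\]"
local_segment = re.search(pattern, assigned).group()[1:-1]  # drop [ and ]
print(local_segment)  # morning#2020-05-04#06:00:00#2020-05-04#11:59:59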

View File

@@ -1,18 +1,23 @@
source("renv/activate.R")
source("src/features/call/call_base.R")
library(dplyr)
library(entropy)
calls <- read.csv(snakemake@input[[1]], stringsAsFactors = FALSE)
day_segment <- snakemake@params[["day_segment"]]
day_segments_labels <- read.csv(snakemake@input[["day_segments_labels"]])
requested_features <- snakemake@params[["features"]]
call_type <- snakemake@params[["call_type"]]
features = data.frame(local_date = character(), stringsAsFactors = FALSE)
features = data.frame(local_segment = character(), stringsAsFactors = FALSE)
# Compute base Call features
features <- merge(features, base_call_features(calls, call_type, day_segment, requested_features), by="local_date", all = TRUE)
day_segments <- day_segments_labels %>% pull(label)
for (day_segment in day_segments)
features <- merge(features, base_call_features(calls, call_type, day_segment, requested_features), all = TRUE)
if(ncol(features) != length(requested_features) + 1)
stop(paste0("The number of features in the output dataframe (=", ncol(features),") does not match the expected value (=", length(requested_features)," + 1). Verify your Call feature extraction functions"))
features <- features %>% separate(col = local_segment,
into = c("segment", "local_start_date", "local_start_time", "local_end_date", "local_end_time"),
sep = "#",
remove = FALSE)
write.csv(features, snakemake@output[[1]], row.names = FALSE)
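
The final separate() splits each local_segment id back into its five "#"-delimited parts for the output CSV. A pandas equivalent, assuming the segment id format shown earlier:

import pandas as pd

features = pd.DataFrame({"local_segment": ["morning#2020-05-04#06:00:00#2020-05-04#11:59:59"]})
parts = ["segment", "local_start_date", "local_start_time", "local_end_date", "local_end_time"]
features[parts] = features["local_segment"].str.split("#", expand=True)
print(features[parts].iloc[0].tolist())
# ['morning', '2020-05-04', '06:00:00', '2020-05-04', '11:59:59']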