From 31ec5b0da4e64d921b36b3ffbe2e7573dbeef1c9 Mon Sep 17 00:00:00 2001
From: JulioV
Date: Wed, 26 Aug 2020 12:09:53 -0400
Subject: [PATCH] Finalise new day segment structure with calls as example

---
 Snakefile                               |   2 +-
 config.yaml                             |   3 +-
 data/external/daysegments_bluetooth.csv |   4 -
 data/external/daysegments_default.csv   |   6 +-
 data/external/daysegments_event.csv     |   8 ++
 data/external/daysegments_frequency.csv |   2 +
 data/external/daysegments_interval.csv  |   6 ++
 rules/common.smk                        |  24 +++--
 rules/features.smk                      |   6 +-
 rules/preprocessing.smk                 |   6 +-
 src/data/compute_day_segments.py        | 127 +++++++++++++++--------
 src/data/readable_datetime.R            | 130 +++++++++++++++++-------
 src/data/unify_utils.R                  |   2 +-
 src/features/call/call_base.R           |  56 +++++-----
 src/features/call_features.R            |  15 ++-
 15 files changed, 263 insertions(+), 134 deletions(-)
 delete mode 100644 data/external/daysegments_bluetooth.csv
 create mode 100644 data/external/daysegments_event.csv
 create mode 100644 data/external/daysegments_frequency.csv
 create mode 100644 data/external/daysegments_interval.csv

diff --git a/Snakefile b/Snakefile
index 647db573..77d225d0 100644
--- a/Snakefile
+++ b/Snakefile
@@ -41,7 +41,7 @@ if config["CALLS"]["COMPUTE"]:
     files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
     files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
     files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime_unified.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
-    files_to_compute.extend(expand("data/processed/{pid}/calls_{call_type}_{day_segment}.csv", pid=config["PIDS"], call_type=config["CALLS"]["TYPES"], day_segment = config["CALLS"]["DAY_SEGMENTS"]))
+    files_to_compute.extend(expand("data/processed/{pid}/calls_{call_type}.csv", pid=config["PIDS"], call_type=config["CALLS"]["TYPES"]))
 
 if config["BARNETT_LOCATION"]["COMPUTE"]:
     if config["BARNETT_LOCATION"]["LOCATIONS_TO_USE"] == "RESAMPLE_FUSED":
diff --git a/config.yaml b/config.yaml
index 5d54eee8..99bd4346 100644
--- a/config.yaml
+++ b/config.yaml
@@ -4,7 +4,8 @@ PIDS: [t01]
 
 # Global var with common day segments
 DAY_SEGMENTS: &day_segments
-    "data/external/daysegments_default.csv"
+    TYPE: INTERVAL_EVERY_DAY # FREQUENCY_EVERY_DAY, INTERVAL_EVERY_DAY, INTERVAL_FLEXIBLE_DAY
+    FILE: "data/external/daysegments_interval.csv"
 
 # Global timezone
 # Use codes from https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
diff --git a/data/external/daysegments_bluetooth.csv b/data/external/daysegments_bluetooth.csv
deleted file mode 100644
index 7ca74747..00000000
--- a/data/external/daysegments_bluetooth.csv
+++ /dev/null
@@ -1,4 +0,0 @@
-label,start,end
-daily,00:00, 23:59
-morning,06:00, 11:59
-eveningblue,18:00, 21:59
diff --git a/data/external/daysegments_default.csv b/data/external/daysegments_default.csv
index 6f275993..28d17a88 100644
--- a/data/external/daysegments_default.csv
+++ b/data/external/daysegments_default.csv
@@ -1,4 +1,2 @@
-label,start,end
-daily,00:00, 23:59
-morning,06:00, 11:59
-evening,18:00, 23:59
+label,start_time,length
+daily,00:00:00,"23H 59M 59S"
diff --git a/data/external/daysegments_event.csv b/data/external/daysegments_event.csv
new file mode 100644
index 00000000..45890d0d
--- /dev/null
+++ b/data/external/daysegments_event.csv
@@ -0,0 +1,8 @@
+label,start_date_time,length,shift,shift_direction
+stress,2020-05-04 11:30:00,1hours,30minutes,-1
+stress,2020-05-04 13:30:00,1hours,30minutes,-1
+stress1,2020-05-04 11:30:00,1hours,30minutes,-1
+stress2,2020-05-04 13:30:00,1hours,30minutes,-1
+weekly,2020-04-21 00:00:00,7days,0,0
+weekly,2020-04-28 00:00:00,7days,0,0
+weekly,2020-05-05 00:00:00,7days,0,0
diff --git a/data/external/daysegments_frequency.csv b/data/external/daysegments_frequency.csv
new file mode 100644
index 00000000..55bcfba3
--- /dev/null
+++ b/data/external/daysegments_frequency.csv
@@ -0,0 +1,2 @@
+label,length
+tenminutes,10
\ No newline at end of file
diff --git a/data/external/daysegments_interval.csv b/data/external/daysegments_interval.csv
new file mode 100644
index 00000000..ccb83d13
--- /dev/null
+++ b/data/external/daysegments_interval.csv
@@ -0,0 +1,6 @@
+label,start_time,length
+daily,00:00:00,23H 59M 59S
+morning,06:00:00,5H 59M 59S
+afternoon,12:00:00,5H 59M 59S
+evening,18:00:00,5H 59M 59S
+night,00:00:00,5H 59M 59S
\ No newline at end of file
diff --git a/rules/common.smk b/rules/common.smk
index f4dc9b16..68747403 100644
--- a/rules/common.smk
+++ b/rules/common.smk
@@ -26,6 +26,22 @@ def optional_phone_sensed_bins_input(wildcards):
 
     return expand("data/raw/{{pid}}/{table}_with_datetime.csv", table = tables_platform)
 
+def find_day_segments_input_file(wildcards):
+    for key, values in config.items():
+        if "DB_TABLE" in config[key] and config[key]["DB_TABLE"] == wildcards.sensor:
+            if "DAY_SEGMENTS" in config[key]:
+                return config[key]["DAY_SEGMENTS"]["FILE"]
+            else:
+                raise ValueError("{} should have a [DAY_SEGMENTS][FILE] parameter containing the path to its day segments file".format(wildcards.sensor))
+
+def find_day_segments_input_type(wildcards):
+    for key, values in config.items():
+        if "DB_TABLE" in config[key] and config[key]["DB_TABLE"] == wildcards.sensor:
+            if "DAY_SEGMENTS" in config[key]:
+                return config[key]["DAY_SEGMENTS"]["TYPE"]
+            else:
+                raise ValueError("{} should have a [DAY_SEGMENTS][TYPE] parameter containing FREQUENCY_EVERY_DAY, INTERVAL_EVERY_DAY, or INTERVAL_FLEXIBLE_DAY".format(wildcards.sensor))
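+# For example (a sketch, assuming CALLS in config.yaml points its DAY_SEGMENTS
+# at the global &day_segments anchor defined above): when wildcards.sensor
+# equals config["CALLS"]["DB_TABLE"], these helpers return
+# "data/external/daysegments_interval.csv" and "INTERVAL_EVERY_DAY" respectively.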
+
 # Features.smk #########################################################################################################
 
 def optional_ar_input(wildcards):
@@ -111,11 +127,3 @@ def optional_heatmap_days_by_sensors_input(wildcards):
         tables_platform = [table for table in config["HEATMAP_DAYS_BY_SENSORS"]["DB_TABLES"] if table not in [config["CONVERSATION"]["DB_TABLE"]["ANDROID"], config["ACTIVITY_RECOGNITION"]["DB_TABLE"]["ANDROID"]]] # for ios, discard any android tables that may exist
 
     return expand("data/raw/{{pid}}/{table}_with_datetime.csv", table = tables_platform)
-
-def find_day_segments_input_file(wildcards):
-    for key, values in config.items():
-        if "DB_TABLE" in config[key] and config[key]["DB_TABLE"] == wildcards.sensor:
-            if "DAY_SEGMENTS" in config[key]:
-                return config[key]["DAY_SEGMENTS"]
-            else:
-                raise ValueError("{} should have a DAY_SEGMENTS parameter containing the path to its day segments file".format(wildcards.sensor))
diff --git a/rules/features.smk b/rules/features.smk
index 58c5450b..7098e6c9 100644
--- a/rules/features.smk
+++ b/rules/features.smk
@@ -12,13 +12,13 @@ rule messages_features:
 
 rule call_features:
     input:
-        expand("data/raw/{{pid}}/{sensor}_with_datetime_unified.csv", sensor=config["CALLS"]["DB_TABLE"])
+        expand("data/raw/{{pid}}/{sensor}_with_datetime_unified.csv", sensor=config["CALLS"]["DB_TABLE"]),
+        day_segments_labels = expand("data/interim/{sensor}_day_segments_labels.csv", sensor=config["CALLS"]["DB_TABLE"])
     params:
         call_type = "{call_type}",
-        day_segment = "{day_segment}",
        features = lambda wildcards: config["CALLS"]["FEATURES"][wildcards.call_type]
    output:
-        "data/processed/{pid}/calls_{call_type}_{day_segment}.csv"
+        "data/processed/{pid}/calls_{call_type}.csv"
     script:
         "../src/features/call_features.R"
diff --git a/rules/preprocessing.smk b/rules/preprocessing.smk
index c0afeb89..4d8e9439 100644
--- a/rules/preprocessing.smk
+++ b/rules/preprocessing.smk
@@ -41,8 +41,11 @@ rule download_dataset:
 rule compute_day_segments:
     input:
         find_day_segments_input_file
+    params:
+        day_segments_type = find_day_segments_input_type
     output:
         segments_file = "data/interim/{sensor}_day_segments.csv",
+        segments_labels_file = "data/interim/{sensor}_day_segments_labels.csv",
     script:
         "../src/data/compute_day_segments.py"
 
@@ -62,7 +65,8 @@ rule readable_datetime:
         day_segments = "data/interim/{sensor}_day_segments.csv"
     params:
         timezones = None,
-        fixed_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"]
+        fixed_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"],
+        day_segments_type = find_day_segments_input_type
     wildcard_constraints:
         sensor = '.*(' + '|'.join([re.escape(x) for x in PHONE_SENSORS]) + ').*' # only process smartphone sensors, not fitbit
     output:
diff --git a/src/data/compute_day_segments.py b/src/data/compute_day_segments.py
index 88dec5cb..e93b7f2b 100644
--- a/src/data/compute_day_segments.py
+++ b/src/data/compute_day_segments.py
@@ -1,54 +1,89 @@
 import pandas as pd
 
-def is_valid_frequency_segments(day_segments):
+def is_valid_frequency_segments(day_segments, day_segments_file):
     """
     returns true if day_segment has the expected structure for generating frequency segments;
     raises ValueError exception otherwise.
     """
-    if day_segments is None:
-        message = 'Table of frequency segmentation info is None. ' \
-                  'Check the file under DAY_SEGMENTS in config.yaml'
-        raise ValueError(message)
+
+    valid_columns = ["label", "length"]
+    if len(list(set(day_segments.columns) - set(valid_columns))) > 0:
+        error_message = 'The FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must have two columns: label and length, ' \
+            'but instead we found {}. Modify {}'.format(list(day_segments.columns), day_segments_file)
+        raise ValueError(error_message)
 
-    if day_segments.shape[0] == 0:
-        message = 'Table of frequency segmentation info is empty. ' \
-                  'Check the file under DAY_SEGMENTS in config.yaml'
-        raise ValueError(message)
     if day_segments.shape[0] > 1:
-        message = 'Table of frequency segmentation info provides multiple specification but only one is allowed. ' \
-                  'Check the file under DAY_SEGMENTS in config.yaml'
-        raise ValueError(message)
-
-    if 'length' not in day_segments.columns:
-        message = 'Table of frequency segmentation info must provide segment length. ' \
-                  'Check the file under DAY_SEGMENTS in config.yaml'
-        raise ValueError(message)
-    if 'label' not in day_segments.columns:
-        message = 'Table of frequency segmentation info must provide segment label. ' \
-                  'Check the file under DAY_SEGMENTS in config.yaml'
+        message = 'The FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] can only have 1 row. ' \
+            'Modify {}'.format(day_segments_file)
         raise ValueError(message)
 
     if not pd.api.types.is_integer_dtype(day_segments.dtypes['length']):
-        message = 'Only integer segment length is allowed in the table of frequency segmentation; ' \
-                  'found {}. Check the file under DAY_SEGMENTS in config.yaml'.format(day_segments.dtypes['length'])
+        message = 'The column length in the FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must be integer but instead is ' \
+            '{}. Modify {}'.format(day_segments.dtypes['length'], day_segments_file)
         raise ValueError(message)
 
     if day_segments.iloc[0].loc['length'] < 0:
-        message = 'Only positive integer segment length is allowed in the table of frequency segmentation; ' \
-                  'found {}. Check the file under DAY_SEGMENTS in config.yaml'.format(day_segments.iloc[0].loc['length'])
+        message = 'The value in column length in the FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must be positive but instead is ' \
+            '{}. Modify {}'.format(day_segments.iloc[0].loc['length'], day_segments_file)
         raise ValueError(message)
 
     if day_segments.iloc[0].loc['length'] >= 1440:
-        message = 'Segment length in the table of frequency segmentation should be shorter than a day (in minutes); ' \
-                  'found {}. Check the file under DAY_SEGMENTS in config.yaml'.format(day_segments.iloc[0].loc['length'])
+        message = 'The column length in the FREQUENCY_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must be shorter than a day in minutes (1440) but instead is ' \
+            '{}. Modify {}'.format(day_segments.iloc[0].loc['length'], day_segments_file)
         raise ValueError(message)
 
     return True
 
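+# Example (based on data/external/daysegments_frequency.csv above): the single
+# row "tenminutes,10" passes these checks, and parse_frequency_segments below
+# expands it into consecutive ten-minute segments labelled tenminutes0000,
+# tenminutes0001, and so on.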
-def is_valid_interval_segments(day_segments):
+def is_valid_interval_segments(day_segments, day_segments_file):
+    day_segments = day_segments.copy(deep=True)
+
+    valid_columns = ["label", "start_time", "length"]
+    if len(list(set(day_segments.columns) - set(valid_columns))) > 0:
+        error_message = 'The INTERVAL_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] must have three columns: label, start_time and length, ' \
+            'but instead we found {}. Modify {}'.format(list(day_segments.columns), day_segments_file)
+        raise ValueError(error_message)
+
+    try:
+        day_segments["start_time"] = pd.to_datetime(day_segments["start_time"])
+    except ValueError as err:
+        raise ValueError("At least one start_time in the INTERVAL_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] has an invalid format; it should be HH:MM(:SS) in 24hr clock ({}). Modify {}".format(err, day_segments_file))
+
+    if(day_segments.shape[0] != day_segments.drop_duplicates().shape[0]):
+        error_message = 'The INTERVAL_EVERY_DAY day segments file in [DAY_SEGMENTS][FILE] has two or more rows that are identical. ' \
+            'Modify {}'.format(day_segments_file)
+        raise ValueError(error_message)
+
+    # TODO: Validate the lubridate string format of the length column
+
     return True
 
-def is_valid_event_segments(day_segments):
-    return False
+def is_valid_event_segments(day_segments, day_segments_file):
+    day_segments = day_segments.copy(deep=True)
+
+    valid_columns = ["label", "start_date_time", "length", "shift", "shift_direction"]
+    if len(list(set(day_segments.columns) - set(valid_columns))) > 0:
+        error_message = 'The INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] must have five columns: label, start_date_time, length, shift and shift_direction, ' \
+            'but instead we found {}. Modify {}'.format(list(day_segments.columns), day_segments_file)
+        raise ValueError(error_message)
+
+    try:
+        day_segments["start_date_time"] = pd.to_datetime(day_segments["start_date_time"], format='%Y-%m-%d %H:%M:%S', errors='raise')
+    except ValueError as err:
+        raise ValueError("At least one start_date_time in the INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] has an invalid format; it should be YYYY-MM-DD HH:MM:SS in 24hr clock ({}). Modify {}".format(err, day_segments_file))
+
+    valid_shift_direction_values = [1, -1, 0]
+    provided_values = day_segments["shift_direction"].unique()
+    if len(list(set(provided_values) - set(valid_shift_direction_values))) > 0:
+        error_message = 'The values of the shift_direction column in the INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] can only be 1, -1 or 0, ' \
+            'but instead we found {}. Modify {}'.format(provided_values, day_segments_file)
+        raise ValueError(error_message)
+
+    if(day_segments.shape[0] != day_segments.drop_duplicates().shape[0]):
+        error_message = 'The INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] has two or more rows that are identical. ' \
+            'Modify {}'.format(day_segments_file)
+        raise ValueError(error_message)
+
+    # TODO: Validate the lubridate string format of the length and shift columns
+
+    return True
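+# Example (a sketch of the semantics implemented in readable_datetime.R): the
+# row "stress,2020-05-04 11:30:00,1hours,30minutes,-1" in daysegments_event.csv
+# defines a 1-hour segment whose start is shifted 30 minutes backwards, i.e. it
+# covers 2020-05-04 11:00:00 to 2020-05-04 12:00:00.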
Modify {}".format(err, day_segments_file)) + + valid_shift_direction_values = [1, -1, 0] + provided_values = day_segments["shift_direction"].unique() + if len(list(set(provided_values) - set(valid_shift_direction_values))) > 0: + error_message = 'The values of shift_direction column in the INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] can only be 1, -1 or 0 ' \ + 'but instead we found {}. Modify {}'.format(provided_values, day_segments_file) + raise ValueError(error_message) + + if(day_segments.shape[0] != day_segments.drop_duplicates().shape[0]): + error_message = 'The INTERVAL_FLEXIBLE_DAY day segments file in [DAY_SEGMENTS][FILE] has two or more rows that are identical. ' \ + 'Modify {}'.format(day_segments_file) + raise ValueError(error_message) + + # TODO Validate string format for lubridate of length and shift + return True def parse_frequency_segments(day_segments: pd.DataFrame) -> pd.DataFrame: @@ -70,26 +105,22 @@ def parse_frequency_segments(day_segments: pd.DataFrame) -> pd.DataFrame: slots = ['{:02d}:{:02d}'.format(x.hour, x.minute) for x in slots] table = pd.DataFrame(slots, columns=['start_time']) - table['end_time'] = table['start_time'].shift(-1) + table['length'] = day_segments.iloc[0].loc['length'] table = table.iloc[:-1, :] label = day_segments.loc[0, 'label'] table['label'] = range(0, table.shape[0]) - table['label'] = table['label'].apply(lambda x: '{}_{:04}'.format(label, x)) + table['label'] = table['label'].apply(lambda x: '{}{:04}'.format(label, x)) - table['local_date'] = None - - return table[['local_date', 'start_time', 'end_time', 'label']] + return table[['start_time', 'length', 'label']] def parse_interval_segments(day_segments): - day_segments["local_date"] = 1 - day_segments = day_segments.rename(columns={"start": "start_time", "end":"end_time"}) return day_segments def parse_event_segments(day_segments): return day_segments -def parse_day_segments(day_segments_file): +def parse_day_segments(day_segments_file, segments_type): # Add code to validate and parse frequencies, intervals, and events # Expected formats: # Frequency: label, length columns (e.g. my_prefix, 5) length has to be in minutes (int) @@ -98,15 +129,27 @@ def parse_day_segments(day_segments_file): # Our output should have local_date, start_time, end_time, label. In the readable_datetime script, If local_date has the same value for all rows, every segment will be applied for all days, otherwise each segment will be applied only to its local_date day_segments = pd.read_csv(day_segments_file) - if(is_valid_frequency_segments(day_segments)): + if day_segments is None: + message = 'The day segments file in [DAY_SEGMENTS][FILE] is None. Modify {}'.format(local_date) + raise ValueError(message) + + if day_segments.shape[0] == 0: + message = 'The day segments file in [DAY_SEGMENTS][FILE] is empty. 
+final_day_segments = parse_day_segments(snakemake.input[0], snakemake.params["day_segments_type"])
+final_day_segments.to_csv(snakemake.output["segments_file"], index=False)
+pd.DataFrame({"label" : final_day_segments["label"].unique()}).to_csv(snakemake.output["segments_labels_file"], index=False)
\ No newline at end of file
diff --git a/src/data/readable_datetime.R b/src/data/readable_datetime.R
index dfc019b9..b220df74 100644
--- a/src/data/readable_datetime.R
+++ b/src/data/readable_datetime.R
@@ -5,37 +5,92 @@
 library("readr")
 library("lubridate")
+library("stringr")
+library("tidyr")
 
 input <- read.csv(snakemake@input[["sensor_input"]]) %>% arrange(timestamp)
-day_segments <- read.csv(snakemake@input[["day_segments"]]) %>% filter(label != "daily") #daily is done by default by all scripts
+day_segments <- read.csv(snakemake@input[["day_segments"]])
+day_segments_type <- snakemake@params[["day_segments_type"]]
 sensor_output <- snakemake@output[[1]]
 timezone_periods <- snakemake@params[["timezone_periods"]]
 fixed_timezone <- snakemake@params[["fixed_timezone"]]
 
-assign_to_day_segment <- function(data, day_segments){
-  data <- data %>% mutate(local_day_segment = NA)
+assign_to_day_segment <- function(data, day_segments, day_segments_type, fixed_timezone){
 
-  # All segments belong to the same date, so we assume all days have the same segments
-  if(length(unique(day_segments$local_date)) == 1){
-    data <- data %>% mutate(local_time_obj = lubridate::hms(local_time))
-    day_segments <- day_segments %>% mutate(start_time = lubridate::hm(start_time),
-                                            end_time = lubridate::hm(end_time))
+  if(day_segments_type == "FREQUENCY_EVERY_DAY"){
+    data <- data %>% mutate(local_date_time_obj = lubridate::parse_date_time(local_time, orders = c("HMS", "HM")))
+    day_segments <- day_segments %>% mutate(start_time = lubridate::parse_date_time(start_time, orders = c("HMS", "HM")),
+                                            end_time = start_time + minutes(length))
+
+    # Create a new column for each day_segment
     for(row_id in 1:nrow(day_segments)){
       row = day_segments[row_id,]
-      data <- data %>% mutate(local_day_segment = ifelse(local_time_obj >= row$start_time & local_time_obj <= row$end_time, row$label, local_day_segment))
+      data <- data %>% mutate(!!paste("local_day_segment", row_id, sep = "_") := ifelse(local_date_time_obj >= row$start_time & local_date_time_obj < row$end_time,
+                                                paste0("[",
+                                                       row$label, "_",
+                                                       local_date, "_",
+                                                       paste(str_pad(hour(row$start_time),2, pad="0"), str_pad(minute(row$start_time),2, pad="0"), str_pad(second(row$start_time),2, pad="0"),sep =":"),
+                                                       "]"), NA))
     }
-    data <- data %>% select(-local_time_obj)
 
-  # Segments belong to different dates, so each day can have different segments
-  }else{
-    data <- data %>% mutate(local_date_time_obj = lubridate::ymd_hms(local_date_time))
-    day_segments <- day_segments %>% mutate(start_local_date_time_obj = lubridate::ymd_hm(paste(local_date, start_time)),
-                                            end_local_date_time_obj = lubridate::ymd_hm(paste(local_date, end_time)),
local_date, "_", + paste(str_pad(hour(row$start_time),2, pad="0"), str_pad(minute(row$start_time),2, pad="0"), str_pad(second(row$start_time),2, pad="0"),sep =":"), + "]"), NA)) } - data <- data %>% select(-local_time_obj) - # Segments belong to different dates, so each day can have different segments - }else{ - data <- data %>% mutate(local_date_time_obj = lubridate::ymd_hms(local_date_time)) - day_segments <- day_segments %>% mutate(start_local_date_time_obj = lubridate::ymd_hm(paste(local_date, start_time)), - end_local_date_time_obj = lubridate::ymd_hm(paste(local_date, end_time)), + + } else if (day_segments_type == "INTERVAL_EVERY_DAY"){ + + data_dates <- data %>% select(local_date) %>% distinct(local_date) + inferred_day_segments <- crossing(day_segments, data_dates) %>% + mutate(start_local_date_time_obj = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = fixed_timezone), + end_local_date_time_obj = start_local_date_time_obj + lubridate::period(length), + date_time_interval = lubridate::interval(start_local_date_time_obj, end_local_date_time_obj)) %>% + group_by(label, local_date) %>% + mutate(group_start_datetime = lubridate::parse_date_time(paste(local_date, start_time), orders = c("Ymd HMS", "Ymd HM"), tz = fixed_timezone), + group_end_datetime = group_start_datetime + lubridate::period(length), + group_start_datetime = min(group_start_datetime), + group_end_datetime = max(group_end_datetime)) %>% + ungroup() + + + data <- data %>% mutate(local_date_time_obj = lubridate::ymd_hms(local_date_time, tz = fixed_timezone)) + + # Create a new column for each day_segment + for(row_id in 1:nrow(inferred_day_segments)){ + row = inferred_day_segments[row_id,] + data <- data %>% mutate(!!paste("local_day_segment", row_id, sep = "_") := ifelse(local_date_time_obj %within% row$date_time_interval, + paste0("[", + paste(sep= "#", + row$label, + lubridate::date(row$group_start_datetime), + paste(str_pad(hour(row$group_start_datetime),2, pad="0"), str_pad(minute(row$group_start_datetime),2, pad="0"), str_pad(second(row$group_start_datetime),2, pad="0"),sep =":"), + lubridate::date(row$group_end_datetime), + paste(str_pad(hour(row$group_end_datetime),2, pad="0"), str_pad(minute(row$group_end_datetime),2, pad="0"), str_pad(second(row$group_end_datetime),2, pad="0"),sep =":") + ), + "]"), NA)) + } + + + } else if ( day_segments_type == "INTERVAL_FLEXIBLE_DAY"){ + data <- data %>% mutate(local_date_time_obj = lubridate::ymd_hms(local_date_time, tz = fixed_timezone)) + day_segments <- day_segments %>% mutate(shift = ifelse(shift == "0", "0seconds", shift), + start_local_date_time_obj = lubridate::ymd_hms(start_date_time, tz = fixed_timezone) + (lubridate::period(shift) * ifelse(shift_direction >= 0, 1, -1)), + end_local_date_time_obj = start_local_date_time_obj + lubridate::period(length), date_time_interval = lubridate::interval(start_local_date_time_obj, end_local_date_time_obj)) + + # Create a new column for each day_segment for(row_id in 1:nrow(day_segments)){ row = day_segments[row_id,] - data <- data %>% mutate(local_day_segment = ifelse(local_date_time_obj %within% row$date_time_interval, row$label, local_day_segment)) + print(row$length) + data <- data %>% mutate(!!paste("local_day_segment", row_id, sep = "_") := ifelse(local_date_time_obj %within% row$date_time_interval, + paste0("[", + paste(sep= "#", + row$label, + lubridate::date(row$start_local_date_time_obj), + paste(str_pad(hour(row$start_local_date_time_obj),2, pad="0"), 
+
+  } else if ( day_segments_type == "INTERVAL_FLEXIBLE_DAY"){
+    data <- data %>% mutate(local_date_time_obj = lubridate::ymd_hms(local_date_time, tz = fixed_timezone))
+    day_segments <- day_segments %>% mutate(shift = ifelse(shift == "0", "0seconds", shift),
+                                            start_local_date_time_obj = lubridate::ymd_hms(start_date_time, tz = fixed_timezone) + (lubridate::period(shift) * ifelse(shift_direction >= 0, 1, -1)),
+                                            end_local_date_time_obj = start_local_date_time_obj + lubridate::period(length),
                                             date_time_interval = lubridate::interval(start_local_date_time_obj, end_local_date_time_obj))
 
+    # Create a new column for each day_segment
     for(row_id in 1:nrow(day_segments)){
       row = day_segments[row_id,]
-      data <- data %>% mutate(local_day_segment = ifelse(local_date_time_obj %within% row$date_time_interval, row$label, local_day_segment))
+      data <- data %>% mutate(!!paste("local_day_segment", row_id, sep = "_") := ifelse(local_date_time_obj %within% row$date_time_interval,
+                                                paste0("[",
+                                                       paste(sep= "#",
+                                                             row$label,
+                                                             lubridate::date(row$start_local_date_time_obj),
+                                                             paste(str_pad(hour(row$start_local_date_time_obj),2, pad="0"), str_pad(minute(row$start_local_date_time_obj),2, pad="0"), str_pad(second(row$start_local_date_time_obj),2, pad="0"),sep =":"),
+                                                             lubridate::date(row$end_local_date_time_obj),
+                                                             paste(str_pad(hour(row$end_local_date_time_obj),2, pad="0"), str_pad(minute(row$end_local_date_time_obj),2, pad="0"), str_pad(second(row$end_local_date_time_obj),2, pad="0"),sep =":")
+                                                       ),
+                                                       "]"), NA))
     }
-    data <- data %>% select(-local_date_time_obj)
   }
 
+  # Join all day_segments in a single column
+  data <- data %>%
+    unite("assigned_segments", starts_with("local_day_segment"), sep = "|", na.rm = TRUE) %>%
+    select(-local_date_time_obj)
+
   return(data)
 }
 
@@ -45,28 +100,31 @@ split_local_date_time <- function(data, day_segments){
     separate(local_time, c("local_hour", "local_minute"), ":", remove = FALSE, extra = "drop") %>%
     mutate(local_hour = as.numeric(local_hour),
            local_minute = as.numeric(local_minute))
-
-  split_data <- assign_to_day_segment(split_data, day_segments)
+
   return(split_data)
 }
 
 if(!is.null(timezone_periods)){
-  timezones <- read_csv(timezone_periods)
-  tz_starts <- timezones$start
-  output <- input %>%
-    mutate(timezone = findInterval(timestamp / 1000, tz_starts), # Set an interval ID based on timezones' start column
-           timezone = ifelse(timezone == 0, 1, timezone), # Correct the first timezone ID
-           timezone = recode(timezone, !!! timezones$timezone), # Swap IDs for text labels
-           timezone = as.character(timezone)) %>%
-    rowwise() %>%
-    mutate(utc_date_time = as.POSIXct(timestamp/1000, origin="1970-01-01", tz="UTC"),
-           local_date_time = format(utc_date_time, tz = timezone, usetz = T))
-  output <- split_local_date_time(output, day_segments)
-  write.csv(output, sensor_output)
+  # TODO: Not active yet
+  # timezones <- read_csv(timezone_periods)
+  # tz_starts <- timezones$start
+  # output <- input %>%
+  #   mutate(timezone = findInterval(timestamp / 1000, tz_starts), # Set an interval ID based on timezones' start column
+  #          timezone = ifelse(timezone == 0, 1, timezone), # Correct the first timezone ID
+  #          timezone = recode(timezone, !!! timezones$timezone), # Swap IDs for text labels
+  #          timezone = as.character(timezone)) %>%
+  #   rowwise() %>%
+  #   mutate(utc_date_time = as.POSIXct(timestamp/1000, origin="1970-01-01", tz="UTC"),
+  #          local_date_time = format(utc_date_time, tz = timezone, usetz = T, "%Y-%m-%d %H:%M:%S"))
+  # output <- split_local_date_time(output, day_segments)
+  # TODO: Implement day segment assignment with support for multiple timezones
+  # output <- assign_to_day_segment(output, day_segments, day_segments_type, fixed_timezone)
+  # write.csv(output, sensor_output)
 } else if(!is.null(fixed_timezone)){
-  output <- input %>%
-    mutate(utc_date_time = as.POSIXct(timestamp/1000, origin="1970-01-01", tz="UTC"),
-           local_date_time = format(utc_date_time, tz = fixed_timezone, usetz = F))
-  output <- split_local_date_time(output, day_segments)
-  write_csv(output, sensor_output)
+  output <- input %>%
+    mutate(utc_date_time = as.POSIXct(timestamp/1000, origin="1970-01-01", tz="UTC"),
+           local_date_time = format(utc_date_time, tz = fixed_timezone, usetz = F, "%Y-%m-%d %H:%M:%S"))
+  output <- split_local_date_time(output, day_segments)
+  output <- assign_to_day_segment(output, day_segments, day_segments_type, fixed_timezone)
+  write_csv(output, sensor_output)
 }
diff --git a/src/data/unify_utils.R b/src/data/unify_utils.R
index f373c55f..768dc2b1 100644
--- a/src/data/unify_utils.R
+++ b/src/data/unify_utils.R
@@ -54,7 +54,7 @@ unify_ios_calls <- function(ios_calls){
                                         local_time = first(local_time),
                                         local_hour = first(local_hour),
                                         local_minute = first(local_minute),
-                                        local_day_segment = first(local_day_segment))
+                                        assigned_segments = first(assigned_segments))
     } else {
         ios_calls <- ios_calls %>% summarise(call_type_sequence = paste(call_type, collapse = ","), call_duration = sum(call_duration), timestamp = first(timestamp))
 
diff --git a/src/features/call/call_base.R b/src/features/call/call_base.R
index b5a01025..a641b6ee 100644
--- a/src/features/call/call_base.R
+++ b/src/features/call/call_base.R
@@ -1,13 +1,5 @@
 library('tidyr')
-
-filter_by_day_segment <- function(data, day_segment) {
-  if(day_segment %in% c("morning", "afternoon", "evening", "night"))
-    data <- data %>% filter(local_day_segment == day_segment)
-  else if(day_segment == "daily")
-    return(data)
-  else
-    return(data %>% head(0))
-}
+library('stringr')
 
 Mode <- function(v) {
   uniqv <- unique(v)
@@ -16,7 +8,7 @@ Mode <- function(v) {
 
 base_call_features <- function(calls, call_type, day_segment, requested_features){
     # Output dataframe
-    features = data.frame(local_date = character(), stringsAsFactors = FALSE)
+    features = data.frame(local_segment = character(), stringsAsFactors = FALSE)
 
     # The name of the features this function can compute
     base_features_names <- c("count", "distinctcontacts", "meanduration", "sumduration", "minduration", "maxduration", "stdduration", "modeduration", "entropyduration", "timefirstcall", "timelastcall", "countmostfrequentcontact")
 
@@ -28,13 +20,21 @@ base_call_features <- function(calls, call_type, day_segment, requested_features){
     call_type_label = ifelse(call_type == "incoming", "1", ifelse(call_type == "outgoing", "2", ifelse(call_type == "missed", "3", NA)))
-    if(is.na(call_type_label)) stop(paste("Call type can online be incoming, outgoing or missed but instead you typed: ", call_type))
+    if(is.na(call_type_label)) stop(paste("Call type can only be incoming, outgoing or missed but instead you typed: ", call_type))
 
-    calls <- calls %>% filter(call_type == call_type_label) %>% filter_by_day_segment(day_segment)
+
+    # Filter the rows that belong to day_segment, and put the segment full name in a new column for grouping
+    date_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2}"
+    hour_regex = "[0-9]{2}:[0-9]{2}:[0-9]{2}"
+    calls <- calls %>%
+      filter(call_type == call_type_label) %>%
+      filter(grepl(paste0("\\[", day_segment, "#"), assigned_segments)) %>%
+      mutate(local_segment = str_extract(assigned_segments, paste0("\\[", day_segment, "#", date_regex, "#", hour_regex, "#", date_regex, "#", hour_regex, "\\]")),
+             local_segment = str_sub(local_segment, 2, -2)) # get rid of the first and last characters ([])
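+    # Example (sketch): for day_segment "morning", rows whose assigned_segments
+    # contains "[morning#...]" are kept and local_segment becomes, e.g.,
+    # "morning#2020-05-04#06:00:00#2020-05-04#11:59:59".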
"[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2}" + hour_regex = "[0-9]{2}:[0-9]{2}:[0-9]{2}" + calls <- calls %>% + filter(call_type == call_type_label) %>% + filter(grepl(paste0("\\[", day_segment, "#"),assigned_segments)) %>% + mutate(local_segment = str_extract(assigned_segments, paste0("\\[", day_segment, "#", date_regex, "#", hour_regex, "#", date_regex, "#", hour_regex, "\\]")), + local_segment = str_sub(local_segment, 2, -2)) # get rid of first and last character([]) # If there are not features or data to work with, return an empty df with appropiate columns names if(length(features_to_compute) == 0) return(features) if(nrow(calls) < 1) - return(cbind(features, read.csv(text = paste(paste("call", call_type, day_segment, features_to_compute, sep = "_"), collapse = ","), stringsAsFactors = FALSE))) + return(cbind(features, read.csv(text = paste(paste("call", call_type, features_to_compute, sep = "_"), collapse = ","), stringsAsFactors = FALSE))) for(feature_name in features_to_compute){ if(feature_name == "countmostfrequentcontact"){ @@ -48,28 +48,28 @@ base_call_features <- function(calls, call_type, day_segment, requested_features pull(trace) feature <- calls %>% filter(trace == mostfrequentcontact) %>% - group_by(local_date) %>% - summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := n()) %>% + group_by(local_segment) %>% + summarise(!!paste("call", call_type, feature_name, sep = "_") := n()) %>% replace(is.na(.), 0) - features <- merge(features, feature, by="local_date", all = TRUE) + features <- merge(features, feature, by="local_segment", all = TRUE) } else { feature <- calls %>% - group_by(local_date) + group_by(local_segment) feature <- switch(feature_name, - "count" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := n()), - "distinctcontacts" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := n_distinct(trace)), - "meanduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := mean(call_duration)), - "sumduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := sum(call_duration)), - "minduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := min(call_duration)), - "maxduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := max(call_duration)), - "stdduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := sd(call_duration)), - "modeduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := Mode(call_duration)), - "entropyduration" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := entropy.MillerMadow(call_duration)), - "timefirstcall" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := first(local_hour) * 60 + first(local_minute)), - "timelastcall" = feature %>% summarise(!!paste("call", call_type, day_segment, feature_name, sep = "_") := last(local_hour) * 60 + last(local_minute))) + "count" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := n()), + "distinctcontacts" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := n_distinct(trace)), + "meanduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := mean(call_duration)), + "sumduration" = 
+features <- features %>% separate(col = local_segment,
+                                  into = c("segment", "local_start_date", "local_start_time", "local_end_date", "local_end_time"),
+                                  sep = "#",
+                                  remove = FALSE)
+
 write.csv(features, snakemake@output[[1]], row.names = FALSE)