From 6b1e006b82e16b2d8bfc2eaccfd1ecc6b5f4727c Mon Sep 17 00:00:00 2001
From: JulioV
Date: Wed, 7 Oct 2020 11:51:31 -0400
Subject: [PATCH] Refactor fused location resampling

---
 Snakefile                          |  4 +-
 rules/common.smk                   | 10 ++++
 rules/features.smk                 |  4 +-
 rules/preprocessing.smk            | 40 ++++++++++++----
 src/data/phone_sensed_timestamps.R | 18 +++++++
 src/data/process_location_types.R  | 77 ++++++++++++------------------
 6 files changed, 95 insertions(+), 58 deletions(-)
 create mode 100644 src/data/phone_sensed_timestamps.R

diff --git a/Snakefile b/Snakefile
index f28a86e0..dd2a70be 100644
--- a/Snakefile
+++ b/Snakefile
@@ -25,6 +25,7 @@ if config["PHONE_VALID_SENSED_BINS"]["COMPUTE"] or config["PHONE_VALID_SENSED_DA
         files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=pids, sensor=table))
         files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=pids, sensor=table))
     files_to_compute.extend(expand("data/interim/{pid}/phone_sensed_bins.csv", pid=config["PIDS"]))
+    files_to_compute.extend(expand("data/interim/{pid}/phone_sensed_timestamps.csv", pid=config["PIDS"]))
 
 if config["PHONE_VALID_SENSED_DAYS"]["COMPUTE"]:
     files_to_compute.extend(expand("data/interim/{pid}/phone_valid_sensed_days_{min_valid_hours_per_day}hours_{min_valid_bins_per_hour}bins.csv",
@@ -163,7 +164,8 @@ for provider in config["LOCATIONS"]["PROVIDERS"].keys():
         files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["LOCATIONS"]["DB_TABLE"]))
         files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["LOCATIONS"]["DB_TABLE"]))
-        files_to_compute.extend(expand("data/raw/{pid}/{sensor}_processed_{locations_to_use}.csv", pid=config["PIDS"], sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]))
+        files_to_compute.extend(expand("data/interim/{pid}/{sensor}_processed_{locations_to_use}.csv", pid=config["PIDS"], sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]))
+        files_to_compute.extend(expand("data/interim/{pid}/{sensor}_processed_{locations_to_use}_with_datetime.csv", pid=config["PIDS"], sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]))
         files_to_compute.extend(expand("data/interim/{pid}/{sensor_key}_features/{sensor_key}_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["LOCATIONS"]["PROVIDERS"][provider]["SRC_LANGUAGE"], provider_key=provider, sensor_key="LOCATIONS".lower()))
         files_to_compute.extend(expand("data/processed/features/{pid}/{sensor_key}.csv", pid=config["PIDS"], sensor_key="LOCATIONS".lower()))

diff --git a/rules/common.smk b/rules/common.smk
index 1ed7b0ae..5f16807e 100644
--- a/rules/common.smk
+++ b/rules/common.smk
@@ -26,6 +26,16 @@ def optional_phone_sensed_bins_input(wildcards):
 
     return expand("data/raw/{{pid}}/{table}_with_datetime.csv", table = tables_platform)
 
+def optional_phone_sensed_timestamps_input(wildcards):
+    platform = infer_participant_platform("data/external/"+wildcards.pid)
+
+    if platform == "android":
+        tables_platform = [table for table in config["PHONE_VALID_SENSED_BINS"]["DB_TABLES"] if table not in [config["CONVERSATION"]["DB_TABLE"]["IOS"], config["ACTIVITY_RECOGNITION"]["DB_TABLE"]["IOS"]]] # for android, discard any ios tables that may exist
+    elif platform == "ios":
+        tables_platform = [table for table in config["PHONE_VALID_SENSED_BINS"]["DB_TABLES"] if table not in [config["CONVERSATION"]["DB_TABLE"]["ANDROID"], config["ACTIVITY_RECOGNITION"]["DB_TABLE"]["ANDROID"]]] # for ios, discard any android tables that may exist
+
+    return expand("data/raw/{{pid}}/{table}_raw.csv", table = tables_platform)
+
 # Features.smk #########################################################################################################
 def find_features_files(wildcards):
     feature_files = []

diff --git a/rules/features.smk b/rules/features.smk
index d3323260..94225cad 100644
--- a/rules/features.smk
+++ b/rules/features.smk
@@ -111,7 +111,7 @@ rule ios_activity_recognition_deltas:
 
 rule locations_python_features:
     input:
-        sensor_data = expand("data/raw/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
+        sensor_data = expand("data/interim/{{pid}}/{sensor}_processed_{locations_to_use}_with_datetime.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
         day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
     params:
        provider = lambda wildcards: config["LOCATIONS"]["PROVIDERS"][wildcards.provider_key],
@@ -123,7 +123,7 @@
 rule locations_r_features:
     input:
-        sensor_data = expand("data/raw/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
+        sensor_data = expand("data/interim/{{pid}}/{sensor}_processed_{locations_to_use}_with_datetime.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
         day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
     params:
        provider = lambda wildcards: config["LOCATIONS"]["PROVIDERS"][wildcards.provider_key],

diff --git a/rules/preprocessing.smk b/rules/preprocessing.smk
index 4a94f5ae..8e4581c0 100644
--- a/rules/preprocessing.smk
+++ b/rules/preprocessing.smk
@@ -70,7 +70,7 @@ rule readable_datetime:
         day_segments_type = config["DAY_SEGMENTS"]["TYPE"],
         include_past_periodic_segments = config["DAY_SEGMENTS"]["INCLUDE_PAST_PERIODIC_SEGMENTS"]
     wildcard_constraints:
-        sensor = '.*(' + '|'.join([re.escape(x) for x in PHONE_SENSORS]) + ').*' # only process smartphone sensors, not fitbit
+        sensor = '(' + '|'.join([re.escape(x) for x in PHONE_SENSORS]) + ')' # only process smartphone sensors, not fitbit
     output:
         "data/raw/{pid}/{sensor}_with_datetime.csv"
     script:
@@ -86,6 +86,14 @@ rule phone_sensed_bins:
     script:
         "../src/data/phone_sensed_bins.R"
 
+rule phone_sensed_timestamps:
+    input:
+        all_sensors = optional_phone_sensed_timestamps_input
+    output:
+        "data/interim/{pid}/phone_sensed_timestamps.csv"
+    script:
+        "../src/data/phone_sensed_timestamps.R"
+
 rule phone_valid_sensed_days:
     input:
         phone_sensed_bins = "data/interim/{pid}/phone_sensed_bins.csv"
@@ -112,21 +120,35 @@ rule unify_ios_android:
 
 rule process_location_types:
     input:
-        locations = "data/raw/{pid}/{sensor}_with_datetime.csv",
-        phone_sensed_bins = rules.phone_sensed_bins.output,
-        day_segments = "data/interim/day_segments/{pid}_day_segments.csv"
+        locations = "data/raw/{pid}/{sensor}_raw.csv",
+        phone_sensed_timestamps = "data/interim/{pid}/phone_sensed_timestamps.csv",
     params:
-        bin_size = config["PHONE_VALID_SENSED_BINS"]["BIN_SIZE"],
-        timezone = config["LOCATIONS"]["TIMEZONE"],
         consecutive_threshold = config["LOCATIONS"]["FUSED_RESAMPLED_CONSECUTIVE_THRESHOLD"],
         time_since_valid_location = config["LOCATIONS"]["FUSED_RESAMPLED_TIME_SINCE_VALID_LOCATION"],
config["LOCATIONS"]["FUSED_RESAMPLED_TIME_SINCE_VALID_LOCATION"], - day_segments_type = config["DAY_SEGMENTS"]["TYPE"], - locations_to_use = "{locations_to_used}" + locations_to_use = "{locations_to_use}" + wildcard_constraints: + locations_to_use = '(ALL|GPS|FUSED_RESAMPLED)' output: - "data/raw/{pid}/{sensor}_processed_{locations_to_used}.csv" + "data/interim/{pid}/{sensor}_processed_{locations_to_use}.csv" script: "../src/data/process_location_types.R" +rule readable_datetime_location_processed: + input: + sensor_input = expand("data/interim/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]), + day_segments = "data/interim/day_segments/{pid}_day_segments.csv" + params: + timezones = None, + fixed_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"], + day_segments_type = config["DAY_SEGMENTS"]["TYPE"], + include_past_periodic_segments = config["DAY_SEGMENTS"]["INCLUDE_PAST_PERIODIC_SEGMENTS"] + wildcard_constraints: + locations_to_use = '(ALL|GPS|FUSED_RESAMPLED)' + output: + expand("data/interim/{{pid}}/{sensor}_processed_{locations_to_use}_with_datetime.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]) + script: + "../src/data/readable_datetime.R" + rule application_genres: input: "data/raw/{pid}/{sensor}_with_datetime.csv" diff --git a/src/data/phone_sensed_timestamps.R b/src/data/phone_sensed_timestamps.R new file mode 100644 index 00000000..7d40a2ae --- /dev/null +++ b/src/data/phone_sensed_timestamps.R @@ -0,0 +1,18 @@ +source("renv/activate.R") +library(dplyr) +library(readr) +library(tidyr) +library(purrr) + +all_sensors = snakemake@input[["all_sensors"]] + +sensor_timestamps <- tibble(files = all_sensors) %>% + mutate(timestamps = map(files,~ read_csv(.,col_types = cols_only(timestamp = col_double()))), + sensor = row_number(), + files = NULL) %>% + unnest(timestamps) %>% + mutate(timestamp = (timestamp %/% 1000) * 1000) %>% + distinct(timestamp, .keep_all = TRUE) %>% + arrange(timestamp) + +write.csv(sensor_timestamps, snakemake@output[[1]], row.names = FALSE) \ No newline at end of file diff --git a/src/data/process_location_types.R b/src/data/process_location_types.R index 8f425f70..a692d2cd 100644 --- a/src/data/process_location_types.R +++ b/src/data/process_location_types.R @@ -3,21 +3,15 @@ library(dplyr) library(readr) library(tidyr) -source("src/data/assign_to_day_segment.R") - -bin_size <- snakemake@params[["bin_size"]] -timezone <- snakemake@params[["timezone"]] consecutive_threshold <- snakemake@params[["consecutive_threshold"]] time_since_valid_location <- snakemake@params[["time_since_valid_location"]] -location_to_used <- snakemake@params[["time_since_valocation_to_usedlid_location"]] -day_segments <- read.csv(snakemake@input[["day_segments"]]) -day_segments_type <- snakemake@params[["day_segments_type"]] +locations_to_use <- snakemake@params[["locations_to_use"]] -phone_sensed_bins <- read_csv(snakemake@input[["phone_sensed_bins"]], col_types = cols(local_date = col_character())) -locations <- read_csv(snakemake@input[["locations"]], col_types = cols()) %>% filter(provider == "fused") %>% - filter(double_latitude != 0 & double_longitude != 0) +phone_sensed_timestamps <- read_csv(snakemake@input[["phone_sensed_timestamps"]], col_types = cols_only(timestamp = col_double())) +locations <- read.csv(snakemake@input[["locations"]]) %>% + filter(double_latitude != 0 & double_longitude != 0) %>% + 
 
-locations_to_use <- snakemake@params["locations_to_use"]
 if(!locations_to_use %in% c("ALL", "FUSED_RESAMPLED", "GPS")){
   print("Unkown location filter, provide one of the following three: ALL, GPS, or FUSED_RESAMPLED")
   quit(save = "no", status = 1, runLast = FALSE)
 }
@@ -31,44 +25,35 @@ if(locations_to_use == "ALL"){
 } else if(locations_to_use == "FUSED_RESAMPLED"){
   locations <- locations %>% filter(provider == "fused")
   if(nrow(locations) > 0){
-    sensed_minute_bins <- phone_sensed_bins %>%
-      pivot_longer(-local_date, names_to = c("hour", "bin"), names_sep = "_", values_to = "sensor_count") %>%
-      mutate(hour = as.integer(hour), bin = as.integer(bin)) %>%
-      complete(nesting(local_date, hour), bin = seq(0, 59,1)) %>%
-      fill(sensor_count) %>%
-      mutate(timestamp = as.numeric(as.POSIXct(paste0(local_date, " ", hour,":", bin,":00"), format = "%Y-%m-%d %H:%M:%S", tz = timezone)) * 1000 ) %>%
-      filter(sensor_count > 0) %>%
-      select(timestamp)
-
-    resampled_locations <- locations %>%
-      select(-assigned_segments) %>%
-      bind_rows(sensed_minute_bins) %>%
-      mutate(provider = replace_na(provider, "resampled")) %>%
-      arrange(timestamp) %>%
+    processed_locations <- locations %>%
+      # TODO filter repeated location rows based on the accuracy
+      distinct(timestamp, .keep_all = TRUE) %>%
+      bind_rows(phone_sensed_timestamps) %>%
+      arrange(timestamp) %>%
       # We group and therefore, fill in, missing rows that appear after a valid fused location record and exist
       # within consecutive_threshold minutes from each other
       mutate(consecutive_time_diff = c(1, diff(timestamp)),
-      resample_group = cumsum(!is.na(double_longitude) | consecutive_time_diff > (1000 * 60 * consecutive_threshold))) %>%
-      group_by(resample_group) %>%
-      # drop rows that are logged after time_since_valid_location minutes from the last valid fused location
-      filter((timestamp - first(timestamp) < (1000 * 60 * time_since_valid_location))) %>%
-      fill(-timestamp, -resample_group) %>%
-      select(-consecutive_time_diff) %>%
-      drop_na(double_longitude, double_latitude, accuracy) %>%
-      # Add local date_time
-      mutate(utc_date_time = as.POSIXct(timestamp/1000, origin="1970-01-01", tz="UTC"),
-             local_date_time = format(utc_date_time, tz = timezone, usetz = F)) %>%
-      separate(local_date_time, c("local_date","local_time"), "\\s", remove = FALSE) %>%
-      separate(local_time, c("local_hour", "local_minute"), ":", remove = FALSE, extra = "drop") %>%
-      mutate(local_hour = as.numeric(local_hour),
-             local_minute = as.numeric(local_minute)) %>%
-      # Delete resampled rows that exist in the same minute as other original (fused) rows
-      group_by(local_date, local_hour, local_minute) %>%
-      mutate(n = n()) %>%
-      filter(n == 1 | (n > 1 & provider == "fused")) %>%
-      select(-n) %>%
-      ungroup()
-    processed_locations <- assign_to_day_segment(resampled_locations, day_segments, day_segments_type)
+             resample_group = cumsum(!is.na(double_longitude) | consecutive_time_diff > (1000 * 60 * consecutive_threshold))) %>%
+      group_by(resample_group) %>%
+      # Drop rows logged more than time_since_valid_location minutes after the last valid fused location
+      mutate(time_from_fused = timestamp - first(timestamp)) %>%
+      filter(provider == "fused" | (time_from_fused < (1000 * 60 * time_since_valid_location))) %>%
+      # Summarise the period to resample for
+      summarise(limit = max(timestamp), timestamp = first(timestamp), double_latitude = first(double_latitude), double_longitude = first(double_longitude),
+                double_bearing = first(double_bearing), double_speed = first(double_speed), double_altitude = first(double_altitude), provider = first(provider),
+                accuracy = first(accuracy), label = first(label)) %>%
+      # the limit will be equal to the next row's timestamp minus 1, or the group's last timestamp (limit) plus the consecutive_threshold buffer;
+      # you can think of consecutive_threshold as the period a location row is valid for
+      mutate(limit = pmin(lead(timestamp, default = 9999999999999) - 1, limit + (1000 * 60 * consecutive_threshold)),
+             n_resample = (limit - timestamp) %/% 60001,
+             n_resample = if_else(n_resample == 0, 1, n_resample)) %>%
+      drop_na(double_longitude, double_latitude) %>%
+      uncount(weights = n_resample, .id = "id") %>%
+      mutate(provider = if_else(id > 1, "resampled", provider),
+             id = id - 1,
+             timestamp = timestamp + (id * 60000)) %>%
+      ungroup() %>%
+      select(-resample_group, -limit, -id)
   } else {
     processed_locations <- locations
  }
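
A note on the new resampling core (an illustration only, not part of the patch): the FUSED_RESAMPLED branch above collapses each run of sensed timestamps that follows a fused fix into one row plus a resampling window, then re-expands that window into one row per minute with uncount(). The standalone R sketch below replays that pipeline on an invented two-fix trajectory; the thresholds and coordinates are made up, the column set is trimmed to the essentials, and pmax() stands in for the patch's if_else() guard on n_resample.

    # Toy replay of the FUSED_RESAMPLED branch (sketch only).
    library(dplyr)
    library(tidyr)

    consecutive_threshold <- 30        # minutes a fused fix keeps being carried forward
    time_since_valid_location <- 720   # minutes a fix stays fresh enough to resample from

    # Two fused fixes, ten minutes apart (timestamps in milliseconds).
    locations <- tibble(timestamp = c(0, 600000),
                        double_latitude = c(40.44, 40.45),
                        double_longitude = c(-79.94, -79.95),
                        provider = "fused")

    # The phone was sensed once a minute for fifteen minutes.
    phone_sensed_timestamps <- tibble(timestamp = seq(0, 900000, by = 60000))

    resampled <- locations %>%
      distinct(timestamp, .keep_all = TRUE) %>%
      bind_rows(phone_sensed_timestamps) %>%
      arrange(timestamp) %>%
      # Every fused fix (or a gap longer than consecutive_threshold) opens a new group.
      mutate(consecutive_time_diff = c(1, diff(timestamp)),
             resample_group = cumsum(!is.na(double_longitude) |
                                       consecutive_time_diff > (1000 * 60 * consecutive_threshold))) %>%
      group_by(resample_group) %>%
      # Keep sensed rows only while the group's fused fix is still fresh enough.
      mutate(time_from_fused = timestamp - first(timestamp)) %>%
      filter(provider == "fused" | (time_from_fused < (1000 * 60 * time_since_valid_location))) %>%
      # Collapse each group to its fused fix plus the window it has to cover.
      summarise(limit = max(timestamp), timestamp = first(timestamp),
                double_latitude = first(double_latitude),
                double_longitude = first(double_longitude),
                provider = first(provider)) %>%
      # The window ends at the next fix (minus 1 ms) or at limit plus the
      # consecutive_threshold buffer, whichever comes first.
      mutate(limit = pmin(lead(timestamp, default = 9999999999999) - 1,
                          limit + (1000 * 60 * consecutive_threshold)),
             n_resample = pmax((limit - timestamp) %/% 60001, 1)) %>%
      drop_na(double_longitude, double_latitude) %>%
      # Emit one copy of the fix per minute of its window.
      uncount(weights = n_resample, .id = "id") %>%
      mutate(provider = if_else(id > 1, "resampled", provider),
             timestamp = timestamp + ((id - 1) * 60000)) %>%
      ungroup() %>%
      select(-resample_group, -limit, -id)

    print(resampled, n = Inf)

With these toy inputs the first fix (timestamp 0) is carried forward once a minute as provider = "resampled" until the second fix arrives at 600000 ms, and the second fix is carried forward until consecutive_threshold minutes past its last sensed timestamp, which is exactly the behaviour the limit/n_resample arithmetic in the patch encodes.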