Refactor fused location resampling

pull/103/head
JulioV 2020-10-07 11:51:31 -04:00
parent c0c32d9f9e
commit 6b1e006b82
6 changed files with 95 additions and 58 deletions

View File

@@ -25,6 +25,7 @@ if config["PHONE_VALID_SENSED_BINS"]["COMPUTE"] or config["PHONE_VALID_SENSED_DA
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=pids, sensor=table))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=pids, sensor=table))
files_to_compute.extend(expand("data/interim/{pid}/phone_sensed_bins.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/phone_sensed_timestamps.csv", pid=config["PIDS"]))
if config["PHONE_VALID_SENSED_DAYS"]["COMPUTE"]:
files_to_compute.extend(expand("data/interim/{pid}/phone_valid_sensed_days_{min_valid_hours_per_day}hours_{min_valid_bins_per_hour}bins.csv",
@@ -163,7 +164,8 @@ for provider in config["LOCATIONS"]["PROVIDERS"].keys():
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["LOCATIONS"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["LOCATIONS"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_processed_{locations_to_use}.csv", pid=config["PIDS"], sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]))
files_to_compute.extend(expand("data/interim/{pid}/{sensor}_processed_{locations_to_use}.csv", pid=config["PIDS"], sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]))
files_to_compute.extend(expand("data/interim/{pid}/{sensor}_processed_{locations_to_use}_with_datetime.csv", pid=config["PIDS"], sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]))
files_to_compute.extend(expand("data/interim/{pid}/{sensor_key}_features/{sensor_key}_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["LOCATIONS"]["PROVIDERS"][provider]["SRC_LANGUAGE"], provider_key=provider, sensor_key="LOCATIONS".lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/{sensor_key}.csv", pid=config["PIDS"], sensor_key="LOCATIONS".lower()))

View File

@@ -26,6 +26,16 @@ def optional_phone_sensed_bins_input(wildcards):
return expand("data/raw/{{pid}}/{table}_with_datetime.csv", table = tables_platform)
def optional_phone_sensed_timestamps_input(wildcards):
platform = infer_participant_platform("data/external/"+wildcards.pid)
if platform == "android":
tables_platform = [table for table in config["PHONE_VALID_SENSED_BINS"]["DB_TABLES"] if table not in [config["CONVERSATION"]["DB_TABLE"]["IOS"], config["ACTIVITY_RECOGNITION"]["DB_TABLE"]["IOS"]]] # for android, discard any ios tables that may exist
elif platform == "ios":
tables_platform = [table for table in config["PHONE_VALID_SENSED_BINS"]["DB_TABLES"] if table not in [config["CONVERSATION"]["DB_TABLE"]["ANDROID"], config["ACTIVITY_RECOGNITION"]["DB_TABLE"]["ANDROID"]]] # for ios, discard any android tables that may exist
return expand("data/raw/{{pid}}/{table}_raw.csv", table = tables_platform)
# Features.smk #########################################################################################################
def find_features_files(wildcards):
feature_files = []

View File

@@ -111,7 +111,7 @@ rule ios_activity_recognition_deltas:
rule locations_python_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
sensor_data = expand("data/interim/{{pid}}/{sensor}_processed_{locations_to_use}_with_datetime.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["LOCATIONS"]["PROVIDERS"][wildcards.provider_key],
@@ -123,7 +123,7 @@ rule locations_python_features:
rule locations_r_features:
input:
sensor_data = expand("data/raw/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
sensor_data = expand("data/interim/{{pid}}/{sensor}_processed_{locations_to_use}_with_datetime.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["LOCATIONS"]["PROVIDERS"][wildcards.provider_key],

View File

@@ -70,7 +70,7 @@ rule readable_datetime:
day_segments_type = config["DAY_SEGMENTS"]["TYPE"],
include_past_periodic_segments = config["DAY_SEGMENTS"]["INCLUDE_PAST_PERIODIC_SEGMENTS"]
wildcard_constraints:
sensor = '.*(' + '|'.join([re.escape(x) for x in PHONE_SENSORS]) + ').*' # only process smartphone sensors, not fitbit
sensor = '(' + '|'.join([re.escape(x) for x in PHONE_SENSORS]) + ')' # only process smartphone sensors, not fitbit
output:
"data/raw/{pid}/{sensor}_with_datetime.csv"
script:
@@ -86,6 +86,14 @@ rule phone_sensed_bins:
script:
"../src/data/phone_sensed_bins.R"
rule phone_sensed_timestamps:
input:
all_sensors = optional_phone_sensed_timestamps_input
output:
"data/interim/{pid}/phone_sensed_timestamps.csv"
script:
"../src/data/phone_sensed_timestamps.R"
rule phone_valid_sensed_days:
input:
phone_sensed_bins = "data/interim/{pid}/phone_sensed_bins.csv"
@@ -112,21 +120,35 @@ rule unify_ios_android:
rule process_location_types:
input:
locations = "data/raw/{pid}/{sensor}_with_datetime.csv",
phone_sensed_bins = rules.phone_sensed_bins.output,
day_segments = "data/interim/day_segments/{pid}_day_segments.csv"
locations = "data/raw/{pid}/{sensor}_raw.csv",
phone_sensed_timestamps = "data/interim/{pid}/phone_sensed_timestamps.csv",
params:
bin_size = config["PHONE_VALID_SENSED_BINS"]["BIN_SIZE"],
timezone = config["LOCATIONS"]["TIMEZONE"],
consecutive_threshold = config["LOCATIONS"]["FUSED_RESAMPLED_CONSECUTIVE_THRESHOLD"],
time_since_valid_location = config["LOCATIONS"]["FUSED_RESAMPLED_TIME_SINCE_VALID_LOCATION"],
day_segments_type = config["DAY_SEGMENTS"]["TYPE"],
locations_to_use = "{locations_to_used}"
locations_to_use = "{locations_to_use}"
wildcard_constraints:
locations_to_use = '(ALL|GPS|FUSED_RESAMPLED)'
output:
"data/raw/{pid}/{sensor}_processed_{locations_to_used}.csv"
"data/interim/{pid}/{sensor}_processed_{locations_to_use}.csv"
script:
"../src/data/process_location_types.R"
rule readable_datetime_location_processed:
input:
sensor_input = expand("data/interim/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
day_segments = "data/interim/day_segments/{pid}_day_segments.csv"
params:
timezones = None,
fixed_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"],
day_segments_type = config["DAY_SEGMENTS"]["TYPE"],
include_past_periodic_segments = config["DAY_SEGMENTS"]["INCLUDE_PAST_PERIODIC_SEGMENTS"]
wildcard_constraints:
locations_to_use = '(ALL|GPS|FUSED_RESAMPLED)'
output:
expand("data/interim/{{pid}}/{sensor}_processed_{locations_to_use}_with_datetime.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"])
script:
"../src/data/readable_datetime.R"
rule application_genres:
input:
"data/raw/{pid}/{sensor}_with_datetime.csv"

View File

@@ -0,0 +1,18 @@
source("renv/activate.R")
library(dplyr)
library(readr)
library(tidyr)
library(purrr)
all_sensors = snakemake@input[["all_sensors"]]
sensor_timestamps <- tibble(files = all_sensors) %>%
# read only the timestamp column of each sensor's raw table
mutate(timestamps = map(files, ~ read_csv(., col_types = cols_only(timestamp = col_double()))),
sensor = row_number(),
files = NULL) %>%
unnest(timestamps) %>%
# floor every timestamp to its second and keep one row per sensed second
mutate(timestamp = (timestamp %/% 1000) * 1000) %>%
distinct(timestamp, .keep_all = TRUE) %>%
arrange(timestamp)
write.csv(sensor_timestamps, snakemake@output[[1]], row.names = FALSE)
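To make the truncation concrete: the script floors every timestamp to its second and keeps a single row per sensed second, so same-second duplicates collapse. A toy run with invented values:

library(dplyr)

tibble(timestamp = c(1602086400123, 1602086400987, 1602086460500)) %>%
  mutate(timestamp = (timestamp %/% 1000) * 1000) %>%  # floor to the second boundary
  distinct(timestamp) %>%                              # collapse same-second duplicates
  arrange(timestamp)
# leaves two rows: 1602086400000 and 1602086460000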

View File

@@ -3,21 +3,15 @@ library(dplyr)
library(readr)
library(tidyr)
source("src/data/assign_to_day_segment.R")
bin_size <- snakemake@params[["bin_size"]]
timezone <- snakemake@params[["timezone"]]
consecutive_threshold <- snakemake@params[["consecutive_threshold"]]
time_since_valid_location <- snakemake@params[["time_since_valid_location"]]
location_to_used <- snakemake@params[["location_to_used"]]
day_segments <- read.csv(snakemake@input[["day_segments"]])
day_segments_type <- snakemake@params[["day_segments_type"]]
locations_to_use <- snakemake@params[["locations_to_use"]]
phone_sensed_bins <- read_csv(snakemake@input[["phone_sensed_bins"]], col_types = cols(local_date = col_character()))
locations <- read_csv(snakemake@input[["locations"]], col_types = cols()) %>% filter(provider == "fused") %>%
filter(double_latitude != 0 & double_longitude != 0)
phone_sensed_timestamps <- read_csv(snakemake@input[["phone_sensed_timestamps"]], col_types = cols_only(timestamp = col_double()))
locations <- read.csv(snakemake@input[["locations"]]) %>%
filter(double_latitude != 0 & double_longitude != 0) %>%
drop_na(double_longitude, double_latitude)
locations_to_use <- snakemake@params["locations_to_use"]
if(!locations_to_use %in% c("ALL", "FUSED_RESAMPLED", "GPS")){
print("Unkown location filter, provide one of the following three: ALL, GPS, or FUSED_RESAMPLED")
quit(save = "no", status = 1, runLast = FALSE)
@@ -31,44 +25,35 @@ if(locations_to_use == "ALL"){
} else if(locations_to_use == "FUSED_RESAMPLED"){
locations <- locations %>% filter(provider == "fused")
if(nrow(locations) > 0){
sensed_minute_bins <- phone_sensed_bins %>%
pivot_longer(-local_date, names_to = c("hour", "bin"), names_sep = "_", values_to = "sensor_count") %>%
mutate(hour = as.integer(hour), bin = as.integer(bin)) %>%
complete(nesting(local_date, hour), bin = seq(0, 59,1)) %>%
fill(sensor_count) %>%
mutate(timestamp = as.numeric(as.POSIXct(paste0(local_date, " ", hour,":", bin,":00"), format = "%Y-%m-%d %H:%M:%S", tz = timezone)) * 1000 ) %>%
filter(sensor_count > 0) %>%
select(timestamp)
resampled_locations <- locations %>%
select(-assigned_segments) %>%
bind_rows(sensed_minute_bins) %>%
mutate(provider = replace_na(provider, "resampled")) %>%
arrange(timestamp) %>%
processed_locations <- locations %>%
# TODO: filter repeated location rows based on their accuracy
distinct(timestamp, .keep_all = TRUE) %>%
bind_rows(phone_sensed_timestamps) %>%
arrange(timestamp) %>%
# We group, and later fill in, missing rows that appear after a valid fused location record and lie
# within consecutive_threshold minutes of each other
mutate(consecutive_time_diff = c(1, diff(timestamp)),
resample_group = cumsum(!is.na(double_longitude) | consecutive_time_diff > (1000 * 60 * consecutive_threshold))) %>%
group_by(resample_group) %>%
# drop rows that are logged after time_since_valid_location minutes from the last valid fused location
filter((timestamp - first(timestamp) < (1000 * 60 * time_since_valid_location))) %>%
fill(-timestamp, -resample_group) %>%
select(-consecutive_time_diff) %>%
drop_na(double_longitude, double_latitude, accuracy) %>%
# Add local date_time
mutate(utc_date_time = as.POSIXct(timestamp/1000, origin="1970-01-01", tz="UTC"),
local_date_time = format(utc_date_time, tz = timezone, usetz = F)) %>%
separate(local_date_time, c("local_date","local_time"), "\\s", remove = FALSE) %>%
separate(local_time, c("local_hour", "local_minute"), ":", remove = FALSE, extra = "drop") %>%
mutate(local_hour = as.numeric(local_hour),
local_minute = as.numeric(local_minute)) %>%
# Delete resampled rows that exist in the same minute as other original (fused) rows
group_by(local_date, local_hour, local_minute) %>%
mutate(n = n()) %>%
filter(n == 1 | (n > 1 & provider == "fused")) %>%
select(-n) %>%
ungroup()
processed_locations <- assign_to_day_segment(resampled_locations, day_segments, day_segments_type)
resample_group = cumsum(!is.na(double_longitude) | consecutive_time_diff > (1000 * 60 * consecutive_threshold))) %>%
group_by(resample_group) %>%
# drop rows that are more than time_since_valid_location minutes away from the last fused location (fused rows are always kept)
mutate(time_from_fused = timestamp - first(timestamp)) %>%
filter(provider == "fused" | (time_from_fused < (1000 * 60 * time_since_valid_location))) %>%
# Summarise the period to resample for
summarise(limit = max(timestamp), timestamp = first(timestamp), double_latitude = first(double_latitude), double_longitude = first(double_longitude),
double_bearing=first(double_bearing), double_speed = first(double_speed), double_altitude=first(double_altitude), provider=first(provider),
accuracy=first(accuracy), label=first(label)) %>%
# the limit will be equal to the next timestamp - 1, or to the last appended timestamp (limit) plus the consecutive_threshold buffer
# you can think of consecutive_threshold as the period a location row is valid for
mutate(limit = pmin(lead(timestamp, default = 9999999999999) - 1, limit + (1000 * 60 * consecutive_threshold)),
n_resample = (limit - timestamp) %/% 60001,
n_resample = if_else(n_resample == 0, 1, n_resample)) %>%
drop_na(double_longitude, double_latitude) %>%
uncount(weights = n_resample, .id = "id") %>%
mutate(provider = if_else(id > 1, "resampled", provider),
id = id - 1,
timestamp = timestamp + (id * 60000)) %>%
ungroup() %>%
select(-resample_group, -limit, -id)
} else {
processed_locations <- locations
}
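To see the new resampling logic end to end, here is a condensed toy run of the pipeline above. All values are invented, and for brevity it omits the consecutive-gap term of resample_group, the time_since_valid_location filter, and the extra location columns:

library(dplyr)
library(tidyr)

consecutive_threshold <- 2  # minutes; kept small so the demo stays short

# one fused fix at t = 0 followed by sensed-only timestamps one minute apart
rows <- tibble(timestamp = c(0, 60000, 120000, 180000),
               double_latitude = c(40.44, NA, NA, NA),
               double_longitude = c(-79.99, NA, NA, NA),
               provider = c("fused", NA, NA, NA))

rows %>%
  mutate(resample_group = cumsum(!is.na(double_longitude))) %>%
  group_by(resample_group) %>%
  summarise(limit = max(timestamp), timestamp = first(timestamp),
            double_latitude = first(double_latitude),
            double_longitude = first(double_longitude),
            provider = first(provider)) %>%
  # a location stays valid until the next group starts or the threshold lapses
  mutate(limit = pmin(lead(timestamp, default = 9999999999999) - 1,
                      limit + (1000 * 60 * consecutive_threshold)),
         n_resample = (limit - timestamp) %/% 60001,
         n_resample = if_else(n_resample == 0, 1, n_resample)) %>%
  uncount(weights = n_resample, .id = "id") %>%
  mutate(provider = if_else(id > 1, "resampled", provider),
         timestamp = timestamp + (id - 1) * 60000) %>%
  select(-resample_group, -limit, -id)
# yields four rows: the original fused fix plus one "resampled" copy per following minute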