Refactor select_days_to_analyse, fix merge bugs, add clean metrics for model

pull/95/head
JulioV 2020-03-17 21:15:53 -04:00
parent 8902a29e06
commit 0e173872df
11 changed files with 141 additions and 36 deletions

2
.gitignore vendored
View File

@ -107,4 +107,4 @@ reports/
*.Rproj
.RData
.Rhistory
*_profile/
sn_profile_*/

View File

@ -66,11 +66,18 @@ rule all:
pid = config["PIDS"],
day_segment = config["STEP"]["DAY_SEGMENTS"]),
# Models
expand("data/processed/{pid}/metrics_for_individual_model/{source}_{day_segment}.csv",
expand("data/processed/{pid}/metrics_for_individual_model/{source}_{day_segment}_original.csv",
pid = config["PIDS"],
source = config["METRICS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["METRICS_FOR_ANALYSIS"]["DAY_SEGMENTS"]),
expand("data/processed/metrics_for_population_model/{source}_{day_segment}.csv",
expand("data/processed/metrics_for_population_model/{source}_{day_segment}_original.csv",
source = config["METRICS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["METRICS_FOR_ANALYSIS"]["DAY_SEGMENTS"]),
expand("data/processed/{pid}/metrics_for_individual_model/{source}_{day_segment}_clean.csv",
pid = config["PIDS"],
source = config["METRICS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["METRICS_FOR_ANALYSIS"]["DAY_SEGMENTS"]),
expand("data/processed/metrics_for_population_model/{source}_{day_segment}_clean.csv",
source = config["METRICS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["METRICS_FOR_ANALYSIS"]["DAY_SEGMENTS"]),
# Vizualisations

View File

@ -125,12 +125,28 @@ STEP:
INCLUDE_ZERO_STEP_ROWS: True
METRICS_FOR_ANALYSIS:
GROUNDTRUTH_TABLE: participant_info
SOURCES: &sources ["phone_metrics", "fitbit_metrics", "phone_fitbit_metrics"]
DAY_SEGMENTS: *day_segments
PHONE_METRICS: [accelerometer, applications_foreground, battery, call_incoming, call_missed, call_outgoing, google_activity_recognition, light, location_barnett, screen, sms_received, sms_sent]
FITBIT_METRICS: [fitbit_heartrate, fitbit_step]
PHONE_FITBIT_METRICS: "" # This array is merged in the input_merge_features_of_single_participant function in models.snakefile
DROP_VALID_SENSED_DAYS: True
# Whether or not to include only days with enough valid sensed hours
# logic can be found in rule phone_valid_sensed_days of rules/preprocessing.snakefile
DROP_VALID_SENSED_DAYS:
ENABLED: True
# Whether or not to include certain days in the analysis, logic can be found in rule days_to_analyse of rules/mystudy.snakefile
# If you want to include all days downloaded for each participant, set ENABLED to False
DAYS_TO_ANALYSE:
ENABLED: True
DAYS_BEFORE_SURGERY: 15
DAYS_IN_HOSPITAL: F # T or F
DAYS_AFTER_DISCHARGE: 7
DAYS_IN_HOSPITAL: F
# Cleaning Parameters
COLS_NAN_THRESHOLD: 0.5
COLS_VAR_THRESHOLD: True
ROWS_NAN_THRESHOLD: 0.5
PARTICIPANTS_DAY_THRESHOLD: 7

View File

@ -4,22 +4,66 @@ def input_merge_metrics_of_single_participant(wildcards):
else:
return expand("data/processed/{pid}/{metrics}_{day_segment}.csv", pid=wildcards.pid, metrics=config["METRICS_FOR_ANALYSIS"][wildcards.source.upper()], day_segment=wildcards.day_segment)
def optional_input_days_to_include(wildcards):
if config["METRICS_FOR_ANALYSIS"]["DAYS_TO_ANALYSE"]["ENABLED"]:
# This input automatically trigers the rule days_to_analyse in mystudy.snakefile
return ["data/interim/{pid}/days_to_analyse" + \
"_" + str(config["METRICS_FOR_ANALYSIS"]["DAYS_TO_ANALYSE"]["DAYS_BEFORE_SURGERY"]) + \
"_" + str(config["METRICS_FOR_ANALYSIS"]["DAYS_TO_ANALYSE"]["DAYS_IN_HOSPITAL"]) + \
"_" + str(config["METRICS_FOR_ANALYSIS"]["DAYS_TO_ANALYSE"]["DAYS_AFTER_DISCHARGE"]) + ".csv"]
else:
return []
def optional_input_valid_sensed_days(wildcards):
if config["METRICS_FOR_ANALYSIS"]["DROP_VALID_SENSED_DAYS"]["ENABLED"]:
# This input automatically trigers the rule phone_valid_sensed_days in preprocessing.snakefile
return ["data/interim/{pid}/phone_valid_sensed_days.csv"]
else:
return []
rule merge_metrics_for_individual_model:
input:
metric_files = input_merge_metrics_of_single_participant,
phone_valid_sensed_days = "data/interim/{pid}/phone_valid_sensed_days.csv"
phone_valid_sensed_days = optional_input_valid_sensed_days,
days_to_include = optional_input_days_to_include
params:
drop_valid_sensed_days = config["METRICS_FOR_ANALYSIS"]["DROP_VALID_SENSED_DAYS"],
source = "{source}"
output:
"data/processed/{pid}/metrics_for_individual_model/{source}_{day_segment}.csv"
"data/processed/{pid}/metrics_for_individual_model/{source}_{day_segment}_original.csv"
script:
"../src/models/merge_metrics_for_individual_model.R"
rule merge_metrics_for_population_model:
input:
metric_files = expand("data/processed/{pid}/metrics_for_individual_model/{{source}}_{{day_segment}}.csv", pid=config["PIDS"])
metric_files = expand("data/processed/{pid}/metrics_for_individual_model/{{source}}_{{day_segment}}_original.csv", pid=config["PIDS"])
output:
"data/processed/metrics_for_population_model/{source}_{day_segment}.csv"
"data/processed/metrics_for_population_model/{source}_{day_segment}_original.csv"
script:
"../src/models/merge_metrics_for_population_model.R"
rule clean_metrics_for_individual_model:
input:
rules.merge_metrics_for_individual_model.output
params:
cols_nan_threshold = config["METRICS_FOR_ANALYSIS"]["COLS_NAN_THRESHOLD"],
cols_var_threshold = config["METRICS_FOR_ANALYSIS"]["COLS_VAR_THRESHOLD"],
rows_nan_threshold = config["METRICS_FOR_ANALYSIS"]["ROWS_NAN_THRESHOLD"],
participants_day_threshold = config["METRICS_FOR_ANALYSIS"]["PARTICIPANTS_DAY_THRESHOLD"]
output:
"data/processed/{pid}/metrics_for_individual_model/{source}_{day_segment}_clean.csv"
script:
"../src/models/clean_metrics_for_model.R"
rule clean_metrics_for_population_model:
input:
rules.merge_metrics_for_population_model.output
params:
cols_nan_threshold = config["METRICS_FOR_ANALYSIS"]["COLS_NAN_THRESHOLD"],
cols_var_threshold = config["METRICS_FOR_ANALYSIS"]["COLS_VAR_THRESHOLD"],
rows_nan_threshold = config["METRICS_FOR_ANALYSIS"]["ROWS_NAN_THRESHOLD"],
participants_day_threshold = config["METRICS_FOR_ANALYSIS"]["PARTICIPANTS_DAY_THRESHOLD"]
output:
"data/processed/metrics_for_population_model/{source}_{day_segment}_clean.csv"
script:
"../src/models/clean_metrics_for_model.R"

View File

@ -1,11 +1,10 @@
rule days_to_analyse:
input:
participant_info = "data/external/participant_info.csv",
pid_file = "data/external/{pid}"
participant_info = "data/raw/{pid}/" + config["METRICS_FOR_ANALYSIS"]["GROUNDTRUTH_TABLE"] + "_raw.csv"
params:
days_before_surgery = config["METRICS_FOR_ANALYSIS"]["DAYS_BEFORE_SURGERY"],
days_after_discharge = config["METRICS_FOR_ANALYSIS"]["DAYS_AFTER_DISCHARGE"],
days_in_hospital= config["METRICS_FOR_ANALYSIS"]["DAYS_IN_HOSPITAL"]
days_before_surgery = "{days_before_surgery}",
days_in_hospital = "{days_in_hospital}",
days_after_discharge= "{days_after_discharge}"
output:
"data/interim/{pid}/days_to_analyse_{days_before_surgery}_{days_in_hospital}_{days_after_discharge}.csv"
script:

View File

@ -0,0 +1,40 @@
source("packrat/init.R")
library(tidyr)
library(dplyr)
filter_participant_without_enough_days <- function(clean_metrics, participants_day_threshold){
if("pid" %in% colnames(clean_metrics))
clean_metrics <- clean_metrics %>% group_by(pid)
clean_metrics <- clean_metrics %>%
filter(n() >= participants_day_threshold) %>%
ungroup()
return(clean_metrics)
}
clean_metrics <- read.csv(snakemake@input[[1]])
cols_nan_threshold <- snakemake@params[["cols_nan_threshold"]]
drop_zero_variance_columns <- snakemake@params[["cols_var_threshold"]]
rows_nan_threshold <- snakemake@params[["rows_nan_threshold"]]
participants_day_threshold <- snakemake@params[["participants_day_threshold"]]
# We have to do this before and after dropping rows, that's why is duplicated
clean_metrics <- filter_participant_without_enough_days(clean_metrics, participants_day_threshold)
# drop columns with a percentage of NA values above cols_nan_threshold
if(nrow(clean_metrics))
clean_metrics <- clean_metrics %>% select_if(~ sum(is.na(.)) / length(.) <= cols_nan_threshold )
if(drop_zero_variance_columns)
clean_metrics <- clean_metrics %>% select_if(grepl("pid|local_date",names(.)) | sapply(., n_distinct) > 1)
# drop rows with a percentage of NA values above rows_nan_threshold
clean_metrics <- clean_metrics %>%
mutate(percentage_na = rowSums(is.na(.)) / ncol(.)) %>%
filter(percentage_na < rows_nan_threshold) %>%
select(-percentage_na)
clean_metrics <- filter_participant_without_enough_days(clean_metrics, participants_day_threshold)
write.csv(clean_metrics, snakemake@output[[1]], row.names = FALSE)

View File

@ -5,16 +5,20 @@ library(purrr)
library(dplyr)
metric_files <- snakemake@input[["metric_files"]]
phone_valid_sensed_days <- read.csv(snakemake@input[["phone_valid_sensed_days"]])
drop_valid_sensed_days <- snakemake@params[["drop_valid_sensed_days"]]
phone_valid_sensed_days <- snakemake@input[["phone_valid_sensed_days"]]
days_to_include <- snakemake@input[["days_to_include"]]
source <- snakemake@params[["source"]]
metrics_for_individual_model <- metric_files %>%
map(read.csv, stringsAsFactors = F, colClasses = c(local_date = "character")) %>%
reduce(full_join, by="local_date")
if(drop_valid_sensed_days && source == "phone_metrics"){
metrics_for_individual_model <- merge(metrics_for_individual_model, phone_valid_sensed_days, by="local_date") %>% select(-valid_hours)
}
if(!is.null(phone_valid_sensed_days) && source %in% c("phone_metrics", "phone_fitbit_metrics")){
metrics_for_individual_model <- merge(metrics_for_individual_model, read.csv(phone_valid_sensed_days), by="local_date") %>% select(-valid_hours)
}
if(!is.null(days_to_include)){
metrics_for_individual_model <- merge(metrics_for_individual_model, read.csv(days_to_include), by="local_date")
}
write.csv(metrics_for_individual_model, snakemake@output[[1]], row.names = FALSE)

View File

@ -7,10 +7,10 @@ library(stringr)
metric_files <- snakemake@input[["metric_files"]]
metrics_of_all_participants <- data_frame(filename = metric_files) %>% # create a data frame
metrics_of_all_participants <- tibble(filename = metric_files) %>% # create a data frame
mutate(file_contents = map(filename, ~ read.csv(., stringsAsFactors = F, colClasses = c(local_date = "character"))),
pid = str_match(filename, ".*/(p[0-9]{2})/.*")[,2]) %>%
unnest() %>%
pid = str_match(filename, ".*/([a-zA-Z]+?[0-9]+?)/.*")[,2]) %>%
unnest(cols = c(file_contents)) %>%
select(-filename)
write.csv(metrics_of_all_participants, snakemake@output[[1]], row.names = FALSE)

View File

@ -4,20 +4,15 @@ from datetime import timedelta
def appendDaysInRange(days_to_analyse, start_date, end_date):
num_of_days = (end_date - start_date).days
for day in range(num_of_days + 1):
days_to_analyse = days_to_analyse.append({"days_to_analyse": start_date + timedelta(days = day)}, ignore_index=True)
days_to_analyse = days_to_analyse.append({"local_date": start_date + timedelta(days = day)}, ignore_index=True)
return days_to_analyse
days_before_surgery = snakemake.params["days_before_surgery"]
days_in_hospital = snakemake.params["days_in_hospital"]
days_after_discharge = snakemake.params["days_after_discharge"]
days_before_surgery = int(snakemake.params["days_before_surgery"])
days_in_hospital = str(snakemake.params["days_in_hospital"])
days_after_discharge = int(snakemake.params["days_after_discharge"])
participant_info = pd.read_csv(snakemake.input["participant_info"], parse_dates=["surgery_date", "discharge_date"])
with open(snakemake.input["pid_file"], encoding="ISO-8859-1") as external_file:
pid_file_content = external_file.readlines()
device_ids = pid_file_content[0].strip().split(",")
days_to_analyse = pd.DataFrame(columns = ["local_date"])
days_to_analyse = pd.DataFrame(columns = ["days_to_analyse"])
participant_info = participant_info[participant_info["device_id"].isin(device_ids)]
try:
surgery_date, discharge_date = participant_info["surgery_date"].iloc[0].date(), participant_info["discharge_date"].iloc[0].date()
except: