Refactor the function to fetch provider features
parent
b0f1477d7e
commit
011b9736d5
|
@ -64,7 +64,7 @@ rule ios_activity_recognition_deltas:
|
|||
|
||||
rule locations_python_features:
|
||||
input:
|
||||
location_data = expand("data/raw/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
|
||||
sensor_data = expand("data/raw/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
|
||||
day_segments_labels = "data/interim/day_segments_labels.csv"
|
||||
params:
|
||||
provider = lambda wildcards: config["LOCATIONS"]["PROVIDERS"][wildcards.provider_key],
|
||||
|
@ -72,11 +72,11 @@ rule locations_python_features:
|
|||
output:
|
||||
"data/interim/{pid}/locations_features/locations_python_{provider_key}.csv"
|
||||
script:
|
||||
"../src/features/location/locations_entry.py"
|
||||
"../src/features/locations/locations_entry.py"
|
||||
|
||||
rule locations_r_features:
|
||||
input:
|
||||
location_data = expand("data/raw/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
|
||||
sensor_data = expand("data/raw/{{pid}}/{sensor}_processed_{locations_to_use}.csv", sensor=config["LOCATIONS"]["DB_TABLE"], locations_to_use=config["LOCATIONS"]["LOCATIONS_TO_USE"]),
|
||||
day_segments_labels = "data/interim/day_segments_labels.csv"
|
||||
params:
|
||||
provider = lambda wildcards: config["LOCATIONS"]["PROVIDERS"][wildcards.provider_key],
|
||||
|
@ -84,7 +84,7 @@ rule locations_r_features:
|
|||
output:
|
||||
"data/interim/{pid}/locations_features/locations_r_{provider_key}.csv"
|
||||
script:
|
||||
"../src/features/location/locations_entry.R"
|
||||
"../src/features/locations/locations_entry.R"
|
||||
|
||||
rule bluetooth_features:
|
||||
input:
|
||||
|
|
|
@ -1,78 +0,0 @@
|
|||
library('tidyr')
|
||||
library('stringr')
|
||||
library('entropy')
|
||||
|
||||
Mode <- function(v) {
|
||||
uniqv <- unique(v)
|
||||
uniqv[which.max(tabulate(match(v, uniqv)))]
|
||||
}
|
||||
|
||||
base_call_features <- function(calls, call_type, day_segment, requested_features){
|
||||
# Output dataframe
|
||||
features = data.frame(local_segment = character(), stringsAsFactors = FALSE)
|
||||
|
||||
# The name of the features this function can compute
|
||||
base_features_names <- c("count", "distinctcontacts", "meanduration", "sumduration", "minduration", "maxduration", "stdduration", "modeduration", "entropyduration", "timefirstcall", "timelastcall", "countmostfrequentcontact")
|
||||
|
||||
# The subset of requested features this function can compute
|
||||
features_to_compute <- intersect(base_features_names, requested_features)
|
||||
|
||||
# Filter rows that belong to the calls type and day segment of interest
|
||||
call_type_label = ifelse(call_type == "incoming", "1", ifelse(call_type == "outgoing", "2", ifelse(call_type == "missed", "3", NA)))
|
||||
if(is.na(call_type_label))
|
||||
stop(paste("Call type can online be incoming, outgoing or missed but instead you typed: ", call_type))
|
||||
|
||||
# Filter the rows that belong to day_segment, and put the segment full name in a new column for grouping
|
||||
date_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2}"
|
||||
hour_regex = "[0-9]{2}:[0-9]{2}:[0-9]{2}"
|
||||
calls <- calls %>%
|
||||
filter(call_type == call_type_label) %>%
|
||||
filter(grepl(paste0("\\[", day_segment, "#"),assigned_segments)) %>%
|
||||
mutate(local_segment = str_extract(assigned_segments, paste0("\\[", day_segment, "#", date_regex, "#", hour_regex, "#", date_regex, "#", hour_regex, "\\]")),
|
||||
local_segment = str_sub(local_segment, 2, -2)) # get rid of first and last character([])
|
||||
|
||||
# If there are not features or data to work with, return an empty df with appropiate columns names
|
||||
if(length(features_to_compute) == 0)
|
||||
return(features)
|
||||
if(nrow(calls) < 1)
|
||||
return(cbind(features, read.csv(text = paste(paste("call", call_type, features_to_compute, sep = "_"), collapse = ","), stringsAsFactors = FALSE)))
|
||||
|
||||
for(feature_name in features_to_compute){
|
||||
if(feature_name == "countmostfrequentcontact"){
|
||||
# Get the number of messages for the most frequent contact throughout the study
|
||||
mostfrequentcontact <- calls %>%
|
||||
group_by(trace) %>%
|
||||
mutate(N=n()) %>%
|
||||
ungroup() %>%
|
||||
filter(N == max(N)) %>%
|
||||
head(1) %>% # if there are multiple contacts with the same amount of messages pick the first one only
|
||||
pull(trace)
|
||||
feature <- calls %>%
|
||||
filter(trace == mostfrequentcontact) %>%
|
||||
group_by(local_segment) %>%
|
||||
summarise(!!paste("call", call_type, feature_name, sep = "_") := n()) %>%
|
||||
replace(is.na(.), 0)
|
||||
features <- merge(features, feature, by="local_segment", all = TRUE)
|
||||
} else {
|
||||
feature <- calls %>%
|
||||
group_by(local_segment)
|
||||
|
||||
feature <- switch(feature_name,
|
||||
"count" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := n()),
|
||||
"distinctcontacts" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := n_distinct(trace)),
|
||||
"meanduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := mean(call_duration)),
|
||||
"sumduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := sum(call_duration)),
|
||||
"minduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := min(call_duration)),
|
||||
"maxduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := max(call_duration)),
|
||||
"stdduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := sd(call_duration)),
|
||||
"modeduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := Mode(call_duration)),
|
||||
"entropyduration" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := entropy.MillerMadow(call_duration)),
|
||||
"timefirstcall" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := first(local_hour) * 60 + first(local_minute)),
|
||||
"timelastcall" = feature %>% summarise(!!paste("call", call_type, feature_name, sep = "_") := last(local_hour) * 60 + last(local_minute)))
|
||||
|
||||
features <- merge(features, feature, by="local_segment", all = TRUE)
|
||||
}
|
||||
}
|
||||
features <- features %>% mutate_at(vars(contains("countmostfrequentcontact")), list( ~ replace_na(., 0)))
|
||||
return(features)
|
||||
}
|
|
@ -1,44 +0,0 @@
|
|||
source("renv/activate.R")
|
||||
source("src/features/utils/utils.R")
|
||||
library("dplyr")
|
||||
library("stringr")
|
||||
library("tidyr")
|
||||
|
||||
location_data <- read.csv(snakemake@input[["location_data"]], stringsAsFactors = FALSE)
|
||||
day_segments_labels <- read.csv(snakemake@input[["day_segments_labels"]], stringsAsFactors = FALSE)
|
||||
provider <- snakemake@params["provider"][["provider"]]
|
||||
provider_key <- snakemake@params["provider_key"]
|
||||
|
||||
location_features <- data.frame(local_segment = character(), stringsAsFactors = FALSE)
|
||||
|
||||
if(!"FEATURES" %in% names(provider))
|
||||
stop(paste0("Provider config[LOCATION][PROVIDERS][", provider_key,"] is missing a FEATURES attribute in config.yaml"))
|
||||
|
||||
if(provider[["COMPUTE"]] == TRUE){
|
||||
code_path <- paste0("src/features/location/", provider[["SRC_FOLDER"]], "/main.R")
|
||||
source(code_path)
|
||||
features_function <- match.fun(paste0(provider[["SRC_FOLDER"]], "_location_features"))
|
||||
day_segments <- day_segments_labels %>% pull(label)
|
||||
for (day_segment in day_segments){
|
||||
print(paste(rapids_log_tag,"Processing", provider_key, day_segment))
|
||||
|
||||
features <- features_function(location_data, day_segment, provider)
|
||||
|
||||
# Check all features names contain the provider key so they are unique
|
||||
features_names <- colnames(features %>% select(-local_segment))
|
||||
if(any(!grepl(paste0(".*(",str_to_lower(provider_key),").*"), features_names)))
|
||||
stop(paste("The name of all location features of", provider_key," must contain its name in lower case but the following don't [", paste(features_names[!grepl(paste0(".*(",str_to_lower(provider_key),").*"), features_names)], collapse = ", "), "]"))
|
||||
|
||||
location_features <- merge(location_features, features, all = TRUE)
|
||||
}
|
||||
} else {
|
||||
for(feature in provider[["FEATURES"]])
|
||||
location_features[,feature] <- NA
|
||||
}
|
||||
|
||||
location_features <- location_features %>% separate(col = local_segment,
|
||||
into = c("local_segment_label", "local_start_date", "local_start_time", "local_end_date", "local_end_time"),
|
||||
sep = "#",
|
||||
remove = FALSE)
|
||||
|
||||
write.csv(location_features, snakemake@output[[1]], row.names = FALSE)
|
|
@ -1,39 +0,0 @@
|
|||
import pandas as pd
|
||||
from importlib import import_module, util
|
||||
from pathlib import Path
|
||||
|
||||
# import filter_data_by_segment from src/features/utils/utils.py
|
||||
spec = util.spec_from_file_location("util", str(Path(snakemake.scriptdir).parent / "utils" / "utils.py"))
|
||||
mod = util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
filter_data_by_segment = getattr(mod, "filter_data_by_segment")
|
||||
rapids_log_tag = getattr(mod, "rapids_log_tag")
|
||||
|
||||
location_data = pd.read_csv(snakemake.input["location_data"][0])
|
||||
day_segments_labels = pd.read_csv(snakemake.input["day_segments_labels"], header=0)
|
||||
mypath = snakemake.params["mypath"]
|
||||
provider = snakemake.params["provider"]
|
||||
provider_key = snakemake.params["provider_key"]
|
||||
location_features = pd.DataFrame(columns=["local_segment"])
|
||||
|
||||
if "FEATURES" not in provider:
|
||||
raise ValueError("Provider config[LOCATION][PROVIDERS][{}] is missing a FEATURES attribute in config.yaml".format(provider_key))
|
||||
|
||||
if provider["COMPUTE"] == True:
|
||||
code_path = provider["SRC_FOLDER"] + ".main"
|
||||
feature_module = import_module(code_path)
|
||||
feature_function = getattr(feature_module, provider["SRC_FOLDER"] + "_location_features")
|
||||
|
||||
for day_segment in day_segments_labels["label"]:
|
||||
print("{} Processing {} {}".format(rapids_log_tag, provider_key, day_segment))
|
||||
features = feature_function(location_data, day_segment, provider, filter_data_by_segment=filter_data_by_segment)
|
||||
location_features = location_features.merge(features, how="outer")
|
||||
else:
|
||||
for feature in provider["FEATURES"]:
|
||||
location_features[feature] = None
|
||||
|
||||
segment_colums = pd.DataFrame()
|
||||
segment_colums[["local_segment_label", "local_start_date", "local_start_time", "local_end_date", "local_end_time"]] = location_features["local_segment"].str.split(pat="#", expand=True)
|
||||
for i in range(segment_colums.shape[1]):
|
||||
location_features.insert(1 + i, segment_colums.columns[i], segment_colums[segment_colums.columns[i]])
|
||||
location_features.to_csv(snakemake.output[0], index=False)
|
|
@ -3,7 +3,7 @@ library("dplyr")
|
|||
library("stringr")
|
||||
|
||||
# Load Ian Barnett's code. Taken from https://scholar.harvard.edu/ibarnett/software/gpsmobility
|
||||
file.sources = list.files(c("src/features/location/barnett/library"), pattern="*.R$", full.names=TRUE, ignore.case=TRUE)
|
||||
file.sources = list.files(c("src/features/locations/barnett/library"), pattern="*.R$", full.names=TRUE, ignore.case=TRUE)
|
||||
sapply(file.sources,source,.GlobalEnv)
|
||||
|
||||
create_empty_file <- function(requested_features){
|
||||
|
@ -27,7 +27,7 @@ create_empty_file <- function(requested_features){
|
|||
) %>% select(all_of(requested_features)))
|
||||
}
|
||||
|
||||
barnett_location_features <- function(location_data, day_segment, params){
|
||||
barnett_features <- function(location_data, day_segment, params){
|
||||
location_features <- NULL
|
||||
location <- location_data
|
||||
accuracy_limit <- params[["ACCURACY_LIMIT"]]
|
|
@ -4,7 +4,7 @@ from astropy.timeseries import LombScargle
|
|||
from sklearn.cluster import DBSCAN
|
||||
from math import radians, cos, sin, asin, sqrt
|
||||
|
||||
def doryab_location_features(location_data, day_segment, params, filter_data_by_segment, *args, **kwargs):
|
||||
def doryab_features(location_data, day_segment, params, filter_data_by_segment, *args, **kwargs):
|
||||
requested_features = params["FEATURES"]
|
||||
dbscan_eps = params["DBSCAN_EPS"]
|
||||
dbscan_minsamples = params["DBSCAN_MINSAMPLES"]
|
|
@ -0,0 +1,13 @@
|
|||
source("renv/activate.R")
|
||||
source("src/features/utils/utils.R")
|
||||
library("dplyr")
|
||||
library("tidyr")
|
||||
|
||||
sensor_data_file <- snakemake@input[["sensor_data"]]
|
||||
day_segments_file <- snakemake@input[["day_segments_labels"]]
|
||||
provider <- snakemake@params["provider"][["provider"]]
|
||||
provider_key <- snakemake@params["provider_key"]
|
||||
|
||||
sensor_features <- fetch_provider_features(provider, provider_key, "locations", sensor_data_file, day_segments_file)
|
||||
|
||||
write.csv(sensor_features, snakemake@output[[1]], row.names = FALSE)
|
|
@ -0,0 +1,18 @@
|
|||
import pandas as pd
|
||||
from importlib import import_module, util
|
||||
from pathlib import Path
|
||||
|
||||
# import fetch_provider_features from src/features/utils/utils.py
|
||||
spec = util.spec_from_file_location("util", str(Path(snakemake.scriptdir).parent / "utils" / "utils.py"))
|
||||
mod = util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
fetch_provider_features = getattr(mod, "fetch_provider_features")
|
||||
|
||||
sensor_data_file = snakemake.input["sensor_data"][0]
|
||||
day_segments_file = snakemake.input["day_segments_labels"]
|
||||
provider = snakemake.params["provider"]
|
||||
provider_key = snakemake.params["provider_key"]
|
||||
|
||||
sensor_features = fetch_provider_features(provider, provider_key, "locations", sensor_data_file, day_segments_file)
|
||||
|
||||
sensor_features.to_csv(snakemake.output[0], index=False)
|
|
@ -1,4 +1,7 @@
|
|||
library("stringr")
|
||||
|
||||
rapids_log_tag <- "RAPIDS:"
|
||||
|
||||
filter_data_by_segment <- function(data, day_segment){
|
||||
# Filter the rows that belong to day_segment, and put the segment full name in a new column for grouping
|
||||
date_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2}"
|
||||
|
@ -9,4 +12,41 @@ filter_data_by_segment <- function(data, day_segment){
|
|||
local_segment = str_sub(local_segment, 2, -2)) # get rid of first and last character([])
|
||||
return(data)
|
||||
}
|
||||
rapids_log_tag <- "RAPIDS:"
|
||||
|
||||
fetch_provider_features <- function(provider, provider_key, config_key, sensor_data_file, day_segments_file){
|
||||
sensor_features <- data.frame(local_segment = character(), stringsAsFactors = FALSE)
|
||||
|
||||
sensor_data <- read.csv(sensor_data_file, stringsAsFactors = FALSE)
|
||||
day_segments_labels <- read.csv(day_segments_file, stringsAsFactors = FALSE)
|
||||
|
||||
if(!"FEATURES" %in% names(provider))
|
||||
stop(paste0("Provider config[CALLS][PROVIDERS][", provider_key,"] is missing a FEATURES attribute in config.yaml"))
|
||||
|
||||
if(provider[["COMPUTE"]] == TRUE){
|
||||
code_path <- paste0("src/features/", config_key,"/", provider[["SRC_FOLDER"]], "/main.R")
|
||||
source(code_path)
|
||||
features_function <- match.fun(paste0(provider[["SRC_FOLDER"]], "_features"))
|
||||
day_segments <- day_segments_labels %>% pull(label)
|
||||
for (day_segment in day_segments){
|
||||
print(paste(rapids_log_tag,"Processing", config_key, provider_key, day_segment))
|
||||
|
||||
features <- features_function(sensor_data, day_segment, provider)
|
||||
|
||||
# Check all features names contain the provider key so they are unique
|
||||
features_names <- colnames(features %>% select(-local_segment))
|
||||
if(any(!grepl(paste0(".*(",str_to_lower(provider_key),").*"), features_names)))
|
||||
stop(paste("The name of all calls features of", provider_key," must contain its name in lower case but the following don't [", paste(features_names[!grepl(paste0(".*(",str_to_lower(provider_key),").*"), features_names)], collapse = ", "), "]"))
|
||||
|
||||
sensor_features <- merge(sensor_features, features, all = TRUE)
|
||||
}
|
||||
} else {
|
||||
for(feature in provider[["FEATURES"]])
|
||||
sensor_features[,feature] <- NA
|
||||
}
|
||||
|
||||
sensor_features <- sensor_features %>% separate(col = local_segment,
|
||||
into = c("local_segment_label", "local_start_date", "local_start_time", "local_end_date", "local_end_time"),
|
||||
sep = "#",
|
||||
remove = FALSE)
|
||||
return(sensor_features)
|
||||
}
|
|
@ -1,3 +1,4 @@
|
|||
rapids_log_tag = "RAPIDS:"
|
||||
|
||||
def filter_data_by_segment(data, day_segment):
|
||||
date_regex = "[0-9]{4}[\-|\/][0-9]{2}[\-|\/][0-9]{2}"
|
||||
|
@ -6,4 +7,36 @@ def filter_data_by_segment(data, day_segment):
|
|||
data["local_segment"] = data["assigned_segments"].str.extract(segment_regex, expand=True)
|
||||
return(data.dropna(subset = ["local_segment"]))
|
||||
|
||||
rapids_log_tag = "RAPIDS:"
|
||||
def fetch_provider_features(provider, provider_key, config_key, sensor_data_file, day_segments_file):
|
||||
import pandas as pd
|
||||
from importlib import import_module, util
|
||||
|
||||
sensor_features = pd.DataFrame(columns=["local_segment"])
|
||||
sensor_data = pd.read_csv(sensor_data_file)
|
||||
day_segments_labels = pd.read_csv(day_segments_file, header=0)
|
||||
if "FEATURES" not in provider:
|
||||
raise ValueError("Provider config[{}][PROVIDERS][{}] is missing a FEATURES attribute in config.yaml".format(config_key.upper(), provider_key))
|
||||
|
||||
if provider["COMPUTE"] == True:
|
||||
code_path = provider["SRC_FOLDER"] + ".main"
|
||||
feature_module = import_module(code_path)
|
||||
feature_function = getattr(feature_module, provider["SRC_FOLDER"] + "_features")
|
||||
|
||||
for day_segment in day_segments_labels["label"]:
|
||||
print("{} Processing {} {} {}".format(rapids_log_tag, config_key, provider_key, day_segment))
|
||||
print("---")
|
||||
features = feature_function(sensor_data, day_segment, provider, filter_data_by_segment=filter_data_by_segment)
|
||||
print("2")
|
||||
sensor_features = sensor_features.merge(features, how="outer")
|
||||
else:
|
||||
for feature in provider["FEATURES"]:
|
||||
sensor_features[feature] = None
|
||||
print("3")
|
||||
segment_colums = pd.DataFrame()
|
||||
split_segemnt_columns = sensor_features["local_segment"].str.split(pat="#", expand=True)
|
||||
new_segment_columns = split_segemnt_columns if split_segemnt_columns.shape[1] == 5 else pd.DataFrame(columns=["local_segment_label", "local_start_date", "local_start_time", "local_end_date", "local_end_time"])
|
||||
segment_colums[["local_segment_label", "local_start_date", "local_start_time", "local_end_date", "local_end_time"]] = new_segment_columns
|
||||
for i in range(segment_colums.shape[1]):
|
||||
sensor_features.insert(1 + i, segment_colums.columns[i], segment_colums[segment_colums.columns[i]])
|
||||
|
||||
return sensor_features
|
Loading…
Reference in New Issue