Migrate messages to new segments
parent 31ec5b0da4
commit 14d2d694ce
@@ -35,13 +35,13 @@ if config["PHONE_VALID_SENSED_DAYS"]["COMPUTE"]:
 if config["MESSAGES"]["COMPUTE"]:
     files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"]))
     files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"]))
-    files_to_compute.extend(expand("data/processed/{pid}/messages_{messages_type}_{day_segment}.csv", pid=config["PIDS"], messages_type = config["MESSAGES"]["TYPES"], day_segment = config["MESSAGES"]["DAY_SEGMENTS"]))
+    files_to_compute.extend(expand("data/processed/{pid}/messages_{messages_type}.csv", pid=config["PIDS"], messages_type = config["MESSAGES"]["TYPES"]))

 if config["CALLS"]["COMPUTE"]:
     files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
     files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
     files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime_unified.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
-    files_to_compute.extend(expand("data/processed/{pid}/calls_{call_type}.csv", pid=config["PIDS"], call_type=config["CALLS"]["TYPES"], day_segment = config["CALLS"]["DAY_SEGMENTS"]))
+    files_to_compute.extend(expand("data/processed/{pid}/calls_{call_type}.csv", pid=config["PIDS"], call_type=config["CALLS"]["TYPES"]))

 if config["BARNETT_LOCATION"]["COMPUTE"]:
     if config["BARNETT_LOCATION"]["LOCATIONS_TO_USE"] == "RESAMPLE_FUSED":
@@ -1,12 +1,12 @@
 rule messages_features:
     input:
-        expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["MESSAGES"]["DB_TABLE"])
+        expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["MESSAGES"]["DB_TABLE"]),
+        day_segments_labels = expand("data/interim/{sensor}_day_segments_labels.csv", sensor=config["MESSAGES"]["DB_TABLE"])
     params:
         messages_type = "{messages_type}",
-        day_segment = "{day_segment}",
         features = lambda wildcards: config["MESSAGES"]["FEATURES"][wildcards.messages_type]
     output:
-        "data/processed/{pid}/messages_{messages_type}_{day_segment}.csv"
+        "data/processed/{pid}/messages_{messages_type}.csv"
     script:
         "../src/features/messages_features.R"

@@ -1,17 +1,9 @@
 library('tidyr')
-
-filter_by_day_segment <- function(data, day_segment) {
-  if(day_segment %in% c("morning", "afternoon", "evening", "night"))
-    data <- data %>% filter(local_day_segment == day_segment)
-  else if(day_segment == "daily")
-    return(data)
-  else
-    return(data %>% head(0))
-}
+library('stringr')

 base_messages_features <- function(messages, messages_type, day_segment, requested_features){
   # Output dataframe
-  features = data.frame(local_date = character(), stringsAsFactors = FALSE)
+  features = data.frame(local_segment = character(), stringsAsFactors = FALSE)

   # The name of the features this function can compute
   base_features_names <- c("countmostfrequentcontact", "count", "distinctcontacts", "timefirstmessage", "timelastmessage")
@@ -19,15 +11,20 @@ base_messages_features <- function(messages, messages_type, day_segment, requested_features){
   # The subset of requested features this function can compute
   features_to_compute <- intersect(base_features_names, requested_features)

-  # Filter rows that belong to the message type and day segment of interest
-  messages <- messages %>% filter(message_type == ifelse(messages_type == "received", "1", ifelse(messages_type == "sent", 2, NA))) %>%
-    filter_by_day_segment(day_segment)
+  # Filter the rows that belong to day_segment, and put the segment full name in a new column for grouping
+  date_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2}"
+  hour_regex = "[0-9]{2}:[0-9]{2}:[0-9]{2}"
+  messages <- messages %>%
+    filter(message_type == ifelse(messages_type == "received", "1", ifelse(messages_type == "sent", 2, NA))) %>%
+    filter(grepl(paste0("\\[", day_segment, "#"),assigned_segments)) %>%
+    mutate(local_segment = str_extract(assigned_segments, paste0("\\[", day_segment, "#", date_regex, "#", hour_regex, "#", date_regex, "#", hour_regex, "\\]")),
+           local_segment = str_sub(local_segment, 2, -2)) # get rid of first and last character([])

   # If there are not features or data to work with, return an empty df with appropiate columns names
   if(length(features_to_compute) == 0)
     return(features)
   if(nrow(messages) < 1)
-    return(cbind(features, read.csv(text = paste(paste("messages", messages_type, day_segment, features_to_compute, sep = "_"), collapse = ","), stringsAsFactors = FALSE)))
+    return(cbind(features, read.csv(text = paste(paste("messages", messages_type, features_to_compute, sep = "_"), collapse = ","), stringsAsFactors = FALSE)))

   for(feature_name in features_to_compute){
     if(feature_name == "countmostfrequentcontact"){
@@ -41,21 +38,21 @@ base_messages_features <- function(messages, messages_type, day_segment, requested_features){
         pull(trace)
       feature <- messages %>%
         filter(trace == mostfrequentcontact) %>%
-        group_by(local_date) %>%
-        summarise(!!paste("messages", messages_type, day_segment, feature_name, sep = "_") := n()) %>%
+        group_by(local_segment) %>%
+        summarise(!!paste("messages", messages_type, feature_name, sep = "_") := n()) %>%
         replace(is.na(.), 0)
-      features <- merge(features, feature, by="local_date", all = TRUE)
+      features <- merge(features, feature, by="local_segment", all = TRUE)
     } else {
       feature <- messages %>%
-        group_by(local_date)
+        group_by(local_segment)

       feature <- switch(feature_name,
-        "count" = feature %>% summarise(!!paste("messages", messages_type, day_segment, feature_name, sep = "_") := n()),
-        "distinctcontacts" = feature %>% summarise(!!paste("messages", messages_type, day_segment, feature_name, sep = "_") := n_distinct(trace)),
-        "timefirstmessage" = feature %>% summarise(!!paste("messages", messages_type, day_segment, feature_name, sep = "_") := first(local_hour) * 60 + first(local_minute)),
-        "timelastmessage" = feature %>% summarise(!!paste("messages", messages_type, day_segment, feature_name, sep = "_") := last(local_hour) * 60 + last(local_minute)))
+        "count" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := n()),
+        "distinctcontacts" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := n_distinct(trace)),
+        "timefirstmessage" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := first(local_hour) * 60 + first(local_minute)),
+        "timelastmessage" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := last(local_hour) * 60 + last(local_minute)))

-      features <- merge(features, feature, by="local_date", all = TRUE)
+      features <- merge(features, feature, by="local_segment", all = TRUE)
     }
   }
   features <- features %>% mutate_at(vars(contains("countmostfrequentcontact")), list( ~ replace_na(., 0)))
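For context on the filtering introduced above, here is a minimal sketch of what the str_extract/str_sub pair does, assuming assigned_segments holds one or more bracketed labels of the form [label#start_date#start_time#end_date#end_time]; the sample value below is made up for illustration and is not taken from the repository.

library(stringr)

# Hypothetical assigned_segments value; the bracketed format is implied by the regexes in the diff
assigned_segments <- "[daily#2020-05-01#00:00:00#2020-05-01#23:59:59]"
day_segment <- "daily"
date_regex <- "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2}"
hour_regex <- "[0-9]{2}:[0-9]{2}:[0-9]{2}"

# Pull out the bracketed segment that matches this label, then drop the surrounding []
segment <- str_extract(assigned_segments, paste0("\\[", day_segment, "#", date_regex, "#", hour_regex, "#", date_regex, "#", hour_regex, "\\]"))
str_sub(segment, 2, -2)  # "daily#2020-05-01#00:00:00#2020-05-01#23:59:59"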
@@ -3,18 +3,24 @@

 source("renv/activate.R")
 source("src/features/messages/messages_base.R")
-library(dplyr, warn.conflicts = FALSE)
+library("dplyr", warn.conflicts = FALSE)

 messages <- read.csv(snakemake@input[[1]])
-day_segment <- snakemake@params[["day_segment"]]
+day_segments_labels <- read.csv(snakemake@input[["day_segments_labels"]])
 requested_features <- snakemake@params[["features"]]
 messages_type <- snakemake@params[["messages_type"]]
-features <- data.frame(local_date = character(), stringsAsFactors = FALSE)
+features <- data.frame(local_segment = character(), stringsAsFactors = FALSE)

-# Compute base SMS features
-features <- merge(features, base_messages_features(messages, messages_type, day_segment, requested_features), by="local_date", all = TRUE)
+day_segments <- day_segments_labels %>% pull(label)
+for (day_segment in day_segments)
+  features <- merge(features, base_messages_features(messages, messages_type, day_segment, requested_features), all = TRUE)

 if(ncol(features) != length(requested_features) + 1)
   stop(paste0("The number of features in the output dataframe (=", ncol(features),") does not match the expected value (=", length(requested_features)," + 1). Verify your Messages (SMS) feature extraction functions"))

+features <- features %>% separate(col = local_segment,
+                                  into = c("local_segment_label", "local_start_date", "local_start_time", "local_end_date", "local_end_time"),
+                                  sep = "#",
+                                  remove = FALSE)
+
 write.csv(features, snakemake@output[[1]], row.names = FALSE)
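As a rough sketch of what the final separate() step in the script above produces: the one-row data frame and its feature column below are hypothetical; only the column names and the "#" separator come from the diff.

library(dplyr, warn.conflicts = FALSE)
library(tidyr)

# Hypothetical feature row; local_segment reuses the made-up segment string from the earlier sketch
features <- data.frame(local_segment = "daily#2020-05-01#00:00:00#2020-05-01#23:59:59",
                       messages_received_count = 3,
                       stringsAsFactors = FALSE)

# Split the segment name into its label and start/end date-time parts, keeping the original column
features %>% separate(col = local_segment,
                      into = c("local_segment_label", "local_start_date", "local_start_time", "local_end_date", "local_end_time"),
                      sep = "#",
                      remove = FALSE)
# local_segment_label = "daily", local_start_date = "2020-05-01", local_start_time = "00:00:00",
# local_end_date = "2020-05-01", local_end_time = "23:59:59"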