Migrate messages to new file structure
parent
e269062439
commit
132311da77
10
Snakefile
10
Snakefile
|
@ -32,10 +32,12 @@ if config["PHONE_VALID_SENSED_DAYS"]["COMPUTE"]:
|
|||
min_valid_hours_per_day=config["PHONE_VALID_SENSED_DAYS"]["MIN_VALID_HOURS_PER_DAY"],
|
||||
min_valid_bins_per_hour=config["PHONE_VALID_SENSED_DAYS"]["MIN_VALID_BINS_PER_HOUR"]))
|
||||
|
||||
if config["MESSAGES"]["COMPUTE"]:
|
||||
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"]))
|
||||
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"]))
|
||||
files_to_compute.extend(expand("data/processed/{pid}/messages_{messages_type}.csv", pid=config["PIDS"], messages_type = config["MESSAGES"]["TYPES"]))
|
||||
for provider in config["MESSAGES"]["PROVIDERS"].keys():
|
||||
if config["MESSAGES"]["PROVIDERS"][provider]["COMPUTE"]:
|
||||
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"]))
|
||||
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"]))
|
||||
files_to_compute.extend(expand("data/interim/{pid}/{sensor_key}_features/{sensor_key}_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["MESSAGES"]["PROVIDERS"][provider]["SRC_LANGUAGE"], provider_key=provider, sensor_key="MESSAGES".lower()))
|
||||
files_to_compute.extend(expand("data/processed/features/{pid}/{sensor_key}.csv", pid=config["PIDS"], sensor_key="MESSAGES".lower()))
|
||||
|
||||
for provider in config["CALLS"]["PROVIDERS"].keys():
|
||||
if config["CALLS"]["PROVIDERS"][provider]["COMPUTE"]:
|
||||
|
|
24
config.yaml
24
config.yaml
|
@ -42,28 +42,30 @@ PHONE_VALID_SENSED_DAYS:
|
|||
|
||||
# Communication SMS features config, TYPES and FEATURES keys need to match
|
||||
MESSAGES:
|
||||
COMPUTE: False
|
||||
DB_TABLE: messages
|
||||
TYPES : [received, sent]
|
||||
FEATURES:
|
||||
received: [count, distinctcontacts, timefirstmessage, timelastmessage, countmostfrequentcontact]
|
||||
sent: [count, distinctcontacts, timefirstmessage, timelastmessage, countmostfrequentcontact]
|
||||
DAY_SEGMENTS: *day_segments
|
||||
PROVIDERS:
|
||||
RAPIDS:
|
||||
COMPUTE: False
|
||||
MESSAGES_TYPES : [received, sent]
|
||||
FEATURES:
|
||||
received: [count, distinctcontacts, timefirstmessage, timelastmessage, countmostfrequentcontact]
|
||||
sent: [count, distinctcontacts, timefirstmessage, timelastmessage, countmostfrequentcontact]
|
||||
SRC_LANGUAGE: "r"
|
||||
SRC_FOLDER: "rapids" # inside src/features/messages
|
||||
|
||||
# Communication call features config, TYPES and FEATURES keys need to match
|
||||
CALLS:
|
||||
DB_TABLE: calls
|
||||
PROVIDERS:
|
||||
RAPIDS:
|
||||
COMPUTE: True
|
||||
COMPUTE: False
|
||||
CALL_TYPES: [missed, incoming, outgoing]
|
||||
FEATURES:
|
||||
missed: [count, distinctcontacts, timefirstcall, timelastcall, countmostfrequentcontact]
|
||||
incoming: [count, distinctcontacts, meanduration, sumduration, minduration, maxduration, stdduration, modeduration, entropyduration, timefirstcall, timelastcall, countmostfrequentcontact]
|
||||
outgoing: [count, distinctcontacts, meanduration, sumduration, minduration, maxduration, stdduration, modeduration, entropyduration, timefirstcall, timelastcall, countmostfrequentcontact]
|
||||
DAY_SEGMENTS: *day_segments
|
||||
SRC_LANGUAGE: "r"
|
||||
SRC_FOLDER: "rapids"
|
||||
SRC_FOLDER: "rapids" # inside src/features/calls
|
||||
|
||||
APPLICATION_GENRES:
|
||||
CATALOGUE_SOURCE: FILE # FILE (genres are read from CATALOGUE_FILE) or GOOGLE (genres are scrapped from the Play Store)
|
||||
|
@ -87,7 +89,7 @@ LOCATIONS:
|
|||
MAXIMUM_GAP_ALLOWED: 300
|
||||
MINUTES_DATA_USED: False
|
||||
SAMPLING_FREQUENCY: 0
|
||||
SRC_FOLDER: "doryab"
|
||||
SRC_FOLDER: "doryab" # inside src/features/locations
|
||||
SRC_LANGUAGE: "python"
|
||||
|
||||
BARNETT:
|
||||
|
@ -96,7 +98,7 @@ LOCATIONS:
|
|||
ACCURACY_LIMIT: 51 # meters, drops location coordinates with an accuracy higher than this. This number means there's a 68% probability the true location is within this radius
|
||||
TIMEZONE: *timezone
|
||||
MINUTES_DATA_USED: False # Use this for quality control purposes, how many minutes of data (location coordinates gruped by minute) were used to compute features
|
||||
SRC_FOLDER: "barnett"
|
||||
SRC_FOLDER: "barnett" # inside src/features/locations
|
||||
SRC_LANGUAGE: "r"
|
||||
|
||||
BLUETOOTH:
|
||||
|
|
|
@ -6,17 +6,29 @@ rule join_features_from_providers:
|
|||
script:
|
||||
"../src/features/join_features_from_providers.R"
|
||||
|
||||
rule messages_features:
|
||||
input:
|
||||
expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["MESSAGES"]["DB_TABLE"]),
|
||||
day_segments_labels = expand("data/interim/{sensor}_day_segments_labels.csv", sensor=config["MESSAGES"]["DB_TABLE"])
|
||||
rule messages_r_features:
|
||||
input:
|
||||
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["MESSAGES"]["DB_TABLE"]),
|
||||
day_segments_labels = "data/interim/day_segments_labels.csv"
|
||||
params:
|
||||
messages_type = "{messages_type}",
|
||||
features = lambda wildcards: config["MESSAGES"]["FEATURES"][wildcards.messages_type]
|
||||
provider = lambda wildcards: config["MESSAGES"]["PROVIDERS"][wildcards.provider_key],
|
||||
provider_key = "{provider_key}"
|
||||
output:
|
||||
"data/processed/{pid}/messages_{messages_type}.csv"
|
||||
"data/interim/{pid}/messages_features/messages_r_{provider_key}.csv"
|
||||
script:
|
||||
"../src/features/messages_features.R"
|
||||
"../src/features/messages/messages_entry.R"
|
||||
|
||||
rule messages_python_features:
|
||||
input:
|
||||
sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["MESSAGES"]["DB_TABLE"]),
|
||||
day_segments_labels = "data/interim/day_segments_labels.csv"
|
||||
params:
|
||||
provider = lambda wildcards: config["MESSAGES"]["PROVIDERS"][wildcards.provider_key],
|
||||
provider_key = "{provider_key}"
|
||||
output:
|
||||
"data/interim/{pid}/messages_features/messages_python_{provider_key}.csv"
|
||||
script:
|
||||
"../src/features/messages/messages_entry.py"
|
||||
|
||||
rule calls_python_features:
|
||||
input:
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
source("renv/activate.R")
|
||||
source("src/features/utils/utils.R")
|
||||
library("dplyr")
|
||||
library("tidyr")
|
||||
|
||||
sensor_data_file <- snakemake@input[["sensor_data"]]
|
||||
day_segments_file <- snakemake@input[["day_segments_labels"]]
|
||||
provider <- snakemake@params["provider"][["provider"]]
|
||||
provider_key <- snakemake@params["provider_key"]
|
||||
|
||||
sensor_features <- fetch_provider_features(provider, provider_key, "messages", sensor_data_file, day_segments_file)
|
||||
|
||||
write.csv(sensor_features, snakemake@output[[1]], row.names = FALSE)
|
|
@ -0,0 +1,18 @@
|
|||
import pandas as pd
|
||||
from importlib import import_module, util
|
||||
from pathlib import Path
|
||||
|
||||
# import fetch_provider_features from src/features/utils/utils.py
|
||||
spec = util.spec_from_file_location("util", str(Path(snakemake.scriptdir).parent / "utils" / "utils.py"))
|
||||
mod = util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
fetch_provider_features = getattr(mod, "fetch_provider_features")
|
||||
|
||||
sensor_data_file = snakemake.input["sensor_data"][0]
|
||||
day_segments_file = snakemake.input["day_segments_labels"]
|
||||
provider = snakemake.params["provider"]
|
||||
provider_key = snakemake.params["provider_key"]
|
||||
|
||||
sensor_features = fetch_provider_features(provider, provider_key, "messages", sensor_data_file, day_segments_file)
|
||||
|
||||
sensor_features.to_csv(snakemake.output[0], index=False)
|
|
@ -1,7 +1,7 @@
|
|||
library('tidyr')
|
||||
library('stringr')
|
||||
|
||||
base_messages_features <- function(messages, messages_type, day_segment, requested_features){
|
||||
message_features_of_type <- function(messages, messages_type, day_segment, requested_features){
|
||||
# Output dataframe
|
||||
features = data.frame(local_segment = character(), stringsAsFactors = FALSE)
|
||||
|
||||
|
@ -10,21 +10,12 @@ base_messages_features <- function(messages, messages_type, day_segment, request
|
|||
|
||||
# The subset of requested features this function can compute
|
||||
features_to_compute <- intersect(base_features_names, requested_features)
|
||||
|
||||
# Filter the rows that belong to day_segment, and put the segment full name in a new column for grouping
|
||||
date_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2}"
|
||||
hour_regex = "[0-9]{2}:[0-9]{2}:[0-9]{2}"
|
||||
messages <- messages %>%
|
||||
filter(message_type == ifelse(messages_type == "received", "1", ifelse(messages_type == "sent", 2, NA))) %>%
|
||||
filter(grepl(paste0("\\[", day_segment, "#"),assigned_segments)) %>%
|
||||
mutate(local_segment = str_extract(assigned_segments, paste0("\\[", day_segment, "#", date_regex, "#", hour_regex, "#", date_regex, "#", hour_regex, "\\]")),
|
||||
local_segment = str_sub(local_segment, 2, -2)) # get rid of first and last character([])
|
||||
|
||||
# If there are not features or data to work with, return an empty df with appropiate columns names
|
||||
if(length(features_to_compute) == 0)
|
||||
return(features)
|
||||
if(nrow(messages) < 1)
|
||||
return(cbind(features, read.csv(text = paste(paste("messages", messages_type, features_to_compute, sep = "_"), collapse = ","), stringsAsFactors = FALSE)))
|
||||
return(cbind(features, read.csv(text = paste(paste("messages_rapids", messages_type, features_to_compute, sep = "_"), collapse = ","), stringsAsFactors = FALSE)))
|
||||
|
||||
for(feature_name in features_to_compute){
|
||||
if(feature_name == "countmostfrequentcontact"){
|
||||
|
@ -39,7 +30,7 @@ base_messages_features <- function(messages, messages_type, day_segment, request
|
|||
feature <- messages %>%
|
||||
filter(trace == mostfrequentcontact) %>%
|
||||
group_by(local_segment) %>%
|
||||
summarise(!!paste("messages", messages_type, feature_name, sep = "_") := n()) %>%
|
||||
summarise(!!paste("messages_rapids", messages_type, feature_name, sep = "_") := n()) %>%
|
||||
replace(is.na(.), 0)
|
||||
features <- merge(features, feature, by="local_segment", all = TRUE)
|
||||
} else {
|
||||
|
@ -47,14 +38,35 @@ base_messages_features <- function(messages, messages_type, day_segment, request
|
|||
group_by(local_segment)
|
||||
|
||||
feature <- switch(feature_name,
|
||||
"count" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := n()),
|
||||
"distinctcontacts" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := n_distinct(trace)),
|
||||
"timefirstmessage" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := first(local_hour) * 60 + first(local_minute)),
|
||||
"timelastmessage" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := last(local_hour) * 60 + last(local_minute)))
|
||||
"count" = feature %>% summarise(!!paste("messages_rapids", messages_type, feature_name, sep = "_") := n()),
|
||||
"distinctcontacts" = feature %>% summarise(!!paste("messages_rapids", messages_type, feature_name, sep = "_") := n_distinct(trace)),
|
||||
"timefirstmessage" = feature %>% summarise(!!paste("messages_rapids", messages_type, feature_name, sep = "_") := first(local_hour) * 60 + first(local_minute)),
|
||||
"timelastmessage" = feature %>% summarise(!!paste("messages_rapids", messages_type, feature_name, sep = "_") := last(local_hour) * 60 + last(local_minute)))
|
||||
|
||||
features <- merge(features, feature, by="local_segment", all = TRUE)
|
||||
}
|
||||
}
|
||||
features <- features %>% mutate_at(vars(contains("countmostfrequentcontact")), list( ~ replace_na(., 0)))
|
||||
return(features)
|
||||
}
|
||||
|
||||
rapids_features <- function(messages, day_segment, provider){
|
||||
messages <- messages %>% filter_data_by_segment(day_segment)
|
||||
messages_types = provider[["MESSAGES_TYPES"]]
|
||||
messages_features <- setNames(data.frame(matrix(ncol=1, nrow=0)), c("local_segment"))
|
||||
|
||||
for(message_type in messages_types){
|
||||
# Filter rows that belong to the message type and day segment of interest
|
||||
message_type_label = ifelse(message_type == "received", "1", ifelse(message_type == "sent", "2", NA))
|
||||
if(is.na(message_type_label))
|
||||
stop(paste("Message type can online be received or sent but instead you typed: ", message_type, " in config[MESSAGES][MESSAGES_TYPES]"))
|
||||
|
||||
requested_features <- provider[["FEATURES"]][[message_type]]
|
||||
messages_of_type <- messages %>% filter(message_type == message_type_label)
|
||||
|
||||
features <- message_features_of_type(messages_of_type, message_type, day_segment, requested_features)
|
||||
messages_features <- merge(messages_features, features, all=TRUE)
|
||||
}
|
||||
|
||||
return(messages_features)
|
||||
}
|
Loading…
Reference in New Issue