From 132311da77348c3b8362b3fe89c4d7afb14e5e62 Mon Sep 17 00:00:00 2001 From: JulioV Date: Mon, 31 Aug 2020 13:34:15 -0400 Subject: [PATCH] Migrate messages to new file structure --- Snakefile | 10 +++-- config.yaml | 24 +++++----- rules/features.smk | 28 ++++++++---- src/features/messages/messages_entry.R | 13 ++++++ src/features/messages/messages_entry.py | 18 ++++++++ .../{messages_base.R => rapids/main.R} | 44 ++++++++++++------- 6 files changed, 98 insertions(+), 39 deletions(-) create mode 100644 src/features/messages/messages_entry.R create mode 100644 src/features/messages/messages_entry.py rename src/features/messages/{messages_base.R => rapids/main.R} (55%) diff --git a/Snakefile b/Snakefile index cb45efc0..865efe43 100644 --- a/Snakefile +++ b/Snakefile @@ -32,10 +32,12 @@ if config["PHONE_VALID_SENSED_DAYS"]["COMPUTE"]: min_valid_hours_per_day=config["PHONE_VALID_SENSED_DAYS"]["MIN_VALID_HOURS_PER_DAY"], min_valid_bins_per_hour=config["PHONE_VALID_SENSED_DAYS"]["MIN_VALID_BINS_PER_HOUR"])) -if config["MESSAGES"]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"])) - files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"])) - files_to_compute.extend(expand("data/processed/{pid}/messages_{messages_type}.csv", pid=config["PIDS"], messages_type = config["MESSAGES"]["TYPES"])) +for provider in config["MESSAGES"]["PROVIDERS"].keys(): + if config["MESSAGES"]["PROVIDERS"][provider]["COMPUTE"]: + files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"])) + files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"])) + files_to_compute.extend(expand("data/interim/{pid}/{sensor_key}_features/{sensor_key}_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["MESSAGES"]["PROVIDERS"][provider]["SRC_LANGUAGE"], provider_key=provider, sensor_key="MESSAGES".lower())) + files_to_compute.extend(expand("data/processed/features/{pid}/{sensor_key}.csv", pid=config["PIDS"], sensor_key="MESSAGES".lower())) for provider in config["CALLS"]["PROVIDERS"].keys(): if config["CALLS"]["PROVIDERS"][provider]["COMPUTE"]: diff --git a/config.yaml b/config.yaml index 671c501d..2fec20ae 100644 --- a/config.yaml +++ b/config.yaml @@ -42,28 +42,30 @@ PHONE_VALID_SENSED_DAYS: # Communication SMS features config, TYPES and FEATURES keys need to match MESSAGES: - COMPUTE: False DB_TABLE: messages - TYPES : [received, sent] - FEATURES: - received: [count, distinctcontacts, timefirstmessage, timelastmessage, countmostfrequentcontact] - sent: [count, distinctcontacts, timefirstmessage, timelastmessage, countmostfrequentcontact] - DAY_SEGMENTS: *day_segments + PROVIDERS: + RAPIDS: + COMPUTE: False + MESSAGES_TYPES : [received, sent] + FEATURES: + received: [count, distinctcontacts, timefirstmessage, timelastmessage, countmostfrequentcontact] + sent: [count, distinctcontacts, timefirstmessage, timelastmessage, countmostfrequentcontact] + SRC_LANGUAGE: "r" + SRC_FOLDER: "rapids" # inside src/features/messages # Communication call features config, TYPES and FEATURES keys need to match CALLS: DB_TABLE: calls PROVIDERS: RAPIDS: - COMPUTE: True + COMPUTE: False CALL_TYPES: [missed, incoming, outgoing] FEATURES: missed: [count, distinctcontacts, timefirstcall, timelastcall, countmostfrequentcontact] incoming: [count, distinctcontacts, meanduration, sumduration, minduration, maxduration, stdduration, modeduration, entropyduration, timefirstcall, timelastcall, countmostfrequentcontact] outgoing: [count, distinctcontacts, meanduration, sumduration, minduration, maxduration, stdduration, modeduration, entropyduration, timefirstcall, timelastcall, countmostfrequentcontact] - DAY_SEGMENTS: *day_segments SRC_LANGUAGE: "r" - SRC_FOLDER: "rapids" + SRC_FOLDER: "rapids" # inside src/features/calls APPLICATION_GENRES: CATALOGUE_SOURCE: FILE # FILE (genres are read from CATALOGUE_FILE) or GOOGLE (genres are scrapped from the Play Store) @@ -87,7 +89,7 @@ LOCATIONS: MAXIMUM_GAP_ALLOWED: 300 MINUTES_DATA_USED: False SAMPLING_FREQUENCY: 0 - SRC_FOLDER: "doryab" + SRC_FOLDER: "doryab" # inside src/features/locations SRC_LANGUAGE: "python" BARNETT: @@ -96,7 +98,7 @@ LOCATIONS: ACCURACY_LIMIT: 51 # meters, drops location coordinates with an accuracy higher than this. This number means there's a 68% probability the true location is within this radius TIMEZONE: *timezone MINUTES_DATA_USED: False # Use this for quality control purposes, how many minutes of data (location coordinates gruped by minute) were used to compute features - SRC_FOLDER: "barnett" + SRC_FOLDER: "barnett" # inside src/features/locations SRC_LANGUAGE: "r" BLUETOOTH: diff --git a/rules/features.smk b/rules/features.smk index fcd3adcf..664f2abb 100644 --- a/rules/features.smk +++ b/rules/features.smk @@ -6,17 +6,29 @@ rule join_features_from_providers: script: "../src/features/join_features_from_providers.R" -rule messages_features: - input: - expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["MESSAGES"]["DB_TABLE"]), - day_segments_labels = expand("data/interim/{sensor}_day_segments_labels.csv", sensor=config["MESSAGES"]["DB_TABLE"]) +rule messages_r_features: + input: + sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["MESSAGES"]["DB_TABLE"]), + day_segments_labels = "data/interim/day_segments_labels.csv" params: - messages_type = "{messages_type}", - features = lambda wildcards: config["MESSAGES"]["FEATURES"][wildcards.messages_type] + provider = lambda wildcards: config["MESSAGES"]["PROVIDERS"][wildcards.provider_key], + provider_key = "{provider_key}" output: - "data/processed/{pid}/messages_{messages_type}.csv" + "data/interim/{pid}/messages_features/messages_r_{provider_key}.csv" script: - "../src/features/messages_features.R" + "../src/features/messages/messages_entry.R" + +rule messages_python_features: + input: + sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["MESSAGES"]["DB_TABLE"]), + day_segments_labels = "data/interim/day_segments_labels.csv" + params: + provider = lambda wildcards: config["MESSAGES"]["PROVIDERS"][wildcards.provider_key], + provider_key = "{provider_key}" + output: + "data/interim/{pid}/messages_features/messages_python_{provider_key}.csv" + script: + "../src/features/messages/messages_entry.py" rule calls_python_features: input: diff --git a/src/features/messages/messages_entry.R b/src/features/messages/messages_entry.R new file mode 100644 index 00000000..63b0fa47 --- /dev/null +++ b/src/features/messages/messages_entry.R @@ -0,0 +1,13 @@ +source("renv/activate.R") +source("src/features/utils/utils.R") +library("dplyr") +library("tidyr") + +sensor_data_file <- snakemake@input[["sensor_data"]] +day_segments_file <- snakemake@input[["day_segments_labels"]] +provider <- snakemake@params["provider"][["provider"]] +provider_key <- snakemake@params["provider_key"] + +sensor_features <- fetch_provider_features(provider, provider_key, "messages", sensor_data_file, day_segments_file) + +write.csv(sensor_features, snakemake@output[[1]], row.names = FALSE) diff --git a/src/features/messages/messages_entry.py b/src/features/messages/messages_entry.py new file mode 100644 index 00000000..ab46b28f --- /dev/null +++ b/src/features/messages/messages_entry.py @@ -0,0 +1,18 @@ +import pandas as pd +from importlib import import_module, util +from pathlib import Path + +# import fetch_provider_features from src/features/utils/utils.py +spec = util.spec_from_file_location("util", str(Path(snakemake.scriptdir).parent / "utils" / "utils.py")) +mod = util.module_from_spec(spec) +spec.loader.exec_module(mod) +fetch_provider_features = getattr(mod, "fetch_provider_features") + +sensor_data_file = snakemake.input["sensor_data"][0] +day_segments_file = snakemake.input["day_segments_labels"] +provider = snakemake.params["provider"] +provider_key = snakemake.params["provider_key"] + +sensor_features = fetch_provider_features(provider, provider_key, "messages", sensor_data_file, day_segments_file) + +sensor_features.to_csv(snakemake.output[0], index=False) \ No newline at end of file diff --git a/src/features/messages/messages_base.R b/src/features/messages/rapids/main.R similarity index 55% rename from src/features/messages/messages_base.R rename to src/features/messages/rapids/main.R index 1d6cecdf..7ec08232 100644 --- a/src/features/messages/messages_base.R +++ b/src/features/messages/rapids/main.R @@ -1,7 +1,7 @@ library('tidyr') library('stringr') -base_messages_features <- function(messages, messages_type, day_segment, requested_features){ +message_features_of_type <- function(messages, messages_type, day_segment, requested_features){ # Output dataframe features = data.frame(local_segment = character(), stringsAsFactors = FALSE) @@ -10,21 +10,12 @@ base_messages_features <- function(messages, messages_type, day_segment, request # The subset of requested features this function can compute features_to_compute <- intersect(base_features_names, requested_features) - - # Filter the rows that belong to day_segment, and put the segment full name in a new column for grouping - date_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2}" - hour_regex = "[0-9]{2}:[0-9]{2}:[0-9]{2}" - messages <- messages %>% - filter(message_type == ifelse(messages_type == "received", "1", ifelse(messages_type == "sent", 2, NA))) %>% - filter(grepl(paste0("\\[", day_segment, "#"),assigned_segments)) %>% - mutate(local_segment = str_extract(assigned_segments, paste0("\\[", day_segment, "#", date_regex, "#", hour_regex, "#", date_regex, "#", hour_regex, "\\]")), - local_segment = str_sub(local_segment, 2, -2)) # get rid of first and last character([]) # If there are not features or data to work with, return an empty df with appropiate columns names if(length(features_to_compute) == 0) return(features) if(nrow(messages) < 1) - return(cbind(features, read.csv(text = paste(paste("messages", messages_type, features_to_compute, sep = "_"), collapse = ","), stringsAsFactors = FALSE))) + return(cbind(features, read.csv(text = paste(paste("messages_rapids", messages_type, features_to_compute, sep = "_"), collapse = ","), stringsAsFactors = FALSE))) for(feature_name in features_to_compute){ if(feature_name == "countmostfrequentcontact"){ @@ -39,7 +30,7 @@ base_messages_features <- function(messages, messages_type, day_segment, request feature <- messages %>% filter(trace == mostfrequentcontact) %>% group_by(local_segment) %>% - summarise(!!paste("messages", messages_type, feature_name, sep = "_") := n()) %>% + summarise(!!paste("messages_rapids", messages_type, feature_name, sep = "_") := n()) %>% replace(is.na(.), 0) features <- merge(features, feature, by="local_segment", all = TRUE) } else { @@ -47,14 +38,35 @@ base_messages_features <- function(messages, messages_type, day_segment, request group_by(local_segment) feature <- switch(feature_name, - "count" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := n()), - "distinctcontacts" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := n_distinct(trace)), - "timefirstmessage" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := first(local_hour) * 60 + first(local_minute)), - "timelastmessage" = feature %>% summarise(!!paste("messages", messages_type, feature_name, sep = "_") := last(local_hour) * 60 + last(local_minute))) + "count" = feature %>% summarise(!!paste("messages_rapids", messages_type, feature_name, sep = "_") := n()), + "distinctcontacts" = feature %>% summarise(!!paste("messages_rapids", messages_type, feature_name, sep = "_") := n_distinct(trace)), + "timefirstmessage" = feature %>% summarise(!!paste("messages_rapids", messages_type, feature_name, sep = "_") := first(local_hour) * 60 + first(local_minute)), + "timelastmessage" = feature %>% summarise(!!paste("messages_rapids", messages_type, feature_name, sep = "_") := last(local_hour) * 60 + last(local_minute))) features <- merge(features, feature, by="local_segment", all = TRUE) } } features <- features %>% mutate_at(vars(contains("countmostfrequentcontact")), list( ~ replace_na(., 0))) return(features) +} + +rapids_features <- function(messages, day_segment, provider){ + messages <- messages %>% filter_data_by_segment(day_segment) + messages_types = provider[["MESSAGES_TYPES"]] + messages_features <- setNames(data.frame(matrix(ncol=1, nrow=0)), c("local_segment")) + + for(message_type in messages_types){ + # Filter rows that belong to the message type and day segment of interest + message_type_label = ifelse(message_type == "received", "1", ifelse(message_type == "sent", "2", NA)) + if(is.na(message_type_label)) + stop(paste("Message type can online be received or sent but instead you typed: ", message_type, " in config[MESSAGES][MESSAGES_TYPES]")) + + requested_features <- provider[["FEATURES"]][[message_type]] + messages_of_type <- messages %>% filter(message_type == message_type_label) + + features <- message_features_of_type(messages_of_type, message_type, day_segment, requested_features) + messages_features <- merge(messages_features, features, all=TRUE) + } + + return(messages_features) } \ No newline at end of file