Simplify window workflow

pull/103/head
JulioV 2020-07-23 12:00:51 -04:00
parent a0b5b5982b
commit 026d056378
11 changed files with 285 additions and 77 deletions

1
.gitignore vendored
View File

@ -107,5 +107,6 @@ reports/
.RData
.Rhistory
sn_profile_*/
!sn_profile_rapids
settings.dcf
tests/fakedata_generation/

View File

@ -4,20 +4,7 @@ PIDS: [t01]
# Global var with common day segments
DAY_SEGMENTS: &day_segments
[daily, morning, afternoon, evening, night]
DAY_SEGMENTS2: &day_segments2
# Day segments can be computed based on three strategies
# Frequency based: Set SEGMENTS to a number representing the length of a segment in minutes: 15. Every day will be divided in n segments of SEGMENTS minutes starting at midnight.
# Interval based: Set SEGMENTS to a string containing a JSON array with an element for each segment containing a label, and start and end time in 24 hour format.
# For example: '{"daily": {"00:00", "23:59"}, "morning": {"06:00", "11:59"}}'. Note the string is single quoted and each value double quoted.
# Event based: Set SEGMENTS to a string with a path to a csv file with two columns, a unix timestamp column in milliseconds called "timestamp" and a string column called "label".
# Every row represents a meaningful event around which features will be extracted, each label should be unique. See EVENT_TIME_SHIFT and EVENT_SEGMENT_DURATION
# If you want daily features, create a segment with label "daily". DO NOT use "daily" to label any other segment
# ------------------------------------------------------------------------------
SEGMENTS: '[["daily", "00:00", "23:59"], ["morning", "06:00", "11:59"], ["evening", "18:00", "23:59"]]'
EVENT_TIME_SHIFT: 0 # Postive or negative number of minutes. A day segment will start EVENT_TIME_SHIFT minutes before or after each meaningful event. Only used if SEGMENTS is a valid event file (see above).
EVENT_SEGMENT_DURATION: 60 # Lengh of every day_segment around each meaningful event. Only used if SEGMENTS is a valid event file (see above).
"data/external/daysegments_default.csv"
# Global timezone
# Use codes from https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
@ -110,7 +97,7 @@ DORYAB_LOCATION:
BLUETOOTH:
COMPUTE: False
DB_TABLE: bluetooth
DAY_SEGMENTS: *day_segments2
DAY_SEGMENTS: *day_segments
FEATURES: ["countscans", "uniquedevices", "countscansmostuniquedevice"]
ACTIVITY_RECOGNITION:

View File

@ -111,29 +111,11 @@ def optional_heatmap_days_by_sensors_input(wildcards):
tables_platform = [table for table in config["HEATMAP_DAYS_BY_SENSORS"]["DB_TABLES"] if table not in [config["CONVERSATION"]["DB_TABLE"]["ANDROID"], config["ACTIVITY_RECOGNITION"]["DB_TABLE"]["ANDROID"]]] # for ios, discard any android tables that may exist
return expand("data/raw/{{pid}}/{table}_with_datetime.csv", table = tables_platform)
def optional_day_segments_input(wildcards):
return []
def find_day_segments_argument(wildcards, argument):
def find_day_segments_input_file(wildcards):
for key, values in config.items():
if "DAY_SEGMENTS" in config[key] and "DB_TABLE" in config[key] and config[key]["DB_TABLE"] == wildcards.sensor:
return config[key]["DAY_SEGMENTS"][argument]
def hash_day_segments(config_section):
# TODO hash the content of the interval file instead of SEGMENTS when SEGMENTS is a path
return hashlib.sha1(config_section["SEGMENTS"].encode('utf-8')).hexdigest()
def is_valid_day_segment_configuration(sensor, config_section):
if not (isinstance(config_section, collections.OrderedDict) or isinstance(config_section, dict)):
raise ValueError("The DAY_SEGMENTS parameter in the {} config section should be a dictionary with three parameters: SEGMENTS (str), EVENT_TIME_SHIFT (int), and EVENT_SEGMENT_DURATION (int)".format(sensor))
for attribute in ["SEGMENTS", "EVENT_TIME_SHIFT", "EVENT_SEGMENT_DURATION"]:
if not attribute in config_section:
raise ValueError("The config[{}][DAY_SEGMENTS] section should have an attribute named {}".format(sensor, attribute))
if not isinstance(config_section["SEGMENTS"], str):
raise ValueError("The config[{}][DAY_SEGMENTS][SEGMENTS] variable should be a string".format(sensor))
if not isinstance(config_section["EVENT_TIME_SHIFT"], int):
raise ValueError("The config[{}][DAY_SEGMENTS][EVENT_TIME_SHIFT] variable should be an integer".format(sensor))
if not isinstance(config_section["EVENT_SEGMENT_DURATION"], int):
raise ValueError("The config[{}][DAY_SEGMENTS][EVENT_SEGMENT_DURATION] variable should be an integer".format(sensor))
return True
if "DB_TABLE" in config[key] and config[key]["DB_TABLE"] == wildcards.sensor:
if "DAY_SEGMENTS" in config[key]:
return config[key]["DAY_SEGMENTS"]
else:
raise ValueError("{} should have a DAY_SEGMENTS parameter containing the path to its day segments file".format(wildcards.sensor))

View File

@ -88,12 +88,12 @@ rule location_doryab_features:
rule bluetooth_features:
input:
expand("data/raw/{{pid}}/{sensor}_with_datetime_{{hash}}.csv", sensor=config["BLUETOOTH"]["DB_TABLE"]),
day_segments = expand("data/interim/{{pid}}/{sensor}_day_segments_{{hash}}.csv", sensor=config["BLUETOOTH"]["DB_TABLE"])
expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["BLUETOOTH"]["DB_TABLE"]),
day_segments = expand("data/interim/{sensor}_day_segments.csv", sensor=config["BLUETOOTH"]["DB_TABLE"])
params:
features = config["BLUETOOTH"]["FEATURES"]
output:
"data/processed/{pid}/bluetooth_{hash}.csv"
"data/processed/{pid}/bluetooth_features.csv"
script:
"../src/features/bluetooth_features.R"
@ -192,12 +192,12 @@ rule applications_foreground_features:
rule wifi_features:
input:
unpack(optional_wifi_input)
expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["WIFI"]["DB_TABLE"]),
day_segments = expand("data/interim/{sensor}_day_segments.csv", sensor=config["WIFI"]["DB_TABLE"])
params:
day_segment = "{day_segment}",
features = config["WIFI"]["FEATURES"]
output:
"data/processed/{pid}/wifi_{day_segment}.csv"
"data/processed/{pid}/wifi_features.csv"
script:
"../src/features/wifi_features.R"

View File

@ -40,13 +40,9 @@ rule download_dataset:
rule compute_day_segments:
input:
optional_day_segments_input,
params:
segments = lambda wildcards: find_day_segments_argument(wildcards, "SEGMENTS"),
event_time_shift = lambda wildcards: find_day_segments_argument(wildcards, "EVENT_TIME_SHIFT"),
event_segment_duration = lambda wildcards: find_day_segments_argument(wildcards, "EVENT_SEGMENT_DURATION"),
find_day_segments_input_file
output:
"data/interim/{pid}/{sensor}_day_segments_{hash}.csv"
segments_file = "data/interim/{sensor}_day_segments.csv",
script:
"../src/data/compute_day_segments.py"
@ -63,14 +59,14 @@ if len(config["WIFI"]["DB_TABLE"]["CONNECTED_ACCESS_POINTS"]) > 0:
rule readable_datetime:
input:
sensor_input = "data/raw/{pid}/{sensor}_raw.csv",
day_segments = "data/interim/{pid}/{sensor}_day_segments_{hash}.csv"
day_segments = "data/interim/{sensor}_day_segments.csv"
params:
timezones = None,
fixed_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"]
wildcard_constraints:
sensor = '.*(' + '|'.join([re.escape(x) for x in PHONE_SENSORS]) + ').*' # only process smartphone sensors, not fitbit
output:
"data/raw/{pid}/{sensor}_with_datetime_{hash}.csv"
"data/raw/{pid}/{sensor}_with_datetime.csv"
script:
"../src/data/readable_datetime.R"

View File

@ -0,0 +1,231 @@
import itertools
import hashlib
import collections
configfile: "config.yaml"
include: "../rules/common.smk"
include: "../rules/renv.snakefile"
include: "../rules/preprocessing.snakefile"
include: "../rules/features.snakefile"
include: "../rules/models.snakefile"
include: "../rules/reports.snakefile"
include: "../rules/mystudy.snakefile" # You can add snakfiles with rules tailored to your project
if len(config["PIDS"]) == 0:
raise ValueError("Add participants IDs to PIDS in config.yaml. Remember to create their participant files in data/external")
files_to_compute = []
if config["PHONE_VALID_SENSED_BINS"]["COMPUTE"]:
if len(config["PHONE_VALID_SENSED_BINS"]["TABLES"]) == 0:
raise ValueError("If you want to compute PHONE_VALID_SENSED_BINS, you need to add at least one table to [PHONE_VALID_SENSED_BINS][TABLES] in config.yaml")
files_to_compute.extend(expand("data/interim/{pid}/phone_sensed_bins.csv", pid=config["PIDS"]))
if config["PHONE_VALID_SENSED_DAYS"]["COMPUTE"]:
if len(config["PHONE_VALID_SENSED_BINS"]["TABLES"]) == 0:
raise ValueError("If you want to compute PHONE_VALID_SENSED_DAYS, you need to add at least one table to [PHONE_VALID_SENSED_BINS][TABLES] in config.yaml")
files_to_compute.extend(expand("data/interim/{pid}/phone_sensed_bins.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/phone_valid_sensed_days.csv", pid=config["PIDS"]))
if config["MESSAGES"]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["MESSAGES"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/messages_{messages_type}_{day_segment}.csv", pid=config["PIDS"], messages_type = config["MESSAGES"]["TYPES"], day_segment = config["MESSAGES"]["DAY_SEGMENTS"]))
if config["CALLS"]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime_unified.csv", pid=config["PIDS"], sensor=config["CALLS"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/calls_{call_type}_{day_segment}.csv", pid=config["PIDS"], call_type=config["CALLS"]["TYPES"], day_segment = config["CALLS"]["DAY_SEGMENTS"]))
if config["BARNETT_LOCATION"]["COMPUTE"]:
# TODO add files_to_compute.extend(optional_location_input(None))
if config["BARNETT_LOCATION"]["LOCATIONS_TO_USE"] == "RESAMPLE_FUSED":
if config["BARNETT_LOCATION"]["DB_TABLE"] in config["PHONE_VALID_SENSED_BINS"]["TABLES"]:
files_to_compute.extend(expand("data/interim/{pid}/phone_sensed_bins.csv", pid=config["PIDS"]))
else:
raise ValueError("Error: Add your locations table (and as many sensor tables as you have) to [PHONE_VALID_SENSED_BINS][TABLES] in config.yaml. This is necessary to compute phone_sensed_bins (bins of time when the smartphone was sensing data) which is used to resample fused location data (RESAMPLED_FUSED)")
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["BARNETT_LOCATION"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["BARNETT_LOCATION"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/location_barnett_{day_segment}.csv", pid=config["PIDS"], day_segment = config["BARNETT_LOCATION"]["DAY_SEGMENTS"]))
if config["BLUETOOTH"]["COMPUTE"]:
files_to_compute.extend(expand("data/interim/{sensor}_day_segments.csv", sensor=config["BLUETOOTH"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["BLUETOOTH"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["BLUETOOTH"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/bluetooth_features.csv", pid=config["PIDS"] ))
if config["ACTIVITY_RECOGNITION"]["COMPUTE"]:
# TODO add files_to_compute.extend(optional_ar_input(None)), the Android or iOS table gets processed depending on each participant
files_to_compute.extend(expand("data/processed/{pid}/activity_recognition_{day_segment}.csv",pid=config["PIDS"], day_segment = config["ACTIVITY_RECOGNITION"]["DAY_SEGMENTS"]))
if config["BATTERY"]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["BATTERY"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["BATTERY"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime_unified.csv", pid=config["PIDS"], sensor=config["BATTERY"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/battery_deltas.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/processed/{pid}/battery_{day_segment}.csv", pid = config["PIDS"], day_segment = config["BATTERY"]["DAY_SEGMENTS"]))
if config["SCREEN"]["COMPUTE"]:
if config["SCREEN"]["DB_TABLE"] in config["PHONE_VALID_SENSED_BINS"]["TABLES"]:
files_to_compute.extend(expand("data/interim/{pid}/phone_sensed_bins.csv", pid=config["PIDS"]))
else:
raise ValueError("Error: Add your screen table (and as many sensor tables as you have) to [PHONE_VALID_SENSED_BINS][TABLES] in config.yaml. This is necessary to compute phone_sensed_bins (bins of time when the smartphone was sensing data)")
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["SCREEN"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["SCREEN"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime_unified.csv", pid=config["PIDS"], sensor=config["SCREEN"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/screen_deltas.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/processed/{pid}/screen_{day_segment}.csv", pid = config["PIDS"], day_segment = config["SCREEN"]["DAY_SEGMENTS"]))
if config["LIGHT"]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["LIGHT"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["LIGHT"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/light_{day_segment}.csv", pid = config["PIDS"], day_segment = config["LIGHT"]["DAY_SEGMENTS"]))
if config["ACCELEROMETER"]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["ACCELEROMETER"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["ACCELEROMETER"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/accelerometer_{day_segment}.csv", pid = config["PIDS"], day_segment = config["ACCELEROMETER"]["DAY_SEGMENTS"]))
if config["APPLICATIONS_FOREGROUND"]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["APPLICATIONS_FOREGROUND"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["APPLICATIONS_FOREGROUND"]["DB_TABLE"]))
files_to_compute.extend(expand("data/interim/{pid}/{sensor}_with_datetime_with_genre.csv", pid=config["PIDS"], sensor=config["APPLICATIONS_FOREGROUND"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/applications_foreground_{day_segment}.csv", pid = config["PIDS"], day_segment = config["APPLICATIONS_FOREGROUND"]["DAY_SEGMENTS"]))
if config["WIFI"]["COMPUTE"]:
files_to_compute.extend(expand("data/interim/{sensor}_day_segments.csv", sensor=config["WIFI"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["WIFI"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["WIFI"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/wifi_features.csv", pid = config["PIDS"], day_segment = config["WIFI"]["DAY_SEGMENTS"]))
if config["HEARTRATE"]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["HEARTRATE"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/fitbit_heartrate_{fitbit_data_type}_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"]))
files_to_compute.extend(expand("data/processed/{pid}/fitbit_heartrate_{day_segment}.csv", pid = config["PIDS"], day_segment = config["HEARTRATE"]["DAY_SEGMENTS"]))
if config["STEP"]["COMPUTE"]:
if config["STEP"]["EXCLUDE_SLEEP"]["EXCLUDE"] == True and config["STEP"]["EXCLUDE_SLEEP"]["TYPE"] == "FITBIT_BASED":
files_to_compute.extend(expand("data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["STEP"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/fitbit_step_{fitbit_data_type}_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["intraday"]))
files_to_compute.extend(expand("data/processed/{pid}/fitbit_step_{day_segment}.csv", pid = config["PIDS"], day_segment = config["STEP"]["DAY_SEGMENTS"]))
if config["SLEEP"]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["SLEEP"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["intraday", "summary"]))
files_to_compute.extend(expand("data/processed/{pid}/fitbit_sleep_{day_segment}.csv", pid = config["PIDS"], day_segment = config["SLEEP"]["DAY_SEGMENTS"]))
if config["CONVERSATION"]["COMPUTE"]:
# TODO add files_to_compute.extend(optional_conversation_input(None)), the Android or iOS table gets processed depending on each participant
files_to_compute.extend(expand("data/processed/{pid}/conversation_{day_segment}.csv",pid=config["PIDS"], day_segment = config["CONVERSATION"]["DAY_SEGMENTS"]))
if config["DORYAB_LOCATION"]["COMPUTE"]:
if config["DORYAB_LOCATION"]["LOCATIONS_TO_USE"] == "RESAMPLE_FUSED":
if config["DORYAB_LOCATION"]["DB_TABLE"] in config["PHONE_VALID_SENSED_BINS"]["TABLES"]:
files_to_compute.extend(expand("data/interim/{pid}/phone_sensed_bins.csv", pid=config["PIDS"]))
else:
raise ValueError("Error: Add your locations table (and as many sensor tables as you have) to [PHONE_VALID_SENSED_BINS][TABLES] in config.yaml. This is necessary to compute phone_sensed_bins (bins of time when the smartphone was sensing data) which is used to resample fused location data (RESAMPLED_FUSED)")
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["DORYAB_LOCATION"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["DORYAB_LOCATION"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/location_doryab_{segment}.csv", pid=config["PIDS"], segment = config["DORYAB_LOCATION"]["DAY_SEGMENTS"]))
if config["PARAMS_FOR_ANALYSIS"]["COMPUTE"]:
rows_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["ROWS_NAN_THRESHOLD"]
cols_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_NAN_THRESHOLD"]
models, scalers, rows_nan_thresholds, cols_nan_thresholds = [], [], [], []
for model_name in config["PARAMS_FOR_ANALYSIS"]["MODEL_NAMES"]:
models = models + [model_name] * len(config["PARAMS_FOR_ANALYSIS"]["MODEL_SCALER"][model_name]) * len(rows_nan_threshold)
scalers = scalers + config["PARAMS_FOR_ANALYSIS"]["MODEL_SCALER"][model_name] * len(rows_nan_threshold)
rows_nan_thresholds = rows_nan_thresholds + list(itertools.chain.from_iterable([threshold] * len(config["PARAMS_FOR_ANALYSIS"]["MODEL_SCALER"][model_name]) for threshold in rows_nan_threshold))
cols_nan_thresholds = cols_nan_thresholds + list(itertools.chain.from_iterable([threshold] * len(config["PARAMS_FOR_ANALYSIS"]["MODEL_SCALER"][model_name]) for threshold in cols_nan_threshold))
results = config["PARAMS_FOR_ANALYSIS"]["RESULT_COMPONENTS"] + ["merged_population_model_results"]
files_to_compute.extend(expand("data/processed/{pid}/data_for_individual_model/{source}_{day_segment}_original.csv",
pid = config["PIDS"],
source = config["PARAMS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["PARAMS_FOR_ANALYSIS"]["DAY_SEGMENTS"]))
files_to_compute.extend(expand("data/processed/data_for_population_model/{source}_{day_segment}_original.csv",
source = config["PARAMS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["PARAMS_FOR_ANALYSIS"]["DAY_SEGMENTS"]))
files_to_compute.extend(expand(
expand("data/processed/{pid}/data_for_individual_model/{{rows_nan_threshold}}|{{cols_nan_threshold}}_{days_before_threshold}|{days_after_threshold}_{cols_var_threshold}/{source}_{day_segment}_clean.csv",
pid = config["PIDS"],
days_before_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_BEFORE_THRESHOLD"],
days_after_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_AFTER_THRESHOLD"],
cols_var_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_VAR_THRESHOLD"],
source = config["PARAMS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["PARAMS_FOR_ANALYSIS"]["DAY_SEGMENTS"]),
zip,
rows_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["ROWS_NAN_THRESHOLD"],
cols_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_NAN_THRESHOLD"]))
files_to_compute.extend(expand(
expand("data/processed/data_for_population_model/{{rows_nan_threshold}}|{{cols_nan_threshold}}_{days_before_threshold}|{days_after_threshold}_{cols_var_threshold}/{source}_{day_segment}_clean.csv",
days_before_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_BEFORE_THRESHOLD"],
days_after_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_AFTER_THRESHOLD"],
cols_var_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_VAR_THRESHOLD"],
source = config["PARAMS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["PARAMS_FOR_ANALYSIS"]["DAY_SEGMENTS"]),
zip,
rows_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["ROWS_NAN_THRESHOLD"],
cols_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_NAN_THRESHOLD"]))
files_to_compute.extend(expand("data/processed/data_for_population_model/demographic_features.csv"))
files_to_compute.extend(expand("data/processed/data_for_population_model/targets_{summarised}.csv",
summarised = config["PARAMS_FOR_ANALYSIS"]["SUMMARISED"]))
files_to_compute.extend(expand(
expand("data/processed/data_for_population_model/{{rows_nan_threshold}}|{{cols_nan_threshold}}_{days_before_threshold}|{days_after_threshold}_{cols_var_threshold}/{source}_{day_segment}_nancellsratio.csv",
days_before_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_BEFORE_THRESHOLD"],
days_after_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_AFTER_THRESHOLD"],
cols_var_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_VAR_THRESHOLD"],
source = config["PARAMS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["PARAMS_FOR_ANALYSIS"]["DAY_SEGMENTS"]),
zip,
rows_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["ROWS_NAN_THRESHOLD"],
cols_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_NAN_THRESHOLD"]))
files_to_compute.extend(expand(
expand("data/processed/data_for_population_model/{{rows_nan_threshold}}|{{cols_nan_threshold}}_{days_before_threshold}|{days_after_threshold}_{cols_var_threshold}/{source}_{day_segment}_{summarised}.csv",
days_before_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_BEFORE_THRESHOLD"],
days_after_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_AFTER_THRESHOLD"],
cols_var_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_VAR_THRESHOLD"],
source = config["PARAMS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["PARAMS_FOR_ANALYSIS"]["DAY_SEGMENTS"],
summarised = config["PARAMS_FOR_ANALYSIS"]["SUMMARISED"]),
zip,
rows_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["ROWS_NAN_THRESHOLD"],
cols_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_NAN_THRESHOLD"]))
files_to_compute.extend(expand(
expand("data/processed/output_population_model/{{rows_nan_threshold}}|{{cols_nan_threshold}}_{days_before_threshold}|{days_after_threshold}_{cols_var_threshold}/{source}_{day_segment}_{summarised}_{cv_method}_baseline.csv",
days_before_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_BEFORE_THRESHOLD"],
days_after_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_AFTER_THRESHOLD"],
cols_var_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_VAR_THRESHOLD"],
cv_method = config["PARAMS_FOR_ANALYSIS"]["CV_METHODS"],
source = config["PARAMS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["PARAMS_FOR_ANALYSIS"]["DAY_SEGMENTS"],
summarised = config["PARAMS_FOR_ANALYSIS"]["SUMMARISED"]),
zip,
rows_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["ROWS_NAN_THRESHOLD"],
cols_nan_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_NAN_THRESHOLD"]))
files_to_compute.extend(expand(
expand("data/processed/output_population_model/{{rows_nan_threshold}}|{{cols_nan_threshold}}_{days_before_threshold}|{days_after_threshold}_{cols_var_threshold}/{{model}}/{cv_method}/{source}_{day_segment}_{summarised}_{{scaler}}/{result}.csv",
days_before_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_BEFORE_THRESHOLD"],
days_after_threshold = config["PARAMS_FOR_ANALYSIS"]["PARTICIPANT_DAYS_AFTER_THRESHOLD"],
cols_var_threshold = config["PARAMS_FOR_ANALYSIS"]["COLS_VAR_THRESHOLD"],
cv_method = config["PARAMS_FOR_ANALYSIS"]["CV_METHODS"],
source = config["PARAMS_FOR_ANALYSIS"]["SOURCES"],
day_segment = config["PARAMS_FOR_ANALYSIS"]["DAY_SEGMENTS"],
summarised = config["PARAMS_FOR_ANALYSIS"]["SUMMARISED"],
result = results),
zip,
rows_nan_threshold = rows_nan_thresholds,
cols_nan_threshold = cols_nan_thresholds,
model = models,
scaler = scalers))
rule all:
input:
files_to_compute
rule clean:
shell:
"rm -rf data/raw/* && rm -rf data/interim/* && rm -rf data/processed/* && rm -rf reports/figures/* && rm -rf reports/*.zip && rm -rf reports/compliance/*"

View File

@ -0,0 +1,5 @@
configfile: ./sn_profile_rapids/pipeline_config.yaml
directory: ./
snakefile: ./sn_profile_rapids/Snakefile
cores: 1
# forcerun: compute_day_segments

View File

@ -0,0 +1,8 @@
PIDS: [t01]
DOWNLOAD_DATASET:
GROUP: RAPIDS
BLUETOOTH:
COMPUTE: True
DAY_SEGMENTS: "data/external/daysegments_bluetooth.csv"
WIFI:
COMPUTE: True

View File

@ -1,25 +1,19 @@
import pandas as pd
import json
def parse_day_segments(segments, event_time_shift, event_segment_duration):
# Temporal code to parse segments, should substitute with the code to parse
# frequencies, intervals, and events
data = json.loads(segments)
label = []
start = []
end = []
for d in data:
label.append(d[0])
start.append(d[1])
end.append(d[2])
day_segments = pd.DataFrame(list(zip([1]*len(label), start, end, label)), columns =['local_date','start_time','end_time','label'])
def parse_day_segments(day_segments):
# Add code to parse frequencies, intervals, and events
# Expected formats:
# Frequency: label, length columns (e.g. my_prefix, 5) length has to be in minutes (int)
# Interval: label, start, end columns (e.g. daily, 00:00, 23:59) start and end should be valid hours in 24 hour format
# Event: label, timestamp, length, shift (e.g., survey1, 1532313215463, 60, -30), timestamp is a UNIX timestamp in ms, length is in minutes (int), shift is in minutes (+/-int) and is add/substracted from timestamp
# Our output should have local_date, start_time, end_time, label. In the readable_datetime script, If local_date has the same value for all rows, every segment will be applied for all days, otherwise each segment will be applied only to its local_date
day_segments["local_date"] = 1
day_segments = day_segments.rename(columns={"start": "start_time", "end":"end_time"})
return day_segments
##########################
segments = snakemake.params["segments"]
event_time_shift = snakemake.params["event_time_shift"]
event_segment_duration = snakemake.params["event_segment_duration"]
day_segments = parse_day_segments(segments, event_time_shift, event_segment_duration)
day_segments.to_csv(snakemake.output[0], index=False)
day_segments = pd.read_csv(snakemake.input[0])
day_segments = parse_day_segments(day_segments)
day_segments.to_csv(snakemake.output["segments_file"], index=False)

View File

@ -1,7 +1,7 @@
library(dplyr)
filter_by_day_segment <- function(data, day_segment) {
if(day_segment %in% c("morning", "afternoon", "evening", "night"))
if(day_segment != "daily")
data <- data %>% filter(local_day_segment == day_segment)
return(data %>% group_by(local_date))

View File

@ -16,14 +16,18 @@ if(!is.null(snakemake@input[["visible_access_points"]]) && is.null(snakemake@inp
wifi_data <- bind_rows(visible_access_points, connected_access_points) %>% arrange(timestamp)
}
day_segment <- snakemake@params[["day_segment"]]
wifi_data <- read.csv(snakemake@input[[1]], stringsAsFactors = FALSE)
day_segments <- read.csv(snakemake@input[["day_segments"]])
requested_features <- snakemake@params[["features"]]
features = data.frame(local_date = character(), stringsAsFactors = FALSE)
# Compute base wifi features
features <- merge(features, base_wifi_features(wifi_data, day_segment, requested_features), by="local_date", all = TRUE)
if(ncol(features) != length(requested_features) + 1)
day_segments <- day_segments %>% distinct(label) %>% pull(label)
# Compute base wifi features
for (day_segment in day_segments)
features <- merge(features, base_wifi_features(wifi_data, day_segment, requested_features), by="local_date", all = TRUE)
if(ncol(features) != (length(requested_features)) * length(day_segments) + 1)
stop(paste0("The number of features in the output dataframe (=", ncol(features),") does not match the expected value (=", length(requested_features)," + 1). Verify your wifi feature extraction functions"))
write.csv(features, snakemake@output[[1]], row.names = FALSE)