2021-03-15 00:52:14 +01:00
def get_script_language(script_path):
from pathlib import Path
script_path = Path(script_path)
if not script_path.exists():
raise ValueError("The following provider feature script does not exist: " + str(script_path))
if script_path.name.endswith(".py"):
return "python"
elif script_path.name.endswith(".R"):
return "r"
2020-08-12 00:45:50 +02:00
# Features.smk #########################################################################################################
2021-05-26 20:04:29 +02:00
def optional_phone_yield_input_for_locations(wildcards):
if config["PHONE_LOCATIONS"]["LOCATIONS_TO_USE"] in ["ALL_RESAMPLED","FUSED_RESAMPLED"]:
return "data/interim/{pid}/phone_yielded_timestamps.csv"
return []
2021-03-23 19:04:01 +01:00
def get_barnett_daily(wildcards):
if wildcards.provider_key.upper() == "BARNETT":
return "data/interim/{pid}/phone_locations_barnett_daily.csv"
return []
2021-04-09 18:05:25 +02:00
def get_locations_python_input(wildcards):
if wildcards.provider_key.upper() == "DORYAB":
2021-09-15 20:28:09 +02:00
return "data/interim/{pid}/phone_locations_processed_with_datetime_with_doryab_columns_episodes_resampled_with_datetime.csv"
2021-04-09 18:05:25 +02:00
else:
return "data/interim/{pid}/phone_locations_processed_with_datetime.csv"
2021-09-02 00:54:39 +02:00
def get_calls_input(wildcards):
if (wildcards.provider_key.upper() == "RAPIDS") and (config["PHONE_CALLS"]["PROVIDERS"]["RAPIDS"]["FEATURES_TYPE"] == "EPISODES"):
return "data/interim/{pid}/phone_calls_episodes_resampled_with_datetime.csv"
else:
return "data/raw/{pid}/phone_calls_with_datetime.csv"
2020-08-28 19:53:00 +02:00
def find_features_files(wildcards):
feature_files = []
for provider_key, provider in config[(wildcards.sensor_key).upper()]["PROVIDERS"].items():
if provider["COMPUTE"]:
2021-03-15 00:52:14 +01:00
feature_files.extend(expand("data/interim/{{pid}}/{sensor_key}_features/{sensor_key}_{language}_{provider_key}.csv", sensor_key=wildcards.sensor_key.lower(), language=get_script_language(provider["SRC_SCRIPT"]), provider_key=provider_key.lower()))
2020-08-28 19:53:00 +02:00
return(feature_files)
2020-08-12 00:45:50 +02:00
2022-07-20 15:51:22 +02:00
def find_empaticas_standardized_features_files(wildcards):
feature_files = []
if "empatica" in wildcards.sensor_key:
for provider_key, provider in config[(wildcards.sensor_key).upper()]["PROVIDERS"].items():
if provider["COMPUTE"] and provider.get("WINDOWS", False) and provider["WINDOWS"]["COMPUTE"]:
if "empatica" in wildcards.sensor_key:
feature_files.extend(expand("data/interim/{{pid}}/{sensor_key}_features/z_{sensor_key}_{language}_{provider_key}.csv", sensor_key=wildcards.sensor_key.lower(), language=get_script_language(provider["SRC_SCRIPT"]), provider_key=provider_key.lower()))
return(feature_files)
def find_joint_non_empatica_sensor_files(wildcards):
joined_files = []
for config_key in config.keys():
if config_key.startswith(("PHONE", "FITBIT")) and "PROVIDERS" in config[config_key] and isinstance(config[config_key]["PROVIDERS"], dict):
for provider_key, provider in config[config_key]["PROVIDERS"].items():
if "COMPUTE" in provider.keys() and provider["COMPUTE"]:
joined_files.append("data/processed/features/{pid}/" + config_key.lower() + ".csv")
break
return joined_files
2020-08-12 00:45:50 +02:00
def optional_steps_sleep_input(wildcards):
2021-05-19 00:27:12 +02:00
if config["FITBIT_STEPS_INTRADAY"]["EXCLUDE_SLEEP"]["FITBIT_BASED"]["EXCLUDE"]:
return "data/raw/{pid}/fitbit_sleep_summary_raw.csv"
2020-08-12 00:45:50 +02:00
else:
return []
2021-05-19 00:27:12 +02:00
def optional_steps_intraday_input(wildcards):
if config["FITBIT_STEPS_INTRADAY"]["EXCLUDE_SLEEP"]["TIME_BASED"]["EXCLUDE"] or config["FITBIT_STEPS_INTRADAY"]["EXCLUDE_SLEEP"]["FITBIT_BASED"]["EXCLUDE"]:
return "data/interim/{pid}/fitbit_steps_intraday_with_datetime_exclude_sleep.csv"
else:
return "data/raw/{pid}/fitbit_steps_intraday_with_datetime.csv"
2020-11-25 22:34:05 +01:00
def input_merge_sensor_features_for_individual_participants(wildcards):
feature_files = []
for config_key in config.keys():
2020-12-15 02:30:34 +01:00
if config_key.startswith(("PHONE", "FITBIT", "EMPATICA")) and "PROVIDERS" in config[config_key] and isinstance(config[config_key]["PROVIDERS"], dict):
2020-11-25 22:34:05 +01:00
for provider_key, provider in config[config_key]["PROVIDERS"].items():
if "COMPUTE" in provider.keys() and provider["COMPUTE"]:
feature_files.append("data/processed/features/{pid}/" + config_key.lower() + ".csv")
break
return feature_files
2020-08-12 00:45:50 +02:00
2022-07-20 15:51:22 +02:00
def input_merge_standardized_sensor_features_for_individual_participants(wildcards):
feature_files = []
for config_key in config.keys():
if config_key.startswith(("PHONE", "FITBIT", "EMPATICA")) and "PROVIDERS" in config[config_key] and isinstance(config[config_key]["PROVIDERS"], dict):
for provider_key, provider in config[config_key]["PROVIDERS"].items():
if "COMPUTE" in provider.keys() and provider["COMPUTE"] and ("STANDARDIZE_FEATURES" in provider.keys() and provider["STANDARDIZE_FEATURES"] or
"WINDOWS" in provider.keys() and "STANDARDIZE_FEATURES" in provider["WINDOWS"].keys() and provider["WINDOWS"]["STANDARDIZE_FEATURES"]):
feature_files.append("data/processed/features/{pid}/z_" + config_key.lower() + ".csv")
break
return feature_files
2021-01-14 17:11:41 +01:00
def get_phone_sensor_names():
phone_sensor_names = []
for config_key in config.keys():
if config_key.startswith(("PHONE")) and "PROVIDERS" in config[config_key]:
if config_key != "PHONE_DATA_YIELD" and config_key not in phone_sensor_names:
phone_sensor_names.append(config_key)
return phone_sensor_names
2021-03-08 21:58:26 +01:00
def pull_phone_data_input_with_mutation_scripts(wilcards):
2021-03-16 00:35:58 +01:00
from pathlib import Path
2021-03-02 23:57:22 +01:00
import yaml
input = dict()
2021-03-08 21:58:26 +01:00
phone_stream = config["PHONE_DATA_STREAMS"]["USE"]
2021-03-02 23:57:22 +01:00
input["participant_file"] = "data/external/participant_files/{pid}.yaml"
input["rapids_schema_file"] = "src/data/streams/rapids_columns.yaml"
2021-03-08 21:58:26 +01:00
input["stream_format"] = "src/data/streams/" + phone_stream + "/format.yaml"
2021-03-02 23:57:22 +01:00
2021-03-08 21:58:26 +01:00
if Path("src/data/streams/"+ phone_stream + "/container.R").exists():
input["stream_container"] = "src/data/streams/"+ phone_stream + "/container.R"
elif Path("src/data/streams/"+ phone_stream + "/container.py").exists():
input["stream_container"] = "src/data/streams/"+ phone_stream + "/container.py"
else:
raise ValueError("The container script for {stream} is missing: src/data/streams/{stream}/container.[py|R]".format(stream=empatica_stream))
schema = yaml.load(open(input.get("stream_format"), 'r'), Loader=yaml.FullLoader)
2021-03-02 23:57:22 +01:00
sensor = ("phone_" + wilcards.sensor).upper()
if sensor not in schema:
2021-03-08 21:58:26 +01:00
raise ValueError("{sensor} is not defined in the schema {schema}".format(sensor=sensor, schema=input.get("stream_format")))
2021-03-09 00:19:53 +01:00
for device_os in schema[sensor].keys():
2021-03-09 22:42:02 +01:00
if "MUTATION" not in schema[sensor][device_os]:
raise ValueError("MUTATION is missing from [{sensor}][{device_os}] of {schema}".format(sensor=sensor, device_os=device_os,schema=input.get("stream_format")))
if "COLUMN_MAPPINGS" not in schema[sensor][device_os]["MUTATION"]:
raise ValueError("COLUMN_MAPPINGS is missing from [{sensor}][{device_os}][MUTATION] of {schema}".format(sensor=sensor, device_os=device_os, schema=input.get("stream_format")))
if "SCRIPTS" not in schema[sensor][device_os]["MUTATION"]:
raise ValueError("SCRIPTS is missing from [{sensor}][{device_os}][MUTATION] of {schema}".format(sensor=sensor, device_os=device_os, schema=input.get("stream_format")))
scripts = schema[sensor][device_os]["MUTATION"]["SCRIPTS"]
2021-03-02 23:57:22 +01:00
if isinstance(scripts, list):
for idx, script in enumerate(scripts):
if not script.lower().endswith((".py", ".r")):
2021-03-08 21:58:26 +01:00
raise ValueError("Mutate scripts can only be Python or R scripts (.py, .R).\n Instead we got {script} in \n [{sensor}][{device_os}] of {schema}".format(script=script, sensor=sensor, device_os=device_os, schema=input.get("stream_format")))
2021-03-02 23:57:22 +01:00
input["mutationscript"+str(idx)] = script
return input
2021-03-05 23:49:37 +01:00
def input_tzcodes_file(wilcards):
from pathlib import Path
if config["TIMEZONE"]["TYPE"] == "MULTIPLE":
if not config["TIMEZONE"]["MULTIPLE"]["TZCODES_FILE"].lower().endswith(".csv"):
raise ValueError("[TIMEZONE][MULTIPLE][TZCODES_FILE] should point to a CSV file, instead you typed: " + config["TIMEZONE"]["MULTIPLE"]["TZCODES_FILE"])
if not Path(config["TIMEZONE"]["MULTIPLE"]["TZCODES_FILE"]).exists():
2021-12-15 18:09:30 +01:00
try:
config["TIMEZONE"]["MULTIPLE"]["TZ_FILE"]
except KeyError:
raise ValueError("To create TZCODES_FILE, a list of timezones should be created " +
"with the rule preprocessing.smk/prepare_tzcodes_file " +
"which will create a file specified as config['TIMEZONE']['MULTIPLE']['TZ_FILE']." +
"\n An alternative is to provide the file manually:" +
"[TIMEZONE][MULTIPLE][TZCODES_FILE] should point to a CSV file," +
"but the file in the path you typed does not exist: " +
config["TIMEZONE"]["MULTIPLE"]["TZCODES_FILE"])
2021-03-05 23:49:37 +01:00
return [config["TIMEZONE"]["MULTIPLE"]["TZCODES_FILE"]]
2021-03-07 05:16:59 +01:00
return []
2021-03-09 22:42:02 +01:00
def pull_wearable_data_input_with_mutation_scripts(wilcards):
2021-03-07 05:16:59 +01:00
import yaml
from pathlib import Path
input = dict()
2021-03-09 22:42:02 +01:00
device = wilcards.device_type.upper()
device_stream = config[device+"_DATA_STREAMS"]["USE"]
2021-03-07 05:16:59 +01:00
input["participant_file"] = "data/external/participant_files/{pid}.yaml"
input["rapids_schema_file"] = "src/data/streams/rapids_columns.yaml"
2021-03-09 22:42:02 +01:00
input["stream_format"] = "src/data/streams/" + device_stream + "/format.yaml"
2021-03-07 05:16:59 +01:00
2021-03-09 22:42:02 +01:00
if Path("src/data/streams/"+ device_stream + "/container.R").exists():
input["stream_container"] = "src/data/streams/"+ device_stream + "/container.R"
elif Path("src/data/streams/"+ device_stream + "/container.py").exists():
input["stream_container"] = "src/data/streams/"+ device_stream + "/container.py"
2021-03-07 05:16:59 +01:00
else:
2021-03-09 22:42:02 +01:00
raise ValueError("The container script for {stream} is missing: src/data/streams/{stream}/container.[py|R]".format(stream=device_stream))
2021-03-09 17:20:21 +01:00
schema = yaml.load(open(input.get("stream_format"), 'r'), Loader=yaml.FullLoader)
2021-03-09 22:42:02 +01:00
sensor = (device + "_" + wilcards.sensor).upper()
2021-03-09 17:20:21 +01:00
if sensor not in schema:
raise ValueError("{sensor} is not defined in the schema {schema}".format(sensor=sensor, schema=input.get("stream_format")))
2021-03-09 22:42:02 +01:00
if "MUTATION" not in schema[sensor]:
raise ValueError("MUTATION is missing from [{sensor}] of {schema}".format(sensor=sensor, schema=input.get("stream_format")))
if "COLUMN_MAPPINGS" not in schema[sensor]["MUTATION"]:
raise ValueError("COLUMN_MAPPINGS is missing from [{sensor}][MUTATION] of {schema}".format(sensor=sensor, schema=input.get("stream_format")))
if "SCRIPTS" not in schema[sensor]["MUTATION"]:
raise ValueError("SCRIPTS is missing from [{sensor}][MUTATION] of {schema}".format(sensor=sensor, schema=input.get("stream_format")))
scripts = schema[sensor]["MUTATION"]["SCRIPTS"]
2021-03-09 17:20:21 +01:00
if isinstance(scripts, list):
for idx, script in enumerate(scripts):
if not script.lower().endswith((".py", ".r")):
2021-03-09 22:42:02 +01:00
raise ValueError("Mutate scripts can only be Python or R scripts (.py, .R).\n Instead we got {script} in [{sensor}] of {schema}".format(script=script, sensor=sensor, schema=input.get("stream_format")))
2021-03-09 17:20:21 +01:00
input["mutationscript"+str(idx)] = script
return input
2021-03-15 00:52:14 +01:00