rapids/rules/common.smk

179 lines
10 KiB
Plaintext
Raw Normal View History

def get_script_language(script_path):
    """Infer a provider feature script's language from its file name.

    Args:
        script_path: Path (string or path-like) of the provider script.

    Returns:
        "python" when the file name ends with ".py", "r" when it ends with
        ".R" (case-sensitive), and None for any other file name.

    Raises:
        ValueError: If nothing exists at ``script_path``.
    """
    from pathlib import Path

    script = Path(script_path)
    if not script.exists():
        raise ValueError("The following provider feature script does not exist: " + str(script))

    # Checked in order; falling out of the loop returns None implicitly.
    for suffix, language in ((".py", "python"), (".R", "r")):
        if script.name.endswith(suffix):
            return language
# Features.smk #########################################################################################################
def optional_phone_yield_input_for_locations(wildcards):
    """Return the yielded-timestamps file when resampled locations are used.

    Args:
        wildcards: Unused; accepted so the function can serve as an input
            function.

    Returns:
        The ``{pid}``-templated path of the phone yielded timestamps CSV when
        ``config["PHONE_LOCATIONS"]["LOCATIONS_TO_USE"]`` is "ALL_RESAMPLED"
        or "FUSED_RESAMPLED"; otherwise an empty list.
    """
    resampled_modes = ("ALL_RESAMPLED", "FUSED_RESAMPLED")
    locations_to_use = config["PHONE_LOCATIONS"]["LOCATIONS_TO_USE"]
    if locations_to_use not in resampled_modes:
        return []
    return "data/interim/{pid}/phone_yielded_timestamps.csv"
def get_barnett_daily(wildcards):
    """Return the Barnett daily locations file for the BARNETT provider.

    Args:
        wildcards: Object exposing a ``provider_key`` string attribute.

    Returns:
        The ``{pid}``-templated path of the Barnett daily CSV when the
        provider key equals "BARNETT" (case-insensitive); otherwise an empty
        list.
    """
    is_barnett = wildcards.provider_key.upper() == "BARNETT"
    return "data/interim/{pid}/phone_locations_barnett_daily.csv" if is_barnett else []
def get_locations_python_input(wildcards):
    """Pick the processed locations file to use for a provider.

    Args:
        wildcards: Object exposing a ``provider_key`` string attribute.

    Returns:
        The ``{pid}``-templated path of the resampled-episodes CSV when the
        provider key equals "DORYAB" (case-insensitive); otherwise the path of
        the plain processed locations CSV.
    """
    if wildcards.provider_key.upper() != "DORYAB":
        return "data/interim/{pid}/phone_locations_processed_with_datetime.csv"
    return "data/interim/{pid}/phone_locations_processed_with_datetime_with_doryab_columns_episodes_resampled_with_datetime.csv"
def get_calls_input(wildcards):
    """Pick the phone calls file to use for a provider.

    Args:
        wildcards: Object exposing a ``provider_key`` string attribute.

    Returns:
        The ``{pid}``-templated path of the resampled call episodes CSV when
        the provider key equals "RAPIDS" (case-insensitive) and
        ``config["PHONE_CALLS"]["PROVIDERS"]["RAPIDS"]["FEATURES_TYPE"]`` is
        "EPISODES"; otherwise the path of the raw calls CSV.
    """
    raw_calls = "data/raw/{pid}/phone_calls_with_datetime.csv"

    # The config is only consulted for the RAPIDS provider.
    if wildcards.provider_key.upper() != "RAPIDS":
        return raw_calls
    if config["PHONE_CALLS"]["PROVIDERS"]["RAPIDS"]["FEATURES_TYPE"] != "EPISODES":
        return raw_calls
    return "data/interim/{pid}/phone_calls_episodes_resampled_with_datetime.csv"
def find_features_files(wildcards):
    """List the per-provider feature files of one sensor.

    Args:
        wildcards: Object exposing a ``sensor_key`` string attribute; its
            upper-cased value selects the sensor section of ``config``.

    Returns:
        A list with the expanded feature file paths of every provider whose
        ``COMPUTE`` flag is truthy. The ``{pid}`` placeholder is escaped in
        the pattern, so it is left for later expansion.
    """
    sensor_key = wildcards.sensor_key
    providers = config[sensor_key.upper()]["PROVIDERS"]
    pattern = "data/interim/{{pid}}/{sensor_key}_features/{sensor_key}_{language}_{provider_key}.csv"

    feature_files = []
    for provider_key, provider in providers.items():
        if not provider["COMPUTE"]:
            continue
        # The script language becomes part of the feature file name.
        language = get_script_language(provider["SRC_SCRIPT"])
        feature_files.extend(
            expand(
                pattern,
                sensor_key=sensor_key.lower(),
                language=language,
                provider_key=provider_key.lower(),
            )
        )
    return feature_files
Squashed commit of the following: commit 31a47a5ee4569264e39d7c445525a6e64bb7700a Author: Primoz <sisko.primoz@gmail.com> Date: Wed Jul 20 13:49:22 2022 +0000 Environment version change. commit 5b274ed8993f58e783bda6d82fce936764209c28 Author: Primoz <sisko.primoz@gmail.com> Date: Tue Jul 19 16:10:07 2022 +0000 Enabled cleaning for all participants + standardization files. commit 203fdb31e0f3c647ef8c8a60cb9531831b7ab924 Author: Primoz <sisko.primoz@gmail.com> Date: Tue Jul 19 14:14:51 2022 +0000 Features cleaning fixes after testing. Visualization script for phone features values. commit 176178d73b154c30b9eb9eb4a67514f00d6a924e Author: Primoz <sisko.primoz@gmail.com> Date: Tue Jul 19 09:05:14 2022 +0000 Revert "Necessary config changes." This reverts commit 6ec1ef50430d2e1f5ce4670d505d5e84ac47f0a0. commit 26ea6512c9d512f95837e7b047fe510c1d196403 Author: Primoz <sisko.primoz@gmail.com> Date: Mon Jul 18 13:19:47 2022 +0000 Adding cleaning function condition and cleaning functionality. commit 575c29eef9c21e6f2d7832871e73bc0941643734 Author: Primoz <sisko.primoz@gmail.com> Date: Mon Jul 18 12:51:56 2022 +0000 Translation of the cleaning individual RAPIDS function from R to py. commit 6ec1ef50430d2e1f5ce4670d505d5e84ac47f0a0 Author: Primoz <sisko.primoz@gmail.com> Date: Mon Jul 18 12:02:18 2022 +0000 Necessary config changes. commit b5669f51612fbd8378848615d639677851ab032f Author: Primoz <sisko.primoz@gmail.com> Date: Fri Jul 15 15:26:00 2022 +0000 Modified snakemake rule to dynamically choose script extention. commit 66636be1e8ae4828228b37c59b9df1faf3fc3d3d Author: Primoz <sisko.primoz@gmail.com> Date: Fri Jul 15 14:43:08 2022 +0000 Trying to modify the snakefile rule to execute scripts in two languages depended on the provider. commit 574778b00f3cbb368ef4bc74de15cf5070c65ea9 Author: Primoz <sisko.primoz@gmail.com> Date: Fri Jul 15 09:49:41 2022 +0000 gitignore: adding required files so that RAPIDS can be run successfully. 
commit 71018ab178256970535e78961602ab8c7f0ebb14 Author: Primoz <sisko.primoz@gmail.com> Date: Fri Jul 15 08:34:19 2022 +0000 Standardization bug fixes commit 6253c470a624e6bfbb02e0c453b652452eb2dbbc Author: Primoz <sisko.primoz@gmail.com> Date: Thu Jul 14 15:28:02 2022 +0000 Seperate rules for empatica vs. nonempatica standardization. Parameter in config that controls the creation of standardized merged files for individual and all participants.. commit 90f902778565e0896d3bae22ae8551be8b487e67 Author: Primoz <sisko.primoz@gmail.com> Date: Tue Jul 12 14:23:03 2022 +0000 Preparing for final csvs' standardization. commit d25dde3998786a9a582f5cda544ee104386778f9 Author: Primoz <sisko.primoz@gmail.com> Date: Mon Jul 11 12:08:47 2022 +0000 Revert "Changes in config to be reverted." This reverts commit bea7608e7095021fb7c53a9afa07074448fe4313. commit 6b23e70857e63deda98eb98d190af9090626c84b Author: Primoz <sisko.primoz@gmail.com> Date: Mon Jul 11 12:08:26 2022 +0000 Enabled standardization for rest (previously active) phone features. Testing still needed. commit 8ec58a6f34ba3d42e5cc71d26e6d91837472ca5f Author: Primoz <sisko.primoz@gmail.com> Date: Mon Jul 11 09:07:55 2022 +0000 Enabled standardization for phone calls. All steps completed and tested. commit bea7608e7095021fb7c53a9afa07074448fe4313 Author: Primoz <sisko.primoz@gmail.com> Date: Mon Jul 11 07:47:51 2022 +0000 Changes in config to be reverted. commit 4e84ca0e51bf709bff56fd09437b95310ec6bedd Author: Primoz <sisko.primoz@gmail.com> Date: Fri Jul 8 14:11:24 2022 +0000 Standardization for the rest of the features. commit cc581aa788e3d5c17131af8f3d5dd6b0c3b5aff7 Author: Primoz <sisko.primoz@gmail.com> Date: Fri Jul 8 14:11:08 2022 +0000 README update again
2022-07-20 15:51:22 +02:00
def find_joint_non_empatica_sensor_files(wildcards):
    """List the merged feature files of the enabled PHONE and FITBIT sensors.

    A sensor contributes one file when its config key starts with "PHONE" or
    "FITBIT", it has a ``PROVIDERS`` dict, and at least one provider has a
    truthy ``COMPUTE`` flag.

    Args:
        wildcards: Unused; accepted so the function can serve as an input
            function.

    Returns:
        A list of ``{pid}``-templated paths, one per qualifying sensor, in
        config order.
    """
    joined_files = []
    for sensor_name in config:
        if not sensor_name.startswith(("PHONE", "FITBIT")):
            continue
        sensor_settings = config[sensor_name]
        if "PROVIDERS" not in sensor_settings:
            continue
        providers = sensor_settings["PROVIDERS"]
        if not isinstance(providers, dict):
            continue
        # One enabled provider is enough; the sensor is listed only once.
        if any(provider.get("COMPUTE") for provider in providers.values()):
            joined_files.append("data/processed/features/{pid}/" + sensor_name.lower() + ".csv")
    return joined_files
def optional_steps_sleep_input(wildcards):
    """Return the Fitbit sleep summary file when Fitbit-based exclusion is on.

    Args:
        wildcards: Unused; accepted so the function can serve as an input
            function.

    Returns:
        The ``{pid}``-templated path of the raw sleep summary CSV when
        ``config["FITBIT_STEPS_INTRADAY"]["EXCLUDE_SLEEP"]["FITBIT_BASED"]["EXCLUDE"]``
        is truthy; otherwise an empty list.
    """
    fitbit_based = config["FITBIT_STEPS_INTRADAY"]["EXCLUDE_SLEEP"]["FITBIT_BASED"]
    if not fitbit_based["EXCLUDE"]:
        return []
    return "data/raw/{pid}/fitbit_sleep_summary_raw.csv"
def optional_steps_intraday_input(wildcards):
    """Pick the intraday steps file depending on the sleep-exclusion settings.

    Args:
        wildcards: Unused; accepted so the function can serve as an input
            function.

    Returns:
        The ``{pid}``-templated path of the sleep-excluded steps CSV when
        either the TIME_BASED or the FITBIT_BASED ``EXCLUDE`` flag under
        ``config["FITBIT_STEPS_INTRADAY"]["EXCLUDE_SLEEP"]`` is truthy;
        otherwise the path of the raw intraday steps CSV.
    """
    # TIME_BASED is checked first; FITBIT_BASED is only read when it is falsy.
    exclusion_enabled = any(
        config["FITBIT_STEPS_INTRADAY"]["EXCLUDE_SLEEP"][strategy]["EXCLUDE"]
        for strategy in ("TIME_BASED", "FITBIT_BASED")
    )
    if exclusion_enabled:
        return "data/interim/{pid}/fitbit_steps_intraday_with_datetime_exclude_sleep.csv"
    return "data/raw/{pid}/fitbit_steps_intraday_with_datetime.csv"
2020-11-25 22:34:05 +01:00
def input_merge_sensor_features_for_individual_participants(wildcards):
    """List the per-sensor feature files to merge for one participant.

    A sensor contributes one file when its config key starts with "PHONE",
    "FITBIT" or "EMPATICA", it has a ``PROVIDERS`` dict, and at least one
    provider has a truthy ``COMPUTE`` flag.

    Args:
        wildcards: Unused; accepted so the function can serve as an input
            function.

    Returns:
        A list of ``{pid}``-templated paths, one per qualifying sensor, in
        config order.
    """
    device_prefixes = ("PHONE", "FITBIT", "EMPATICA")
    feature_files = []
    for sensor_name in config:
        if not sensor_name.startswith(device_prefixes):
            continue
        sensor_settings = config[sensor_name]
        if "PROVIDERS" not in sensor_settings:
            continue
        providers = sensor_settings["PROVIDERS"]
        if not isinstance(providers, dict):
            continue
        # One enabled provider is enough; the sensor is listed only once.
        if any(provider.get("COMPUTE") for provider in providers.values()):
            feature_files.append("data/processed/features/{pid}/" + sensor_name.lower() + ".csv")
    return feature_files
def get_phone_sensor_names():
    """List the config keys of phone sensors that declare providers.

    Returns:
        The keys of ``config`` that start with "PHONE" and contain a
        ``PROVIDERS`` entry, excluding "PHONE_DATA_YIELD", without duplicates
        and in config order.
    """
    sensor_names = []
    for key in config:
        declares_providers = key.startswith("PHONE") and "PROVIDERS" in config[key]
        if not declares_providers or key == "PHONE_DATA_YIELD":
            continue
        if key not in sensor_names:
            sensor_names.append(key)
    return sensor_names
def pull_phone_data_input_with_mutation_scripts(wilcards):
    """Collect the input files needed to pull one phone sensor's data.

    The phone data stream is read from ``config["PHONE_DATA_STREAMS"]["USE"]``.
    Its ``format.yaml`` is loaded and validated for the sensor
    ``PHONE_<wilcards.sensor>``: every device OS entry must define
    ``MUTATION`` with ``COLUMN_MAPPINGS`` and ``SCRIPTS``.

    Args:
        wilcards: Object exposing a ``sensor`` string attribute.

    Returns:
        A dict with the keys ``participant_file``, ``rapids_schema_file``,
        ``stream_format`` and ``stream_container``, plus one
        ``mutationscript<idx>`` key per mutation script.

    Raises:
        ValueError: If the stream has no ``container.R``/``container.py``, the
            sensor is missing from the schema, a device OS entry lacks
            ``MUTATION``, ``COLUMN_MAPPINGS`` or ``SCRIPTS``, or a mutation
            script is not a ``.py``/``.R`` file.
    """
    from pathlib import Path

    inputs = dict()
    phone_stream = config["PHONE_DATA_STREAMS"]["USE"]
    stream_dir = "src/data/streams/" + phone_stream
    format_path = stream_dir + "/format.yaml"

    inputs["participant_file"] = "data/external/participant_files/{pid}.yaml"
    inputs["rapids_schema_file"] = "src/data/streams/rapids_columns.yaml"
    inputs["stream_format"] = format_path

    # The R container takes precedence when both containers exist.
    for extension in ("R", "py"):
        container = stream_dir + "/container." + extension
        if Path(container).exists():
            inputs["stream_container"] = container
            break
    else:
        # Fixed: this message used the undefined name `empatica_stream`, so a
        # missing container surfaced as a NameError instead of this ValueError.
        raise ValueError("The container script for {stream} is missing: src/data/streams/{stream}/container.[py|R]".format(stream=phone_stream))

    # Imported at the point of use so the container check above does not
    # depend on PyYAML being importable.
    import yaml

    # The context manager closes the schema file once it has been parsed.
    with open(format_path, 'r') as schema_file:
        schema = yaml.load(schema_file, Loader=yaml.FullLoader)

    sensor = ("phone_" + wilcards.sensor).upper()
    if sensor not in schema:
        raise ValueError("{sensor} is not defined in the schema {schema}".format(sensor=sensor, schema=format_path))

    for device_os in schema[sensor].keys():
        os_schema = schema[sensor][device_os]
        if "MUTATION" not in os_schema:
            raise ValueError("MUTATION is missing from [{sensor}][{device_os}] of {schema}".format(sensor=sensor, device_os=device_os, schema=format_path))
        if "COLUMN_MAPPINGS" not in os_schema["MUTATION"]:
            raise ValueError("COLUMN_MAPPINGS is missing from [{sensor}][{device_os}][MUTATION] of {schema}".format(sensor=sensor, device_os=device_os, schema=format_path))
        if "SCRIPTS" not in os_schema["MUTATION"]:
            raise ValueError("SCRIPTS is missing from [{sensor}][{device_os}][MUTATION] of {schema}".format(sensor=sensor, device_os=device_os, schema=format_path))

        scripts = os_schema["MUTATION"]["SCRIPTS"]
        if isinstance(scripts, list):
            # NOTE(review): idx restarts for every device OS, so scripts at the
            # same position for different OSes share a key and the later one
            # replaces the earlier one. Confirm this is intended.
            for idx, script in enumerate(scripts):
                if not script.lower().endswith((".py", ".r")):
                    raise ValueError("Mutate scripts can only be Python or R scripts (.py, .R).\n Instead we got {script} in \n [{sensor}][{device_os}] of {schema}".format(script=script, sensor=sensor, device_os=device_os, schema=format_path))
                inputs["mutationscript" + str(idx)] = script
    return inputs
2021-03-05 23:49:37 +01:00
def input_tzcodes_file(wilcards):
    """Return the timezone codes file when multiple timezones are configured.

    Args:
        wilcards: Unused; accepted so the function can serve as an input
            function.

    Returns:
        A one-element list holding
        ``config["TIMEZONE"]["MULTIPLE"]["TZCODES_FILE"]`` when
        ``config["TIMEZONE"]["TYPE"]`` is "MULTIPLE"; otherwise an empty list.

    Raises:
        ValueError: If the configured file name does not end with ".csv"
            (case-insensitive), or if the file does not exist and
            ``config["TIMEZONE"]["MULTIPLE"]`` has no ``TZ_FILE`` key.
    """
    from pathlib import Path

    timezone_settings = config["TIMEZONE"]
    if timezone_settings["TYPE"] != "MULTIPLE":
        return []

    multiple_settings = timezone_settings["MULTIPLE"]
    tzcodes_file = multiple_settings["TZCODES_FILE"]

    if not tzcodes_file.lower().endswith(".csv"):
        raise ValueError("[TIMEZONE][MULTIPLE][TZCODES_FILE] should point to a CSV file, instead you typed: " + tzcodes_file)

    if not Path(tzcodes_file).exists():
        # A missing file is accepted as long as TZ_FILE is configured.
        try:
            multiple_settings["TZ_FILE"]
        except KeyError:
            raise ValueError(
                "To create TZCODES_FILE, a list of timezones should be created "
                "with the rule preprocessing.smk/prepare_tzcodes_file "
                "which will create a file specified as config['TIMEZONE']['MULTIPLE']['TZ_FILE']."
                "\n An alternative is to provide the file manually:"
                "[TIMEZONE][MULTIPLE][TZCODES_FILE] should point to a CSV file,"
                "but the file in the path you typed does not exist: "
                + tzcodes_file
            )
    return [tzcodes_file]
2021-03-09 22:42:02 +01:00
def pull_wearable_data_input_with_mutation_scripts(wilcards):
    """Collect the input files needed to pull one wearable sensor's data.

    The data stream is read from ``config["<DEVICE>_DATA_STREAMS"]["USE"]``,
    where ``<DEVICE>`` is the upper-cased ``wilcards.device_type``. The
    stream's ``format.yaml`` is loaded and validated for the sensor
    ``<DEVICE>_<wilcards.sensor>``, which must define ``MUTATION`` with
    ``COLUMN_MAPPINGS`` and ``SCRIPTS``.

    Args:
        wilcards: Object exposing ``device_type`` and ``sensor`` string
            attributes.

    Returns:
        A dict with the keys ``participant_file``, ``rapids_schema_file``,
        ``stream_format`` and ``stream_container``, plus one
        ``mutationscript<idx>`` key per mutation script.

    Raises:
        ValueError: If the stream has no ``container.R``/``container.py``, the
            sensor is missing from the schema, the sensor lacks ``MUTATION``,
            ``COLUMN_MAPPINGS`` or ``SCRIPTS``, or a mutation script is not a
            ``.py``/``.R`` file.
    """
    from pathlib import Path

    inputs = dict()
    device = wilcards.device_type.upper()
    device_stream = config[device + "_DATA_STREAMS"]["USE"]
    stream_dir = "src/data/streams/" + device_stream
    format_path = stream_dir + "/format.yaml"

    inputs["participant_file"] = "data/external/participant_files/{pid}.yaml"
    inputs["rapids_schema_file"] = "src/data/streams/rapids_columns.yaml"
    inputs["stream_format"] = format_path

    # The R container takes precedence when both containers exist.
    for extension in ("R", "py"):
        container = stream_dir + "/container." + extension
        if Path(container).exists():
            inputs["stream_container"] = container
            break
    else:
        raise ValueError("The container script for {stream} is missing: src/data/streams/{stream}/container.[py|R]".format(stream=device_stream))

    # Imported at the point of use so the container check above does not
    # depend on PyYAML being importable.
    import yaml

    # Fixed: the schema file was opened without ever being closed; the context
    # manager releases the handle once the YAML has been parsed.
    with open(format_path, 'r') as schema_file:
        schema = yaml.load(schema_file, Loader=yaml.FullLoader)

    sensor = (device + "_" + wilcards.sensor).upper()
    if sensor not in schema:
        raise ValueError("{sensor} is not defined in the schema {schema}".format(sensor=sensor, schema=format_path))

    sensor_schema = schema[sensor]
    if "MUTATION" not in sensor_schema:
        raise ValueError("MUTATION is missing from [{sensor}] of {schema}".format(sensor=sensor, schema=format_path))
    if "COLUMN_MAPPINGS" not in sensor_schema["MUTATION"]:
        raise ValueError("COLUMN_MAPPINGS is missing from [{sensor}][MUTATION] of {schema}".format(sensor=sensor, schema=format_path))
    if "SCRIPTS" not in sensor_schema["MUTATION"]:
        raise ValueError("SCRIPTS is missing from [{sensor}][MUTATION] of {schema}".format(sensor=sensor, schema=format_path))

    scripts = sensor_schema["MUTATION"]["SCRIPTS"]
    # A SCRIPTS value that is not a list contributes no mutation scripts.
    if isinstance(scripts, list):
        for idx, script in enumerate(scripts):
            if not script.lower().endswith((".py", ".r")):
                raise ValueError("Mutate scripts can only be Python or R scripts (.py, .R).\n Instead we got {script} in [{sensor}] of {schema}".format(script=script, sensor=sensor, schema=format_path))
            inputs["mutationscript" + str(idx)] = script
    return inputs