From 70991d666766a52e1350912153ac48fe67e7524e Mon Sep 17 00:00:00 2001 From: Meng Li <34143965+Meng6@users.noreply.github.com> Date: Mon, 30 Nov 2020 12:34:14 -0500 Subject: [PATCH] Add dates filter while parsing fitbit data --- rules/preprocessing.smk | 9 ++++++--- src/data/fitbit_parse_heartrate.py | 19 +++++++++++++----- src/data/fitbit_parse_sleep.py | 32 ++++++++++++++++++++---------- src/data/fitbit_parse_steps.py | 14 ++++++++++--- 4 files changed, 53 insertions(+), 21 deletions(-) diff --git a/rules/preprocessing.smk b/rules/preprocessing.smk index 2ed42809..7b03b18e 100644 --- a/rules/preprocessing.smk +++ b/rules/preprocessing.smk @@ -176,7 +176,8 @@ rule phone_application_categories: rule fitbit_parse_heartrate: input: - "data/raw/{pid}/fitbit_heartrate_{fitbit_data_type}_raw.csv" + participant_file = "data/external/participant_files/{pid}.yaml", + raw_data = "data/raw/{pid}/fitbit_heartrate_{fitbit_data_type}_raw.csv" params: timezone = config["FITBIT_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"], table = lambda wildcards: config["FITBIT_HEARTRATE_"+str(wildcards.fitbit_data_type).upper()]["TABLE"], @@ -189,7 +190,8 @@ rule fitbit_parse_heartrate: rule fitbit_parse_steps: input: - "data/raw/{pid}/fitbit_steps_{fitbit_data_type}_raw.csv" + participant_file = "data/external/participant_files/{pid}.yaml", + raw_data = "data/raw/{pid}/fitbit_steps_{fitbit_data_type}_raw.csv" params: timezone = config["FITBIT_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"], table = lambda wildcards: config["FITBIT_STEPS_"+str(wildcards.fitbit_data_type).upper()]["TABLE"], @@ -202,7 +204,8 @@ rule fitbit_parse_steps: rule fitbit_parse_sleep: input: - "data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_raw.csv" + participant_file = "data/external/participant_files/{pid}.yaml", + raw_data = "data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_raw.csv" params: timezone = config["FITBIT_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"], table = lambda wildcards: config["FITBIT_SLEEP_"+str(wildcards.fitbit_data_type).upper()]["TABLE"], diff --git a/src/data/fitbit_parse_heartrate.py b/src/data/fitbit_parse_heartrate.py index 61082dcb..33e9c484 100644 --- a/src/data/fitbit_parse_heartrate.py +++ b/src/data/fitbit_parse_heartrate.py @@ -1,4 +1,4 @@ -import json, sys +import yaml, json, sys import pandas as pd import numpy as np from datetime import datetime, timezone @@ -93,7 +93,6 @@ def parseHeartrateIntradayData(records_intraday, dataset, device_id, curr_date, records_intraday.append(row_intraday) return records_intraday -# def append_timestamp(data): def parseHeartrateData(heartrate_data, fitbit_data_type): @@ -132,13 +131,23 @@ timezone = snakemake.params["timezone"] column_format = snakemake.params["column_format"] fitbit_data_type = snakemake.params["fitbit_data_type"] +with open(snakemake.input["participant_file"], "r", encoding="utf-8") as f: + participant_file = yaml.safe_load(f) +local_start_date = pd.Timestamp(participant_file["FITBIT"]["START_DATE"]) +local_end_date = pd.Timestamp(participant_file["FITBIT"]["END_DATE"]) + pd.DateOffset(1) + if column_format == "JSON": - json_raw = pd.read_csv(snakemake.input[0]) + json_raw = pd.read_csv(snakemake.input["raw_data"]) parsed_data = parseHeartrateData(json_raw, fitbit_data_type) elif column_format == "PLAIN_TEXT": - parsed_data = pd.read_csv(snakemake.input[0], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None)) + parsed_data = pd.read_csv(snakemake.input["raw_data"], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None)) +else: + raise ValueError("column_format can only be one of ['JSON', 'PLAIN_TEXT'].") + +# Only keep dates in the range of [local_start_date, local_end_date) +parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)] if parsed_data.shape[0] > 0: parsed_data["timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6 -parsed_data.to_csv(snakemake.output[0], index=False) \ No newline at end of file +parsed_data.to_csv(snakemake.output[0], index=False) diff --git a/src/data/fitbit_parse_sleep.py b/src/data/fitbit_parse_sleep.py index c9a79521..a5f49d81 100644 --- a/src/data/fitbit_parse_sleep.py +++ b/src/data/fitbit_parse_sleep.py @@ -1,4 +1,4 @@ -import json +import json, yaml import pandas as pd import numpy as np from datetime import datetime, timedelta @@ -222,32 +222,44 @@ column_format = snakemake.params["column_format"] fitbit_data_type = snakemake.params["fitbit_data_type"] sleep_episode_timestamp = snakemake.params["sleep_episode_timestamp"] +with open(snakemake.input["participant_file"], "r", encoding="utf-8") as f: + participant_file = yaml.safe_load(f) +local_start_date = pd.Timestamp(participant_file["FITBIT"]["START_DATE"]) +local_end_date = pd.Timestamp(participant_file["FITBIT"]["END_DATE"]) + pd.DateOffset(1) + if column_format == "JSON": - json_raw = pd.read_csv(snakemake.input[0]) + json_raw = pd.read_csv(snakemake.input["raw_data"]) parsed_data = parseSleepData(json_raw, fitbit_data_type) elif column_format == "PLAIN_TEXT": if fitbit_data_type == "summary": - parsed_data = pd.read_csv(snakemake.input[0], parse_dates=["local_start_date_time", "local_end_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None)) + parsed_data = pd.read_csv(snakemake.input["raw_data"], parse_dates=["local_start_date_time", "local_end_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None)) elif fitbit_data_type == "intraday": - parsed_data = pd.read_csv(snakemake.input[1], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None)) + parsed_data = pd.read_csv(snakemake.input["raw_data"], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None)) else: raise ValueError("fitbit_data_type can only be one of ['summary', 'intraday'].") else: raise ValueError("column_format can only be one of ['JSON', 'PLAIN_TEXT'].") if parsed_data.shape[0] > 0 and fitbit_data_type == "summary": - if sleep_episode_timestamp == "start": - parsed_data["timestamp"] = parsed_data["local_start_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6 - elif sleep_episode_timestamp == "end": - parsed_data["timestamp"] = parsed_data["local_end_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6 - else: + + if sleep_episode_timestamp != "start" and sleep_episode_timestamp != "end": raise ValueError("SLEEP_EPISODE_TIMESTAMP can only be one of ['start', 'end'].") + + # Column name to be considered as the event datetime + datetime_column = "local_" + sleep_episode_timestamp + "_date_time" + # Only keep dates in the range of [local_start_date, local_end_date) + parsed_data = parsed_data.loc[(parsed_data[datetime_column] >= local_start_date) & (parsed_data[datetime_column] < local_end_date)] + # Convert datetime to timestamp + parsed_data["timestamp"] = parsed_data[datetime_column].dt.tz_localize(timezone).astype(np.int64) // 10**6 # Drop useless columns: local_start_date_time and local_end_date_time parsed_data.drop(["local_start_date_time", "local_end_date_time"], axis = 1, inplace=True) if parsed_data.shape[0] > 0 and fitbit_data_type == "intraday": + # Only keep dates in the range of [local_start_date, local_end_date) + parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)] + # Convert datetime to timestamp parsed_data["timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6 # Unifying level parsed_data["unified_level"] = np.where(parsed_data["level"].isin(["awake", "wake", "restless"]), 0, 1) -parsed_data.to_csv(snakemake.output[0], index=False) \ No newline at end of file +parsed_data.to_csv(snakemake.output[0], index=False) diff --git a/src/data/fitbit_parse_steps.py b/src/data/fitbit_parse_steps.py index 773a02bc..b6f32eb7 100644 --- a/src/data/fitbit_parse_steps.py +++ b/src/data/fitbit_parse_steps.py @@ -1,4 +1,4 @@ -import json +import json, yaml import pandas as pd import numpy as np from datetime import datetime, timezone @@ -57,14 +57,22 @@ timezone = snakemake.params["timezone"] column_format = snakemake.params["column_format"] fitbit_data_type = snakemake.params["fitbit_data_type"] +with open(snakemake.input["participant_file"], "r", encoding="utf-8") as f: + participant_file = yaml.safe_load(f) +local_start_date = pd.Timestamp(participant_file["FITBIT"]["START_DATE"]) +local_end_date = pd.Timestamp(participant_file["FITBIT"]["END_DATE"]) + pd.DateOffset(1) + if column_format == "JSON": - json_raw = pd.read_csv(snakemake.input[0]) + json_raw = pd.read_csv(snakemake.input["raw_data"]) parsed_data = parseStepsData(json_raw, fitbit_data_type) elif column_format == "PLAIN_TEXT": - parsed_data = pd.read_csv(snakemake.input[0], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None)) + parsed_data = pd.read_csv(snakemake.input["raw_data"], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None)) else: raise ValueError("column_format can only be one of ['JSON', 'PLAIN_TEXT'].") +# Only keep dates in the range of [local_start_date, local_end_date) +parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)] + if parsed_data.shape[0] > 0: parsed_data["timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6