From cff83a7ceba977c6cd61c2f9703d35621f73035e Mon Sep 17 00:00:00 2001 From: JulioV Date: Thu, 22 Oct 2020 13:08:52 -0400 Subject: [PATCH] Implement parse fitbit data --- Snakefile | 41 +++++--- config.yaml | 57 +++++++++--- rules/preprocessing.smk | 93 ++++++++++++------- .../fitbit_parse_calories.py | 27 ++++-- .../fitbit_parse_heartrate.py | 45 ++++++--- .../fitbit_parse_steps.py | 35 ------- .../fitbit_parse_sleep.py | 70 +++++++------- src/data/fitbit_parse_steps.py | 49 ++++++++++ 8 files changed, 267 insertions(+), 150 deletions(-) rename src/data/{fitbit_parse_sensors => }/fitbit_parse_calories.py (51%) rename src/data/{fitbit_parse_sensors => }/fitbit_parse_heartrate.py (78%) delete mode 100644 src/data/fitbit_parse_sensors/fitbit_parse_steps.py rename src/data/{fitbit_parse_sensors => }/fitbit_parse_sleep.py (78%) create mode 100644 src/data/fitbit_parse_steps.py diff --git a/Snakefile b/Snakefile index 0bd59766..ecf3dba4 100644 --- a/Snakefile +++ b/Snakefile @@ -144,29 +144,42 @@ for provider in config["PHONE_LOCATIONS"]["PROVIDERS"].keys(): files_to_compute.extend(expand("data/interim/{pid}/phone_locations_features/phone_locations_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["PHONE_LOCATIONS"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) files_to_compute.extend(expand("data/processed/features/{pid}/phone_locations.csv", pid=config["PIDS"])) +if config["FITBIT_HEARTRATE"]["TABLE_FORMAT"] not in ["JSON", "CSV"]: + raise ValueError("config['FITBIT_HEARTRATE']['TABLE_FORMAT'] should be JSON or CSV but you typed" + config["FITBIT_HEARTRATE"]["TABLE_FORMAT"]) + +if config["FITBIT_STEPS"]["TABLE_FORMAT"] not in ["JSON", "CSV"]: + raise ValueError("config['FITBIT_STEPS']['TABLE_FORMAT'] should be JSON or CSV but you typed" + config["FITBIT_STEPS"]["TABLE_FORMAT"]) + +if config["FITBIT_CALORIES"]["TABLE_FORMAT"] not in ["JSON", "CSV"]: + raise ValueError("config['FITBIT_CALORIES']['TABLE_FORMAT'] should be JSON or CSV but you typed" + config["FITBIT_CALORIES"]["TABLE_FORMAT"]) + +if config["FITBIT_SLEEP"]["TABLE_FORMAT"] not in ["JSON", "CSV"]: + raise ValueError("config['FITBIT_SLEEP']['TABLE_FORMAT'] should be JSON or CSV but you typed" + config["FITBIT_SLEEP"]["TABLE_FORMAT"]) for provider in config["FITBIT_HEARTRATE"]["PROVIDERS"].keys(): if config["FITBIT_HEARTRATE"]["PROVIDERS"][provider]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/fitbit_heartrate_raw.csv", pid=config["PIDS"])) - # files_to_compute.extend(expand("data/raw/{pid}/fitbit_heartrate_{fitbit_data_type}_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) - # files_to_compute.extend(expand("data/processed/{pid}/fitbit_heartrate_{day_segment}.csv", pid = config["PIDS"], day_segment = config["HEARTRATE"]["DAY_SEGMENTS"])) + files_to_compute.extend(expand("data/raw/{pid}/fitbit_heartrate_{fitbit_data_type}_raw.csv", pid=config["PIDS"], fitbit_data_type=(["json"] if config["FITBIT_HEARTRATE"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"]))) + files_to_compute.extend(expand("data/raw/{pid}/fitbit_heartrate_{fitbit_data_type}_parsed.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) + files_to_compute.extend(expand("data/raw/{pid}/fitbit_heartrate_{fitbit_data_type}_parsed_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) for provider in config["FITBIT_STEPS"]["PROVIDERS"].keys(): if config["FITBIT_STEPS"]["PROVIDERS"][provider]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_raw.csv", pid=config["PIDS"])) -# if config["STEP"]["COMPUTE"]: -# if config["STEP"]["EXCLUDE_SLEEP"]["EXCLUDE"] == True and config["STEP"]["EXCLUDE_SLEEP"]["TYPE"] == "FITBIT_BASED": -# files_to_compute.extend(expand("data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary"])) -# files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["STEP"]["TABLE"])) -# files_to_compute.extend(expand("data/raw/{pid}/fitbit_step_{fitbit_data_type}_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["intraday"])) -# files_to_compute.extend(expand("data/processed/{pid}/fitbit_step_{day_segment}.csv", pid = config["PIDS"], day_segment = config["STEP"]["DAY_SEGMENTS"])) + files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_raw.csv", pid=config["PIDS"], fitbit_data_type=(["json"] if config["FITBIT_STEPS"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"]))) + files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_parsed.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) + files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_parsed_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) + +for provider in config["FITBIT_CALORIES"]["PROVIDERS"].keys(): + if config["FITBIT_CALORIES"]["PROVIDERS"][provider]["COMPUTE"]: + files_to_compute.extend(expand("data/raw/{pid}/fitbit_calories_{fitbit_data_type}_raw.csv", pid=config["PIDS"], fitbit_data_type=(["json"] if config["FITBIT_CALORIES"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"]))) + files_to_compute.extend(expand("data/raw/{pid}/fitbit_calories_{fitbit_data_type}_parsed.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) + files_to_compute.extend(expand("data/raw/{pid}/fitbit_calories_{fitbit_data_type}_parsed_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) for provider in config["FITBIT_SLEEP"]["PROVIDERS"].keys(): if config["FITBIT_SLEEP"]["PROVIDERS"][provider]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/fitbit_sleep_raw.csv", pid=config["PIDS"])) -# files_to_compute.extend(expand("data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["intraday", "summary"])) -# files_to_compute.extend(expand("data/processed/{pid}/fitbit_sleep_{day_segment}.csv", pid = config["PIDS"], day_segment = config["SLEEP"]["DAY_SEGMENTS"])) - + files_to_compute.extend(expand("data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_raw.csv", pid=config["PIDS"], fitbit_data_type=(["json"] if config["FITBIT_SLEEP"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"]))) + files_to_compute.extend(expand("data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_parsed_episodes.csv", pid=config["PIDS"], fitbit_data_type=["summary"])) + files_to_compute.extend(expand("data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_parsed.csv", pid=config["PIDS"], fitbit_data_type=["intraday"])) + files_to_compute.extend(expand("data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_parsed_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["intraday"])) # visualization for data exploration if config["HEATMAP_FEATURES_CORRELATIONS"]["PLOT"]: diff --git a/config.yaml b/config.yaml index 55b69a9d..a5e14f30 100644 --- a/config.yaml +++ b/config.yaml @@ -15,14 +15,15 @@ TIMEZONE: &timezone DATABASE_GROUP: &database_group MY_GROUP +# config section for the script that creates participant files automatically PARTICIPANT_FILES: # run snakemake -j1 -R parse_participant_files PHONE_SECTION: - INCLUDE: TRUE + ADD: TRUE PARSED_FROM: AWARE_DEVICE_TABLE #AWARE_DEVICE_TABLE or CSV_FILE PARSED_SOURCE: *database_group # DB credentials group or CSV file path. If CSV file, it should have: device_id, pid (optional), label (optional), start_date (optional), end_date (optional) IGNORED_DEVICE_IDS: [] FITBIT_SECTION: - INCLUDE: FALSE + ADD: FALSE SAME_AS_PHONE: FALSE # If TRUE, all config below is ignored PARSED_FROM: CSV_FILE PARSED_SOURCE: "external/my_fitbit_participants.csv" # CSV file should have: device_id, pid (optional), label (optional), start_date (optional), end_date (optional) @@ -241,28 +242,40 @@ PHONE_CONVERSATION: SRC_FOLDER: "rapids" # inside src/features/phone_conversation SRC_LANGUAGE: "python" +############## FITBIT ########################################################## +################################################################################ FITBIT_HEARTRATE: - TABLE: "fitbit_data" - PARSE_JSON: TRUE + TABLE_FORMAT: JSON # JSON or CSV + TABLE: + JSON: fitbit_heartrate + CSV: + SUMMARY: heartrate_summary.csv + INTRADAY: heartrate_intraday.csv PROVIDERS: RAPIDS: - COMPUTE: True + COMPUTE: False SUMMARY_FEATURES: ["restinghr"] # calories features' accuracy depend on the accuracy of the participants fitbit profile (e.g. height, weight) use these with care: ["caloriesoutofrange", "caloriesfatburn", "caloriescardio", "caloriespeak"] INTRADAY_FEATURES: ["maxhr", "minhr", "avghr", "medianhr", "modehr", "stdhr", "diffmaxmodehr", "diffminmodehr", "entropyhr", "minutesonoutofrangezone", "minutesonfatburnzone", "minutesoncardiozone", "minutesonpeakzone"] + FITBIT_STEPS: - TABLE: fitbit_data - PARSE_JSON: TRUE - EXCLUDE_SLEEP: + TABLE_FORMAT: JSON # JSON or CSV + TABLE: + JSON: fitbit_steps + CSV: + SUMMARY: steps_summary.csv + INTRADAY: steps_intraday.csv + EXCLUDE_SLEEP: # you can exclude sleep periods from the step features computation EXCLUDE: False TYPE: FIXED # FIXED OR FITBIT_BASED (configure FITBIT_SLEEP section) FIXED: START: "23:00" END: "07:00" + PROVIDERS: RAPIDS: - COMPUTE: TRUE + COMPUTE: False FEATURES: ALL_STEPS: ["sumallsteps", "maxallsteps", "minallsteps", "avgallsteps", "stdallsteps"] SEDENTARY_BOUT: ["countepisode", "sumduration", "maxduration", "minduration", "avgduration", "stdduration"] @@ -271,15 +284,33 @@ FITBIT_STEPS: INCLUDE_ZERO_STEP_ROWS: False FITBIT_SLEEP: - TABLE: fitbit_data - PARSE_JSON: TRUE + TABLE_FORMAT: JSON # JSON or CSV + TABLE: + JSON: fitbit_sleep + CSV: + SUMMARY: sleep_summary.csv + INTRADAY: sleep_intraday.csv PROVIDERS: RAPIDS: - COMPUTE: TRUE + COMPUTE: False SLEEP_TYPES: ["main", "nap", "all"] SUMMARY_FEATURES: ["sumdurationafterwakeup", "sumdurationasleep", "sumdurationawake", "sumdurationtofallasleep", "sumdurationinbed", "avgefficiency", "countepisode"] -### Visualizations ################################################################ +FITBIT_CALORIES: + TABLE_FORMAT: JSON # JSON or CSV + TABLE: + JSON: fitbit_calories + CSV: + SUMMARY: calories_summary.csv + INTRADAY: calories_intraday.csv + PROVIDERS: + RAPIDS: + COMPUTE: False + FEATURES: [] + +### Visualizations ############################################################# +################################################################################ + HEATMAP_FEATURES_CORRELATIONS: PLOT: False MIN_ROWS_RATIO: 0.5 diff --git a/rules/preprocessing.smk b/rules/preprocessing.smk index 04e1088d..5a71fab3 100644 --- a/rules/preprocessing.smk +++ b/rules/preprocessing.smk @@ -44,9 +44,10 @@ rule download_fitbit_data: params: source = config["SENSOR_DATA"]["FITBIT"]["SOURCE"], sensor = "fitbit_" + "{sensor}", + type = "{fitbit_data_type}", table = lambda wildcards: config["FITBIT_" + str(wildcards.sensor).upper()]["TABLE"], output: - "data/raw/{pid}/fitbit_{sensor}_raw.csv" + "data/raw/{pid}/fitbit_{sensor}_{fitbit_data_type}_raw.csv" script: "../src/data/download_fitbit_data.R" @@ -179,37 +180,63 @@ rule phone_application_categories: script: "../src/data/application_categories.R" -# rule fitbit_heartrate_with_datetime: -# input: -# expand("data/raw/{{pid}}/{fitbit_table}_raw.csv", fitbit_table=config["HEARTRATE"]["TABLE"]) -# params: -# local_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"], -# fitbit_sensor = "heartrate" -# output: -# summary_data = "data/raw/{pid}/fitbit_heartrate_summary_with_datetime.csv", -# intraday_data = "data/raw/{pid}/fitbit_heartrate_intraday_with_datetime.csv" -# script: -# "../src/data/fitbit_readable_datetime.py" +rule fitbit_parse_heartrate: + input: + data = expand("data/raw/{{pid}}/fitbit_heartrate_{fitbit_data_type}_raw.csv", fitbit_data_type = (["json"] if config["FITBIT_HEARTRATE"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"])) + params: + table = config["FITBIT_HEARTRATE"]["TABLE"], + table_format = config["FITBIT_HEARTRATE"]["TABLE_FORMAT"] + output: + summary_data = "data/raw/{pid}/fitbit_heartrate_summary_parsed.csv", + intraday_data = "data/raw/{pid}/fitbit_heartrate_intraday_parsed.csv" + script: + "../src/data/fitbit_parse_heartrate.py" -# rule fitbit_step_with_datetime: -# input: -# expand("data/raw/{{pid}}/{fitbit_table}_raw.csv", fitbit_table=config["STEP"]["TABLE"]) -# params: -# local_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"], -# fitbit_sensor = "steps" -# output: -# intraday_data = "data/raw/{pid}/fitbit_step_intraday_with_datetime.csv" -# script: -# "../src/data/fitbit_readable_datetime.py" +rule fitbit_parse_steps: + input: + data = expand("data/raw/{{pid}}/fitbit_steps_{fitbit_data_type}_raw.csv", fitbit_data_type = (["json"] if config["FITBIT_STEPS"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"])) + params: + table = config["FITBIT_STEPS"]["TABLE"], + table_format = config["FITBIT_STEPS"]["TABLE_FORMAT"] + output: + summary_data = "data/raw/{pid}/fitbit_steps_summary_parsed.csv", + intraday_data = "data/raw/{pid}/fitbit_steps_intraday_parsed.csv" + script: + "../src/data/fitbit_parse_steps.py" -# rule fitbit_sleep_with_datetime: -# input: -# expand("data/raw/{{pid}}/{fitbit_table}_raw.csv", fitbit_table=config["SLEEP"]["TABLE"]) -# params: -# local_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"], -# fitbit_sensor = "sleep" -# output: -# summary_data = "data/raw/{pid}/fitbit_sleep_summary_with_datetime.csv", -# intraday_data = "data/raw/{pid}/fitbit_sleep_intraday_with_datetime.csv" -# script: -# "../src/data/fitbit_readable_datetime.py" +rule fitbit_parse_calories: + input: + data = expand("data/raw/{{pid}}/fitbit_calories_{fitbit_data_type}_raw.csv", fitbit_data_type = (["json"] if config["FITBIT_CALORIES"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"])) + params: + table = config["FITBIT_CALORIES"]["TABLE"], + table_format = config["FITBIT_CALORIES"]["TABLE_FORMAT"] + output: + summary_data = "data/raw/{pid}/fitbit_calories_summary_parsed.csv", + intraday_data = "data/raw/{pid}/fitbit_calories_intraday_parsed.csv" + script: + "../src/data/fitbit_parse_calories.py" + +rule fitbit_parse_sleep: + input: + data = expand("data/raw/{{pid}}/fitbit_sleep_{fitbit_data_type}_raw.csv", fitbit_data_type = (["json"] if config["FITBIT_SLEEP"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"])) + params: + table = config["FITBIT_SLEEP"]["TABLE"], + table_format = config["FITBIT_SLEEP"]["TABLE_FORMAT"] + output: + summary_data = "data/raw/{pid}/fitbit_sleep_summary_parsed_episodes.csv", + intraday_data = "data/raw/{pid}/fitbit_sleep_intraday_parsed.csv" + script: + "../src/data/fitbit_parse_sleep.py" + +rule fitbit_readable_datetime: + input: + sensor_input = "data/raw/{pid}/fitbit_{sensor}_{fitbit_data_type}_parsed.csv", + day_segments = "data/interim/day_segments/{pid}_day_segments.csv" + params: + fixed_timezone = "UTC", + day_segments_type = config["DAY_SEGMENTS"]["TYPE"], + include_past_periodic_segments = config["DAY_SEGMENTS"]["INCLUDE_PAST_PERIODIC_SEGMENTS"] + output: + "data/raw/{pid}/fitbit_{sensor}_{fitbit_data_type}_parsed_with_datetime.csv" + script: + "../src/data/readable_datetime.R" diff --git a/src/data/fitbit_parse_sensors/fitbit_parse_calories.py b/src/data/fitbit_parse_calories.py similarity index 51% rename from src/data/fitbit_parse_sensors/fitbit_parse_calories.py rename to src/data/fitbit_parse_calories.py index 08a0ed53..1cff0344 100644 --- a/src/data/fitbit_parse_sensors/fitbit_parse_calories.py +++ b/src/data/fitbit_parse_calories.py @@ -5,11 +5,9 @@ from datetime import datetime CALORIES_INTRADAY_COLUMNS = ("device_id", "level", "mets", "value", - "local_date_time", "local_date", "local_month", "local_day", - "local_day_of_week", "local_time", "local_hour", "local_minute", - "local_day_segment") + "local_date_time", "timestamp") -def parseCaloriesData(calories_data, HOUR2EPOCH): +def parseCaloriesData(calories_data): if calories_data.empty: return pd.DataFrame(), pd.DataFrame(columns=CALORIES_INTRADAY_COLUMNS) device_id = calories_data["device_id"].iloc[0] @@ -26,10 +24,23 @@ def parseCaloriesData(calories_data, HOUR2EPOCH): row_intraday = (device_id, data["level"], data["mets"], data["value"], - d_datetime, d_datetime.date(), d_datetime.month, d_datetime.day, - d_datetime.weekday(), d_datetime.time(), d_datetime.hour, d_datetime.minute, - HOUR2EPOCH[d_datetime.hour]) + d_datetime, 0) records_intraday.append(row_intraday) - return pd.DataFrame(), pd.DataFrame(data=records_intraday, columns=CALORIES_INTRADAY_COLUMNS) + return pd.DataFrame(data=[], columns=["local_date_time"]), pd.DataFrame(data=records_intraday, columns=CALORIES_INTRADAY_COLUMNS) + +table_format = snakemake.params["table_format"] + +if table_format == "JSON": + json_raw = pd.read_csv(snakemake.input[0]) + summary, intraday = parseCaloriesData(json_raw) +elif table_format == "CSV": + summary = pd.read_csv(snakemake.input[0]) + intraday = pd.read_csv(snakemake.input[1]) + +summary["timestamp"] = (summary["local_date_time"] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s') * 1000 +intraday["timestamp"] = (intraday["local_date_time"] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s') * 1000 + +summary.to_csv(snakemake.output["summary_data"], index=False) +intraday.to_csv(snakemake.output["intraday_data"], index=False) \ No newline at end of file diff --git a/src/data/fitbit_parse_sensors/fitbit_parse_heartrate.py b/src/data/fitbit_parse_heartrate.py similarity index 78% rename from src/data/fitbit_parse_sensors/fitbit_parse_heartrate.py rename to src/data/fitbit_parse_heartrate.py index 9271025c..490c27a1 100644 --- a/src/data/fitbit_parse_sensors/fitbit_parse_heartrate.py +++ b/src/data/fitbit_parse_heartrate.py @@ -1,10 +1,12 @@ -import json +import json, sys import pandas as pd -from datetime import datetime +from datetime import datetime, timezone +from math import trunc HR_SUMMARY_COLUMNS = ("device_id", - "local_date", + "local_date_time", + "timestamp", "heartrate_daily_restinghr", "heartrate_daily_caloriesoutofrange", "heartrate_daily_caloriesfatburn", @@ -12,10 +14,10 @@ HR_SUMMARY_COLUMNS = ("device_id", "heartrate_daily_caloriespeak") HR_INTRADAY_COLUMNS = ("device_id", - "heartrate", "heartrate_zone", - "local_date_time", "local_date", "local_month", "local_day", - "local_day_of_week", "local_time", "local_hour", "local_minute", - "local_day_segment") + "heartrate", + "heartrate_zone", + "local_date_time", + "timestamp") def parseHeartrateZones(heartrate_data): # Get the range of heartrate zones: outofrange, fatburn, cardio, peak @@ -58,6 +60,7 @@ def parseHeartrateSummaryData(record_summary, device_id, curr_date): row_summary = (device_id, curr_date, + 0, d_resting_heartrate, d_calories_outofrange, d_calories_fatburn, @@ -68,7 +71,7 @@ def parseHeartrateSummaryData(record_summary, device_id, curr_date): -def parseHeartrateIntradayData(records_intraday, dataset, device_id, curr_date, heartrate_zones_range, HOUR2EPOCH): +def parseHeartrateIntradayData(records_intraday, dataset, device_id, curr_date, heartrate_zones_range): for data in dataset: d_time = datetime.strptime(data["time"], '%H:%M:%S').time() d_datetime = datetime.combine(curr_date, d_time) @@ -83,15 +86,16 @@ def parseHeartrateIntradayData(records_intraday, dataset, device_id, curr_date, row_intraday = (device_id, d_hr, d_hrzone, - d_datetime, d_datetime.date(), d_datetime.month, d_datetime.day, - d_datetime.weekday(), d_datetime.time(), d_datetime.hour, d_datetime.minute, - HOUR2EPOCH[d_datetime.hour]) + d_datetime, + 0) records_intraday.append(row_intraday) return records_intraday +# def append_timestamp(data): -def parseHeartrateData(heartrate_data, HOUR2EPOCH): + +def parseHeartrateData(heartrate_data): if heartrate_data.empty: return pd.DataFrame(columns=HR_SUMMARY_COLUMNS), pd.DataFrame(columns=HR_INTRADAY_COLUMNS) device_id = heartrate_data["device_id"].iloc[0] @@ -109,6 +113,21 @@ def parseHeartrateData(heartrate_data, HOUR2EPOCH): records_summary.append(row_summary) dataset = record["activities-heart-intraday"]["dataset"] - records_intraday = parseHeartrateIntradayData(records_intraday, dataset, device_id, curr_date, heartrate_zones_range, HOUR2EPOCH) + records_intraday = parseHeartrateIntradayData(records_intraday, dataset, device_id, curr_date, heartrate_zones_range) return pd.DataFrame(data=records_summary, columns=HR_SUMMARY_COLUMNS), pd.DataFrame(data=records_intraday, columns=HR_INTRADAY_COLUMNS) + +table_format = snakemake.params["table_format"] + +if table_format == "JSON": + json_raw = pd.read_csv(snakemake.input[0]) + summary, intraday = parseHeartrateData(json_raw) +elif table_format == "CSV": + summary = pd.read_csv(snakemake.input[0]) + intraday = pd.read_csv(snakemake.input[1]) + +summary["timestamp"] = (summary["local_date_time"] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s') * 1000 +intraday["timestamp"] = (intraday["local_date_time"] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s') * 1000 + +summary.to_csv(snakemake.output["summary_data"], index=False) +intraday.to_csv(snakemake.output["intraday_data"], index=False) \ No newline at end of file diff --git a/src/data/fitbit_parse_sensors/fitbit_parse_steps.py b/src/data/fitbit_parse_sensors/fitbit_parse_steps.py deleted file mode 100644 index f4bfeef3..00000000 --- a/src/data/fitbit_parse_sensors/fitbit_parse_steps.py +++ /dev/null @@ -1,35 +0,0 @@ -import json -import pandas as pd -from datetime import datetime - -STEPS_INTRADAY_COLUMNS = ("device_id", - "steps", - "local_date_time", "local_date", "local_month", "local_day", - "local_day_of_week", "local_time", "local_hour", "local_minute", - "local_day_segment") - - -def parseStepsData(steps_data, HOUR2EPOCH): - if steps_data.empty: - return pd.DataFrame(), pd.DataFrame(columns=STEPS_INTRADAY_COLUMNS) - device_id = steps_data["device_id"].iloc[0] - records_intraday = [] - # Parse JSON into individual records - for record in steps_data.fitbit_data: - record = json.loads(record) # Parse text into JSON - curr_date = datetime.strptime( - record["activities-steps"][0]["dateTime"], "%Y-%m-%d") - dataset = record["activities-steps-intraday"]["dataset"] - for data in dataset: - d_time = datetime.strptime(data["time"], '%H:%M:%S').time() - d_datetime = datetime.combine(curr_date, d_time) - - row_intraday = (device_id, - data["value"], - d_datetime, d_datetime.date(), d_datetime.month, d_datetime.day, - d_datetime.weekday(), d_datetime.time(), d_datetime.hour, d_datetime.minute, - HOUR2EPOCH[d_datetime.hour]) - - records_intraday.append(row_intraday) - - return pd.DataFrame(), pd.DataFrame(data=records_intraday, columns=STEPS_INTRADAY_COLUMNS) diff --git a/src/data/fitbit_parse_sensors/fitbit_parse_sleep.py b/src/data/fitbit_parse_sleep.py similarity index 78% rename from src/data/fitbit_parse_sensors/fitbit_parse_sleep.py rename to src/data/fitbit_parse_sleep.py index 8f1450c4..622783f9 100644 --- a/src/data/fitbit_parse_sensors/fitbit_parse_sleep.py +++ b/src/data/fitbit_parse_sleep.py @@ -12,14 +12,13 @@ SLEEP_SUMMARY_COLUMNS_V1_2 = ("device_id", "efficiency", "minutes_after_wakeup", "minutes_asleep", "minutes_awake", "minutes_to_fall_asleep", "minutes_in_bed", "is_main_sleep", "type", "local_start_date_time", "local_end_date_time", - "local_start_date", "local_end_date", - "local_start_day_segment", "local_end_day_segment") + "start_timestamp", "end_timestamp") SLEEP_SUMMARY_COLUMNS_V1 = SLEEP_SUMMARY_COLUMNS_V1_2 + ("count_awake", "duration_awake", "count_awakenings", "count_restless", "duration_restless") SLEEP_INTRADAY_COLUMNS = ("device_id", # For "classic" type, original_level is one of {"awake", "restless", "asleep"} # For "stages" type, original_level is one of {"wake", "deep", "light", "rem"} - "original_level", + "level", # For "classic" type, unified_level is one of {0, 1} where 0: awake {"awake" + "restless"}, 1: asleep {"asleep"} # For "stages" type, unified_level is one of {0, 1} where 0: awake {"wake"}, 1: asleep {"deep" + "light" + "rem"} "unified_level", @@ -27,9 +26,8 @@ SLEEP_INTRADAY_COLUMNS = ("device_id", "is_main_sleep", # one of {"classic", "stages"} "type", - "local_date_time", "local_date", "local_month", "local_day", - "local_day_of_week", "local_time", "local_hour", "local_minute", - "local_day_segment") + "local_date_time", + "timestamp") def mergeLongAndShortData(data_summary): longData = pd.DataFrame(columns=['dateTime', 'level', 'seconds']) @@ -76,7 +74,7 @@ def classicData1min(data_summary): # print(dataList) return dataList # Parse one record for sleep API version 1 -def parseOneRecordForV1(record, device_id, d_is_main_sleep, records_summary, records_intraday, HOUR2EPOCH): +def parseOneRecordForV1(record, device_id, d_is_main_sleep, records_summary, records_intraday): # Summary data sleep_record_type = "classic" @@ -89,7 +87,7 @@ def parseOneRecordForV1(record, device_id, d_is_main_sleep, records_summary, rec d_is_main_sleep, sleep_record_type, d_start_datetime, d_end_datetime, d_start_datetime.date(), d_end_datetime.date(), - HOUR2EPOCH[d_start_datetime.hour], HOUR2EPOCH[d_end_datetime.hour], + 0,0, record["awakeCount"], record["awakeDuration"], record["awakeningsCount"], record["restlessCount"], record["restlessDuration"]) @@ -111,23 +109,17 @@ def parseOneRecordForV1(record, device_id, d_is_main_sleep, records_summary, rec # (1: "asleep", 2: "restless", 3: "awake") d_original_level = SLEEP_CODE2LEVEL[int(data["value"])-1] - # unified_level summarises original_level (we came up with this classification) - # 0 is awake, 1 is asleep - # {"awake" + "restless"} are set to 0 and {"asleep"} is set to 1 - d_unified_level = 0 if d_original_level == "awake" or d_original_level == "restless" else 1 row_intraday = (device_id, - d_original_level, d_unified_level, d_is_main_sleep, sleep_record_type, - d_datetime, d_datetime.date(), d_datetime.month, d_datetime.day, - d_datetime.weekday(), d_datetime.time(), d_datetime.hour, d_datetime.minute, - HOUR2EPOCH[d_datetime.hour]) + d_original_level, -1, d_is_main_sleep, sleep_record_type, + d_datetime, 0) records_intraday.append(row_intraday) return records_summary, records_intraday # Parse one record for sleep API version 1.2 -def parseOneRecordForV12(record, device_id, d_is_main_sleep, records_summary, records_intraday, HOUR2EPOCH): +def parseOneRecordForV12(record, device_id, d_is_main_sleep, records_summary, records_intraday): # Summary data sleep_record_type = record['type'] @@ -139,8 +131,7 @@ def parseOneRecordForV12(record, device_id, d_is_main_sleep, records_summary, re record["minutesAfterWakeup"], record["minutesAsleep"], record["minutesAwake"], record["minutesToFallAsleep"], record["timeInBed"], d_is_main_sleep, sleep_record_type, d_start_datetime, d_end_datetime, - d_start_datetime.date(), d_end_datetime.date(), - HOUR2EPOCH[d_start_datetime.hour], HOUR2EPOCH[d_end_datetime.hour]) + 0,0) records_summary.append(row_summary) if sleep_record_type == 'classic': @@ -160,13 +151,9 @@ def parseOneRecordForV12(record, device_id, d_is_main_sleep, records_summary, re d_original_level = data["level"] - d_unified_level = 0 if d_original_level == "awake" or d_original_level == "restless" else 1 - row_intraday = (device_id, - d_original_level, d_unified_level, d_is_main_sleep, sleep_record_type, - d_datetime, d_datetime.date(), d_datetime.month, d_datetime.day, - d_datetime.weekday(), d_datetime.time(), d_datetime.hour, d_datetime.minute, - HOUR2EPOCH[d_datetime.hour]) + d_original_level, -1, d_is_main_sleep, sleep_record_type, + d_datetime, 0) records_intraday.append(row_intraday) else: ## for sleep type "stages" @@ -185,13 +172,9 @@ def parseOneRecordForV12(record, device_id, d_is_main_sleep, records_summary, re d_original_level = data[1] - d_unified_level = 1 if d_original_level == "deep" or d_original_level == "light" or d_original_level == "rem" else 0 - row_intraday = (device_id, - d_original_level, d_unified_level, d_is_main_sleep, sleep_record_type, - d_datetime, d_datetime.date(), d_datetime.month, d_datetime.day, - d_datetime.weekday(), d_datetime.time(), d_datetime.hour, d_datetime.minute, - HOUR2EPOCH[d_datetime.hour]) + d_original_level, -1, d_is_main_sleep, sleep_record_type, + d_datetime, 0) records_intraday.append(row_intraday) @@ -199,7 +182,7 @@ def parseOneRecordForV12(record, device_id, d_is_main_sleep, records_summary, re -def parseSleepData(sleep_data, HOUR2EPOCH): +def parseSleepData(sleep_data): SLEEP_SUMMARY_COLUMNS = SLEEP_SUMMARY_COLUMNS_V1_2 if sleep_data.empty: return pd.DataFrame(columns=SLEEP_SUMMARY_COLUMNS), pd.DataFrame(columns=SLEEP_INTRADAY_COLUMNS) @@ -214,10 +197,29 @@ def parseSleepData(sleep_data, HOUR2EPOCH): # For sleep API version 1 if "awakeCount" in record: SLEEP_SUMMARY_COLUMNS = SLEEP_SUMMARY_COLUMNS_V1 - records_summary, records_intraday = parseOneRecordForV1(record, device_id, d_is_main_sleep, records_summary, records_intraday, HOUR2EPOCH) + records_summary, records_intraday = parseOneRecordForV1(record, device_id, d_is_main_sleep, records_summary, records_intraday) # For sleep API version 1.2 else: SLEEP_SUMMARY_COLUMNS = SLEEP_SUMMARY_COLUMNS_V1_2 - records_summary, records_intraday = parseOneRecordForV12(record, device_id, d_is_main_sleep, records_summary, records_intraday, HOUR2EPOCH) + records_summary, records_intraday = parseOneRecordForV12(record, device_id, d_is_main_sleep, records_summary, records_intraday) return pd.DataFrame(data=records_summary, columns=SLEEP_SUMMARY_COLUMNS), pd.DataFrame(data=records_intraday, columns=SLEEP_INTRADAY_COLUMNS) + +table_format = snakemake.params["table_format"] + +if table_format == "JSON": + json_raw = pd.read_csv(snakemake.input[0]) + summary, intraday = parseSleepData(json_raw) +elif table_format == "CSV": + summary = pd.read_csv(snakemake.input[0]) + intraday = pd.read_csv(snakemake.input[1]) + +summary["start_timestamp"] = (summary["local_start_date_time"] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s') * 1000 +summary["end_timestamp"] = (summary["local_end_date_time"] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s') * 1000 +intraday["timestamp"] = (intraday["local_date_time"] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s') * 1000 + +# Unifying level +intraday["unified_level"] = np.where(intraday["level"].isin(["awake", "wake", "restless"]), 0, 1) + +summary.to_csv(snakemake.output["summary_data"], index=False) +intraday.to_csv(snakemake.output["intraday_data"], index=False) \ No newline at end of file diff --git a/src/data/fitbit_parse_steps.py b/src/data/fitbit_parse_steps.py new file mode 100644 index 00000000..7b765c18 --- /dev/null +++ b/src/data/fitbit_parse_steps.py @@ -0,0 +1,49 @@ +import json +import pandas as pd +from datetime import datetime, timezone +from math import trunc + +STEPS_INTRADAY_COLUMNS = ("device_id", + "steps", + "local_date_time", + "timestamp") + + +def parseStepsData(steps_data): + if steps_data.empty: + return pd.DataFrame(), pd.DataFrame(columns=STEPS_INTRADAY_COLUMNS) + device_id = steps_data["device_id"].iloc[0] + records_intraday = [] + # Parse JSON into individual records + for record in steps_data.fitbit_data: + record = json.loads(record) # Parse text into JSON + curr_date = datetime.strptime( + record["activities-steps"][0]["dateTime"], "%Y-%m-%d") + dataset = record["activities-steps-intraday"]["dataset"] + for data in dataset: + d_time = datetime.strptime(data["time"], '%H:%M:%S').time() + d_datetime = datetime.combine(curr_date, d_time) + + row_intraday = (device_id, + data["value"], + d_datetime, + 0) + + records_intraday.append(row_intraday) + + return pd.DataFrame(data=[], columns=["local_date_time"]), pd.DataFrame(data=records_intraday, columns=STEPS_INTRADAY_COLUMNS) + +table_format = snakemake.params["table_format"] + +if table_format == "JSON": + json_raw = pd.read_csv(snakemake.input[0]) + summary, intraday = parseStepsData(json_raw) +elif table_format == "CSV": + summary = pd.read_csv(snakemake.input[0]) + intraday = pd.read_csv(snakemake.input[1]) + +summary["timestamp"] = (summary["local_date_time"] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s') * 1000 +intraday["timestamp"] = (intraday["local_date_time"] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s') * 1000 + +summary.to_csv(snakemake.output["summary_data"], index=False) +intraday.to_csv(snakemake.output["intraday_data"], index=False) \ No newline at end of file