rapids/src/data/fitbit_parse_heartrate.py

154 lines
6.5 KiB
Python

import yaml, json, sys
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from math import trunc
# Column names, in order, of the daily heart rate summary table.
# parseHeartrateSummaryData() emits its row tuples in this same order.
HR_SUMMARY_COLUMNS = (
    "device_id",
    "local_date_time",
    "timestamp",
    "heartrate_daily_restinghr",
    "heartrate_daily_caloriesoutofrange",
    "heartrate_daily_caloriesfatburn",
    "heartrate_daily_caloriescardio",
    "heartrate_daily_caloriespeak",
)
# Column names, in order, of the intraday heart rate table.
# parseHeartrateIntradayData() emits its row tuples in this same order.
HR_INTRADAY_COLUMNS = (
    "device_id",
    "heartrate",
    "heartrate_zone",
    "local_date_time",
    "timestamp",
)
def parseHeartrateZones(heartrate_data):
    """Return the [min, max] boundaries of every heart rate zone.

    Only the first entry of ``heartrate_data["fitbit_data"]`` is read; the
    zones found in its first "activities-heart" element are used.
    Zone names are lower-cased and stripped of spaces, so "Fat Burn"
    becomes "fatburn". For the zones themselves (outofrange, fatburn,
    cardio, peak) refer to:
    https://help.fitbit.com/articles/en_US/Help_article/1565

    Args:
        heartrate_data: table with a "fitbit_data" column of JSON strings
            (accessed as ``heartrate_data["fitbit_data"].iloc[0]``).

    Returns:
        dict mapping the normalised zone name to ``[min, max]``.

    Raises:
        ValueError: if the record has neither a top-level "heartRateZones"
            key nor a "value" key.
    """
    first_record = json.loads(heartrate_data["fitbit_data"].iloc[0])
    first_day = first_record["activities-heart"][0]

    if "heartRateZones" in first_day:
        # API Version X (exact version unknown): zones sit at the top level.
        zones = first_day["heartRateZones"]
    elif "value" in first_day:
        # API Version Y (exact version unknown): zones are nested under "value".
        zones = first_day["value"]["heartRateZones"]
    else:
        raise ValueError("Heartrate zone are stored in an unkown format, this could mean Fitbit's heartrate API changed")

    return {
        zone["name"].lower().replace(" ", ""): [zone["min"], zone["max"]]
        for zone in zones
    }
def parseHeartrateSummaryData(record_summary, device_id, curr_date):
    """Build one daily summary row from a single "activities-heart" entry.

    Two layouts are supported (exact Fitbit API versions unknown):

    * "heartRateZones" at the top level, with the resting heart rate stored
      directly under "value" (if present);
    * everything nested under "value", with the resting heart rate stored
      under "restingHeartRate" (if present).

    Calories are read positionally from the first four zones.
    NOTE(review): this assumes the zones are ordered out of range, fat burn,
    cardio, peak -- confirm against the Fitbit API response.

    Args:
        record_summary: dict for one day of heart rate summary data.
        device_id: identifier copied into the row unchanged.
        curr_date: date of the record, copied into the row unchanged.

    Returns:
        tuple ordered like HR_SUMMARY_COLUMNS. The timestamp slot is a
        placeholder 0; the script fills in the real value later.

    Raises:
        ValueError: if ``record_summary`` matches neither known layout.
    """
    if "heartRateZones" in record_summary:
        # API Version X: not sure the exact version
        heartrate_zones = record_summary["heartRateZones"]
        d_resting_heartrate = record_summary.get("value")
    elif "value" in record_summary:
        # API VERSION Y: not sure the exact version
        summary_value = record_summary["value"]
        heartrate_zones = summary_value["heartRateZones"]
        d_resting_heartrate = summary_value.get("restingHeartRate")
    else:
        # The exception used to be created but never raised, which let the
        # function fall through and fail on an unbound `heartrate_zones`.
        raise ValueError("Heartrate zone are stored in an unkown format, this could mean Fitbit's heartrate API changed")

    # An empty zone list is treated like zones without calorie information.
    if heartrate_zones and "caloriesOut" in heartrate_zones[0]:
        d_calories_outofrange, d_calories_fatburn, d_calories_cardio, d_calories_peak = (
            heartrate_zones[position]["caloriesOut"] for position in range(4)
        )
    else:
        d_calories_outofrange = d_calories_fatburn = d_calories_cardio = d_calories_peak = None

    return (
        device_id,
        curr_date,
        0,  # timestamp placeholder
        d_resting_heartrate,
        d_calories_outofrange,
        d_calories_fatburn,
        d_calories_cardio,
        d_calories_peak,
    )
def parseHeartrateIntradayData(records_intraday, dataset, device_id, curr_date, heartrate_zones_range):
    """Append one intraday row per sample in ``dataset`` to ``records_intraday``.

    Each sample's "time" (``HH:MM:SS``) is combined with ``curr_date`` into a
    datetime. The heart rate zone is the first entry of
    ``heartrate_zones_range`` whose range satisfies ``min <= heartrate < max``,
    or ``None`` when no range matches.

    Args:
        records_intraday: list that is extended in place.
        dataset: iterable of dicts with "time" and "value" keys.
        device_id: identifier copied into every row.
        curr_date: date the samples belong to.
        heartrate_zones_range: dict mapping zone name to ``[min, max]``.

    Returns:
        The same ``records_intraday`` list, with rows ordered like
        HR_INTRADAY_COLUMNS. The timestamp slot is a placeholder 0.
    """
    for sample in dataset:
        time_of_day = datetime.strptime(sample["time"], "%H:%M:%S").time()
        local_datetime = datetime.combine(curr_date, time_of_day)
        heartrate = sample["value"]

        # First zone whose half-open range [min, max) contains the heart rate.
        zone_name = next(
            (
                name
                for name, bounds in heartrate_zones_range.items()
                if bounds[0] <= heartrate < bounds[1]
            ),
            None,
        )

        records_intraday.append((device_id, heartrate, zone_name, local_datetime, 0))

    return records_intraday
def parseHeartrateData(heartrate_data, fitbit_data_type):
    """Parse raw Fitbit heart rate JSON records into one table.

    Args:
        heartrate_data: DataFrame with a "device_id" column and a
            "fitbit_data" column of JSON strings, one record per row.
        fitbit_data_type: "summary" for one row per record, or "intraday"
            for one row per sample of the record's intraday dataset.

    Returns:
        A single DataFrame with HR_SUMMARY_COLUMNS or HR_INTRADAY_COLUMNS,
        depending on ``fitbit_data_type``. Empty input yields an empty
        DataFrame with those same columns. It used to yield a tuple of two
        DataFrames, which the caller in this script cannot handle.

    Raises:
        ValueError: if ``fitbit_data_type`` is not "summary" or "intraday".
    """
    # Validate first so a bad type fails fast, including on empty input.
    if fitbit_data_type == "summary":
        columns = HR_SUMMARY_COLUMNS
    elif fitbit_data_type == "intraday":
        columns = HR_INTRADAY_COLUMNS
    else:
        raise ValueError("fitbit_data_type can only be one of ['summary', 'intraday'].")

    if heartrate_data.empty:
        return pd.DataFrame(columns=columns)

    device_id = heartrate_data["device_id"].iloc[0]
    # Zone boundaries come from the first record and are reused for all records.
    heartrate_zones_range = parseHeartrateZones(heartrate_data)

    records = []
    # Parse JSON into individual records
    for raw_record in heartrate_data.fitbit_data:
        record = json.loads(raw_record)  # Parse text into JSON
        day_summary = record["activities-heart"][0]
        curr_date = datetime.strptime(day_summary["dateTime"], "%Y-%m-%d")

        if fitbit_data_type == "summary":
            records.append(parseHeartrateSummaryData(day_summary, device_id, curr_date))
        else:
            dataset = record["activities-heart-intraday"]["dataset"]
            records = parseHeartrateIntradayData(records, dataset, device_id, curr_date, heartrate_zones_range)

    return pd.DataFrame(data=records, columns=columns)
# Script body: read the Snakemake parameters, parse or load the heart rate
# data, keep only the participant's date range, compute the timestamp column
# and write the result as CSV.
# NOTE: `timezone` rebinds the name imported from `datetime` at the top of the
# file; from here on it holds the timezone parameter passed in by Snakemake.
timezone = snakemake.params["timezone"]
column_format = snakemake.params["column_format"]
fitbit_data_type = snakemake.params["fitbit_data_type"]

with open(snakemake.input["participant_file"], "r", encoding="utf-8") as f:
    participant_file = yaml.safe_load(f)
local_start_date = pd.Timestamp(participant_file["FITBIT"]["START_DATE"])
# One day is added so that, with the strict "<" below, all of END_DATE is kept.
local_end_date = pd.Timestamp(participant_file["FITBIT"]["END_DATE"]) + pd.DateOffset(1)

if column_format == "JSON":
    json_raw = pd.read_csv(snakemake.input["raw_data"])
    parsed_data = parseHeartrateData(json_raw, fitbit_data_type)
elif column_format == "PLAIN_TEXT":
    # `date_parser` is deprecated in pandas 2.x, so the column is converted
    # explicitly; any timezone information is dropped to get naive local times.
    parsed_data = pd.read_csv(snakemake.input["raw_data"])
    parsed_data["local_date_time"] = pd.to_datetime(parsed_data["local_date_time"]).dt.tz_localize(None)
else:
    raise ValueError("column_format can only be one of ['JSON', 'PLAIN_TEXT'].")

# Only keep dates in the range of [local_start_date, local_end_date)
in_date_range = (parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)
# Copy the filtered rows so the column assignment below does not trigger
# pandas' SettingWithCopyWarning.
parsed_data = parsed_data.loc[in_date_range].copy()

if parsed_data.shape[0] > 0:
    # Localise the naive local times, then convert nanoseconds since the epoch
    # to milliseconds.
    # NOTE(review): with its default arguments tz_localize raises on ambiguous
    # or nonexistent local times (DST transitions) -- confirm this is intended.
    parsed_data["timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6

parsed_data.to_csv(snakemake.output[0], index=False)