Fix bugs in Fitbit data parsing

- Fix the script that was breaking with an empty file
- Fix the script that was breaking when start/end dates were empty
- Ambigous and nonexistent DST times are handled now
- Remove unnecessary else clause
pull/111/head
JulioV 2021-01-06 11:43:01 -05:00
parent 5203aa60d1
commit 4926497ae2
5 changed files with 40 additions and 38 deletions

View File

@ -5,6 +5,8 @@
- Update CI to create a release on a tagged push that passes the tests
- Clarify in DB credential configuration that we only support MySQL
- Add Windows installation instructions
- Fix bugs in the create_participants_file script
- Fix bugs in Fitbit data parsing.
## v0.3.1
- Update installation docs for RAPIDS' docker container
- Fix example analysis use of accelerometer data in a plot

View File

@ -41,10 +41,14 @@ elif table_format == "CSV":
summary = pd.read_csv(snakemake.input[0], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
intraday = pd.read_csv(snakemake.input[1], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
# if not pd.isnull(local_start_date) and not pd.isnull(local_end_date):
if summary.shape[0] > 0:
summary["timestamp"] = summary["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6
summary["timestamp"] = summary["local_date_time"].dt.tz_localize(timezone, ambiguous=False, nonexistent="NaT").dropna().astype(np.int64) // 10**6
summary.dropna(subset=['timestamp'], inplace=True)
if intraday.shape[0] > 0:
intraday["timestamp"] = intraday["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6
intraday["timestamp"] = intraday["local_date_time"].dt.tz_localize(timezone, ambiguous=False, nonexistent="NaT").dropna().astype(np.int64) // 10**6
intraday.dropna(subset=['timestamp'], inplace=True)
summary.to_csv(snakemake.output["summary_data"], index=False)
intraday.to_csv(snakemake.output["intraday_data"], index=False)

View File

@ -97,7 +97,11 @@ def parseHeartrateIntradayData(records_intraday, dataset, device_id, curr_date,
def parseHeartrateData(heartrate_data, fitbit_data_type):
if heartrate_data.empty:
return pd.DataFrame(columns=HR_SUMMARY_COLUMNS), pd.DataFrame(columns=HR_INTRADAY_COLUMNS)
if fitbit_data_type == "summary":
return pd.DataFrame(columns=HR_SUMMARY_COLUMNS)
elif fitbit_data_type == "intraday":
return pd.DataFrame(columns=HR_INTRADAY_COLUMNS)
device_id = heartrate_data["device_id"].iloc[0]
records_summary, records_intraday = [], []
@ -121,8 +125,6 @@ def parseHeartrateData(heartrate_data, fitbit_data_type):
parsed_data = pd.DataFrame(data=records_summary, columns=HR_SUMMARY_COLUMNS)
elif fitbit_data_type == "intraday":
parsed_data = pd.DataFrame(data=records_intraday, columns=HR_INTRADAY_COLUMNS)
else:
raise ValueError("fitbit_data_type can only be one of ['summary', 'intraday'].")
return parsed_data
@ -145,9 +147,11 @@ else:
raise ValueError("column_format can only be one of ['JSON', 'PLAIN_TEXT'].")
# Only keep dates in the range of [local_start_date, local_end_date)
parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)]
if not pd.isnull(local_start_date) and not pd.isnull(local_end_date):
parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)]
if parsed_data.shape[0] > 0:
parsed_data["timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6
parsed_data["timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone, ambiguous=False, nonexistent="NaT").dropna().astype(np.int64) // 10**6
parsed_data.dropna(subset=['timestamp'], inplace=True)
parsed_data.to_csv(snakemake.output[0], index=False)

View File

@ -188,7 +188,10 @@ def parseOneRecordForV12(record, device_id, d_is_main_sleep, records_summary, re
def parseSleepData(sleep_data, fitbit_data_type):
SLEEP_SUMMARY_COLUMNS = SLEEP_SUMMARY_COLUMNS_V1_2
if sleep_data.empty:
return pd.DataFrame(columns=SLEEP_SUMMARY_COLUMNS), pd.DataFrame(columns=SLEEP_INTRADAY_COLUMNS)
if fitbit_data_type == "summary":
return pd.DataFrame(columns=SLEEP_SUMMARY_COLUMNS)
elif fitbit_data_type == "intraday":
return pd.DataFrame(columns=SLEEP_INTRADAY_COLUMNS)
device_id = sleep_data["device_id"].iloc[0]
records_summary, records_intraday = [], []
# Parse JSON into individual records
@ -210,13 +213,9 @@ def parseSleepData(sleep_data, fitbit_data_type):
parsed_data = pd.DataFrame(data=records_summary, columns=SLEEP_SUMMARY_COLUMNS)
elif fitbit_data_type == "intraday":
parsed_data = pd.DataFrame(data=records_intraday, columns=SLEEP_INTRADAY_COLUMNS)
else:
raise ValueError("fitbit_data_type can only be one of ['summary', 'intraday'].")
return parsed_data
timezone = snakemake.params["timezone"]
column_format = snakemake.params["column_format"]
fitbit_data_type = snakemake.params["fitbit_data_type"]
@ -235,31 +234,26 @@ elif column_format == "PLAIN_TEXT":
parsed_data = pd.read_csv(snakemake.input["raw_data"], parse_dates=["local_start_date_time", "local_end_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
elif fitbit_data_type == "intraday":
parsed_data = pd.read_csv(snakemake.input["raw_data"], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
else:
raise ValueError("fitbit_data_type can only be one of ['summary', 'intraday'].")
else:
raise ValueError("column_format can only be one of ['JSON', 'PLAIN_TEXT'].")
if parsed_data.shape[0] > 0 and fitbit_data_type == "summary":
if sleep_episode_timestamp != "start" and sleep_episode_timestamp != "end":
raise ValueError("SLEEP_EPISODE_TIMESTAMP can only be one of ['start', 'end'].")
# Column name to be considered as the event datetime
datetime_column = "local_" + sleep_episode_timestamp + "_date_time"
# Only keep dates in the range of [local_start_date, local_end_date)
parsed_data = parsed_data.loc[(parsed_data[datetime_column] >= local_start_date) & (parsed_data[datetime_column] < local_end_date)]
# Convert datetime to timestamp
parsed_data["timestamp"] = parsed_data[datetime_column].dt.tz_localize(timezone).astype(np.int64) // 10**6
# Drop useless columns: local_start_date_time and local_end_date_time
if not pd.isnull(local_start_date) and not pd.isnull(local_end_date):
parsed_data = parsed_data.loc[(parsed_data[datetime_column] >= local_start_date) & (parsed_data[datetime_column] < local_end_date)]
parsed_data["timestamp"] = parsed_data[datetime_column].dt.tz_localize(timezone, ambiguous=False, nonexistent="NaT").dropna().astype(np.int64) // 10**6
parsed_data.dropna(subset=['timestamp'], inplace=True)
parsed_data.drop(["local_start_date_time", "local_end_date_time"], axis = 1, inplace=True)
if parsed_data.shape[0] > 0 and fitbit_data_type == "intraday":
# Only keep dates in the range of [local_start_date, local_end_date)
parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)]
# Convert datetime to timestamp
parsed_data["timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6
# Unifying level
if not pd.isnull(local_start_date) and not pd.isnull(local_end_date):
parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)]
parsed_data["timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone, ambiguous=False, nonexistent="NaT").dropna().astype(np.int64) // 10**6
parsed_data.dropna(subset=['timestamp'], inplace=True)
parsed_data["unified_level"] = np.where(parsed_data["level"].isin(["awake", "wake", "restless"]), 0, 1)
parsed_data.to_csv(snakemake.output[0], index=False)

View File

@ -9,9 +9,10 @@ STEPS_COLUMNS = ("device_id", "steps", "local_date_time", "timestamp")
def parseStepsData(steps_data, fitbit_data_type):
if steps_data.empty:
return pd.DataFrame(), pd.DataFrame(columns=STEPS_INTRADAY_COLUMNS)
return pd.DataFrame(columns=STEPS_COLUMNS)
device_id = steps_data["device_id"].iloc[0]
records_summary, records_intraday = [], []
records = []
# Parse JSON into individual records
for record in steps_data.fitbit_data:
@ -26,7 +27,7 @@ def parseStepsData(steps_data, fitbit_data_type):
curr_date,
0)
records_summary.append(row_summary)
records.append(row_summary)
# Parse intraday data
if fitbit_data_type == "intraday":
@ -40,14 +41,9 @@ def parseStepsData(steps_data, fitbit_data_type):
d_datetime,
0)
records_intraday.append(row_intraday)
records.append(row_intraday)
if fitbit_data_type == "summary":
parsed_data = pd.DataFrame(data=records_summary, columns=STEPS_COLUMNS)
elif fitbit_data_type == "intraday":
parsed_data = pd.DataFrame(data=records_intraday, columns=STEPS_COLUMNS)
else:
raise ValueError("fitbit_data_type can only be one of ['summary', 'intraday'].")
parsed_data = pd.DataFrame(data=records, columns=STEPS_COLUMNS)
return parsed_data
@ -71,9 +67,11 @@ else:
raise ValueError("column_format can only be one of ['JSON', 'PLAIN_TEXT'].")
# Only keep dates in the range of [local_start_date, local_end_date)
parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)]
if not pd.isnull(local_start_date) and not pd.isnull(local_end_date):
parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)]
if parsed_data.shape[0] > 0:
parsed_data["timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6
parsed_data["timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone, ambiguous=False, nonexistent="NaT").dropna().astype(np.int64) // 10**6
parsed_data.dropna(subset=['timestamp'], inplace=True)
parsed_data.to_csv(snakemake.output[0], index=False)