Add sleep intraday to fitbitjson_mysql

pull/128/head
JulioV 2021-03-10 19:38:09 -05:00
parent a420f5ef92
commit 1b0ee4bbf0
10 changed files with 236 additions and 283 deletions

View File

@ -279,13 +279,6 @@ for provider in config["FITBIT_STEPS_INTRADAY"]["PROVIDERS"].keys():
files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
# for provider in config["FITBIT_CALORIES"]["PROVIDERS"].keys():
# if config["FITBIT_CALORIES"]["PROVIDERS"][provider]["COMPUTE"]:
# files_to_compute.extend(expand("data/raw/{pid}/fitbit_calories_{fitbit_data_type}_raw.csv", pid=config["PIDS"], fitbit_data_type=(["json"] if config["FITBIT_CALORIES"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"])))
# files_to_compute.extend(expand("data/raw/{pid}/fitbit_calories_{fitbit_data_type}_parsed.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"]))
# files_to_compute.extend(expand("data/raw/{pid}/fitbit_calories_{fitbit_data_type}_parsed_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"]))
# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))
# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv")
for provider in config["EMPATICA_ACCELEROMETER"]["PROVIDERS"].keys():
if config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["COMPUTE"]:

View File

@ -448,19 +448,6 @@ FITBIT_STEPS_INTRADAY:
SRC_FOLDER: "rapids" # inside src/features/fitbit_steps_intraday
SRC_LANGUAGE: "python"
# FITBIT_CALORIES:
# TABLE_FORMAT: JSON # JSON or CSV. If your JSON or CSV data are files change [DEVICE_DATA][FITBIT][SOURCE][TYPE] to FILES
# TABLE:
# JSON: fitbit_calories
# CSV:
# SUMMARY: calories_summary
# INTRADAY: calories_intraday
# PROVIDERS:
# RAPIDS:
# COMPUTE: False
# FEATURES: []
########################################################################################################################
# EMPATICA #
########################################################################################################################

View File

@ -146,6 +146,49 @@ If you want RAPIDS to process Fitbit sensor data using this stream, you will nee
|a748ee1a-1d0b-4ae9-9074-279a2b6ba524 |{"sleep":[{"dateOfSleep":"2020-10-11","duration":41640000,"efficiency":89,"endTime":"2020-10-11T11:47:00.000","infoCode":0,"isMainSleep":true,"levels":{"data":[{"dateTime":"2020-10-11T00:12:30.000","level":"wake","seconds":450},{"dateTime":"2020-10-11T00:20:00.000","level":"light","seconds":870},{"dateTime":"2020-10-11T00:34:30.000","level":"wake","seconds":780},...], "summary":{"deep":{"count":4,"minutes":52,"thirtyDayAvgMinutes":62},"light":{"count":32,"minutes":442,"thirtyDayAvgMinutes":364},"rem":{"count":6,"minutes":68,"thirtyDayAvgMinutes":58},"wake":{"count":29,"minutes":132,"thirtyDayAvgMinutes":94}}},"logId":26589710670,"minutesAfterWakeup":1,"minutesAsleep":562,"minutesAwake":132,"minutesToFallAsleep":0,"startTime":"2020-10-11T00:12:30.000","timeInBed":694,"type":"stages"}],"summary":{"stages":{"deep":52,"light":442,"rem":68,"wake":132},"totalMinutesAsleep":562,"totalSleepRecords":1,"totalTimeInBed":694}}
|a748ee1a-1d0b-4ae9-9074-279a2b6ba524 |{"sleep":[{"dateOfSleep":"2020-10-12","duration":28980000,"efficiency":93,"endTime":"2020-10-12T09:34:30.000","infoCode":0,"isMainSleep":true,"levels":{"data":[{"dateTime":"2020-10-12T01:31:00.000","level":"wake","seconds":600},{"dateTime":"2020-10-12T01:41:00.000","level":"light","seconds":60},{"dateTime":"2020-10-12T01:42:00.000","level":"deep","seconds":2340},...], "summary":{"deep":{"count":4,"minutes":63,"thirtyDayAvgMinutes":59},"light":{"count":27,"minutes":257,"thirtyDayAvgMinutes":364},"rem":{"count":5,"minutes":94,"thirtyDayAvgMinutes":58},"wake":{"count":24,"minutes":69,"thirtyDayAvgMinutes":95}}},"logId":26589710673,"minutesAfterWakeup":0,"minutesAsleep":415,"minutesAwake":68,"minutesToFallAsleep":0,"startTime":"2020-10-12T01:31:00.000","timeInBed":483,"type":"stages"}],"summary":{"stages":{"deep":63,"light":257,"rem":94,"wake":69},"totalMinutesAsleep":415,"totalSleepRecords":1,"totalTimeInBed":483}}
??? info "FITBIT_SLEEP_INTRADAY"
**RAPIDS_COLUMN_MAPPINGS**
| RAPIDS column | Stream column |
|-----------------|-----------------|
| TIMESTAMP | FLAG_TO_MUTATE |
| LOCAL_DATE_TIME | FLAG_TO_MUTATE |
| DEVICE_ID | device_id |
| TYPE_EPISODE_ID | FLAG_TO_MUTATE |
| DURATION | FLAG_TO_MUTATE |
| IS_MAIN_SLEEP | FLAG_TO_MUTATE |
| TYPE | FLAG_TO_MUTATE |
| LEVEL | FLAG_TO_MUTATE |
**MUTATION**
- **COLUMN_MAPPINGS**
| Script column | Stream column |
|-----------------|-----------------|
| JSON_FITBIT_COLUMN | fitbit_data |
- **SCRIPTS**
```bash
src/data/streams/mutations/fitbit/parse_sleep_intraday_json.py
```
!!! note
Fitbit API has two versions for sleep data, v1 and v1.2, we support both.
All columns except `DEVICE_ID` are parsed from `JSON_FITBIT_COLUMN`. `JSON_FITBIT_COLUMN` is a string column containing the JSON objects returned by Fitbit's API. See an example of the raw data RAPIDS expects for this data stream:
??? example "Example of the expected raw data"
|device_id |fitbit_data |
|---------------------------------------- |--------------------------------------------------------- |
|a748ee1a-1d0b-4ae9-9074-279a2b6ba524 |{"sleep":[{"dateOfSleep":"2020-10-10","duration":3600000,"efficiency":92,"endTime":"2020-10-10T16:37:00.000","infoCode":2,"isMainSleep":false,"levels":{"data":[{"dateTime":"2020-10-10T15:36:30.000","level":"restless","seconds":60},{"dateTime":"2020-10-10T15:37:30.000","level":"asleep","seconds":660},{"dateTime":"2020-10-10T15:48:30.000","level":"restless","seconds":60},...], "summary":{"asleep":{"count":0,"minutes":56},"awake":{"count":0,"minutes":0},"restless":{"count":3,"minutes":4}}},"logId":26315914306,"minutesAfterWakeup":0,"minutesAsleep":55,"minutesAwake":5,"minutesToFallAsleep":0,"startTime":"2020-10-10T15:36:30.000","timeInBed":60,"type":"classic"},{"dateOfSleep":"2020-10-10","duration":22980000,"efficiency":88,"endTime":"2020-10-10T08:10:00.000","infoCode":0,"isMainSleep":true,"levels":{"data":[{"dateTime":"2020-10-10T01:46:30.000","level":"light","seconds":420},{"dateTime":"2020-10-10T01:53:30.000","level":"deep","seconds":1230},{"dateTime":"2020-10-10T02:14:00.000","level":"light","seconds":360},...], "summary":{"deep":{"count":3,"minutes":92,"thirtyDayAvgMinutes":0},"light":{"count":29,"minutes":193,"thirtyDayAvgMinutes":0},"rem":{"count":4,"minutes":33,"thirtyDayAvgMinutes":0},"wake":{"count":28,"minutes":65,"thirtyDayAvgMinutes":0}}},"logId":26311786557,"minutesAfterWakeup":0,"minutesAsleep":318,"minutesAwake":65,"minutesToFallAsleep":0,"startTime":"2020-10-10T01:46:30.000","timeInBed":383,"type":"stages"}],"summary":{"stages":{"deep":92,"light":193,"rem":33,"wake":65},"totalMinutesAsleep":373,"totalSleepRecords":2,"totalTimeInBed":443}}
|a748ee1a-1d0b-4ae9-9074-279a2b6ba524 |{"sleep":[{"dateOfSleep":"2020-10-11","duration":41640000,"efficiency":89,"endTime":"2020-10-11T11:47:00.000","infoCode":0,"isMainSleep":true,"levels":{"data":[{"dateTime":"2020-10-11T00:12:30.000","level":"wake","seconds":450},{"dateTime":"2020-10-11T00:20:00.000","level":"light","seconds":870},{"dateTime":"2020-10-11T00:34:30.000","level":"wake","seconds":780},...], "summary":{"deep":{"count":4,"minutes":52,"thirtyDayAvgMinutes":62},"light":{"count":32,"minutes":442,"thirtyDayAvgMinutes":364},"rem":{"count":6,"minutes":68,"thirtyDayAvgMinutes":58},"wake":{"count":29,"minutes":132,"thirtyDayAvgMinutes":94}}},"logId":26589710670,"minutesAfterWakeup":1,"minutesAsleep":562,"minutesAwake":132,"minutesToFallAsleep":0,"startTime":"2020-10-11T00:12:30.000","timeInBed":694,"type":"stages"}],"summary":{"stages":{"deep":52,"light":442,"rem":68,"wake":132},"totalMinutesAsleep":562,"totalSleepRecords":1,"totalTimeInBed":694}}
|a748ee1a-1d0b-4ae9-9074-279a2b6ba524 |{"sleep":[{"dateOfSleep":"2020-10-12","duration":28980000,"efficiency":93,"endTime":"2020-10-12T09:34:30.000","infoCode":0,"isMainSleep":true,"levels":{"data":[{"dateTime":"2020-10-12T01:31:00.000","level":"wake","seconds":600},{"dateTime":"2020-10-12T01:41:00.000","level":"light","seconds":60},{"dateTime":"2020-10-12T01:42:00.000","level":"deep","seconds":2340},...], "summary":{"deep":{"count":4,"minutes":63,"thirtyDayAvgMinutes":59},"light":{"count":27,"minutes":257,"thirtyDayAvgMinutes":364},"rem":{"count":5,"minutes":94,"thirtyDayAvgMinutes":58},"wake":{"count":24,"minutes":69,"thirtyDayAvgMinutes":95}}},"logId":26589710673,"minutesAfterWakeup":0,"minutesAsleep":415,"minutesAwake":68,"minutesToFallAsleep":0,"startTime":"2020-10-12T01:31:00.000","timeInBed":483,"type":"stages"}],"summary":{"stages":{"deep":63,"light":257,"rem":94,"wake":69},"totalMinutesAsleep":415,"totalSleepRecords":1,"totalTimeInBed":483}}
??? info "FITBIT_STEPS_SUMMARY"
**RAPIDS_COLUMN_MAPPINGS**

View File

@ -6,7 +6,7 @@ This is a description of the format RAPIDS needs to process data for the followi
| RAPIDS column | Description |
|-----------------|-----------------|
| TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged |
| TIMESTAMP | A UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS) |
| LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` |
| DEVICE_ID | A string that uniquely identifies a device |
| HEARTRATE_DAILY_RESTINGHR | Daily resting heartrate |
@ -19,7 +19,7 @@ This is a description of the format RAPIDS needs to process data for the followi
| RAPIDS column | Description |
|-----------------|-----------------|
| TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged |
| TIMESTAMP | A UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS) |
| LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` |
| DEVICE_ID | A string that uniquely identifies a device |
| HEARTRATE | Intraday heartrate |
@ -29,7 +29,7 @@ This is a description of the format RAPIDS needs to process data for the followi
| RAPIDS column | Description |
|-----------------|-----------------|
| TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged |
| TIMESTAMP | A UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS) |
| LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss`, this either is a copy of LOCAL_START_DATE_TIME or LOCAL_END_DATE_TIME depending on which column is used to assign an episode to a specific day|
| LOCAL_START_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` representing the start of a daily sleep episode |
| LOCAL_END_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` representing the end of a daily sleep episode|
@ -43,11 +43,24 @@ This is a description of the format RAPIDS needs to process data for the followi
| IS_MAIN_SLEEP | 0 if this episode is a nap, or 1 if it is a main sleep episode|
| TYPE | stages or classic [sleep data](https://dev.fitbit.com/build/reference/web-api/sleep/)|
??? info "FITBIT_SLEEP_INTRADAY"
| RAPIDS column | Description |
|-----------------|-----------------|
| TIMESTAMP | A UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS)|
| LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss`, this either is a copy of LOCAL_START_DATE_TIME or LOCAL_END_DATE_TIME depending on which column is used to assign an episode to a specific day|
| DEVICE_ID | A string that uniquely identifies a device |
| TYPE_EPISODE_ID | An id for each unique main or nap episode. Main and nap episodes have different levels, each row in this table is one of such levels, so multiple rows can have the same TYPE_EPISODE_ID|
| DURATION | Duration of the episode level in minutes|
| IS_MAIN_SLEEP | 0 if this episode level belongs to a nap, or 1 if it belongs to a main sleep episode|
| TYPE | type of level: stages or classic [sleep data](https://dev.fitbit.com/build/reference/web-api/sleep/)|
| LEVEL | For stages levels one of `wake`, `deep`, `light`, or `rem`. For classic levels one of `awake`, `restless`, and `asleep`|
??? info "FITBIT_STEPS_SUMMARY"
| RAPIDS column | Description |
|-----------------|-----------------|
| TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged |
| TIMESTAMP | A UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS) |
| LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` |
| DEVICE_ID | A string that uniquely identifies a device |
| STEPS | Daily step count |
@ -56,7 +69,7 @@ This is a description of the format RAPIDS needs to process data for the followi
| RAPIDS column | Description |
|-----------------|-----------------|
| TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged |
| TIMESTAMP | A UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS) |
| LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` |
| DEVICE_ID | A string that uniquely identifies a device |
| STEPS | Intraday step count (usually every minute)|

View File

@ -1,251 +0,0 @@
import json, yaml
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import dateutil.parser
SLEEP_CODE2LEVEL = ["asleep", "restless", "awake"]
SLEEP_SUMMARY_COLUMNS_V1_2 = ("device_id", "efficiency",
"minutes_after_wakeup", "minutes_asleep", "minutes_awake", "minutes_to_fall_asleep", "minutes_in_bed",
"is_main_sleep", "type",
"local_start_date_time", "local_end_date_time",
"timestamp")
SLEEP_SUMMARY_COLUMNS_V1 = SLEEP_SUMMARY_COLUMNS_V1_2 + ("count_awake", "duration_awake", "count_awakenings", "count_restless", "duration_restless")
SLEEP_INTRADAY_COLUMNS = (# Extract "type_episode_id" field based on summary data: start from 0
"type_episode_id",
"duration",
# For "classic" type, original_level is one of {"awake", "restless", "asleep"}
# For "stages" type, original_level is one of {"wake", "deep", "light", "rem"}
"level",
# For "classic" type, unified_level is one of {0, 1} where 0: awake {"awake" + "restless"}, 1: asleep {"asleep"}
# For "stages" type, unified_level is one of {0, 1} where 0: awake {"wake"}, 1: asleep {"deep" + "light" + "rem"}
"unified_level",
# One of {0, 1} where 0: nap, 1: main sleep
"is_main_sleep",
# One of {"classic", "stages"}
"type",
"local_date_time",
"start_timestamp",
"end_timestamp")
def mergeLongAndShortData(data_intraday):
    """Expand Fitbit "stages" long/short interval data into 30-second rows.

    data_intraday: dict with "data" (level intervals) and optionally
    "shortData" (brief wake intervals); each entry has "dateTime" (ISO-8601
    string), "level", and "seconds". Returns a list of [datetime, level]
    pairs in 30-second steps; any step also covered by shortData is
    relabelled "wake".

    Fixes: builds plain lists instead of the deprecated (removed in
    pandas >= 2.0) DataFrame.append-in-a-loop pattern; tolerates records
    without a "shortData" key; uses stdlib datetime.fromisoformat instead of
    dateutil for Fitbit's "YYYY-MM-DDTHH:MM:SS.mmm" strings.
    """
    window_length = 30  # seconds per expanded row

    def expand(intervals):
        # One (datetime, level) pair per 30-second window of each interval.
        rows = []
        for interval in intervals:
            start = datetime.fromisoformat(interval["dateTime"])
            for i in range(interval["seconds"] // window_length):
                rows.append((start + timedelta(seconds=i * window_length), interval["level"]))
        return rows

    long_rows = expand(data_intraday["data"])
    # "shortData" may be absent in some records; treat it as empty then.
    short_times = {dt for dt, _ in expand(data_intraday.get("shortData", []))}
    # Windows that overlap a short (wake) interval are overridden to "wake".
    return [[dt, "wake" if dt in short_times else level] for dt, level in long_rows]
# Parse one record for sleep API version 1
def parseOneRecordForV1(record, device_id, type_episode_id, d_is_main_sleep, records_summary, records_intraday, fitbit_data_type):
    """Parse one sleep API v1 record into summary and/or per-minute intraday rows.

    Appends tuples shaped like SLEEP_SUMMARY_COLUMNS_V1 to records_summary
    (when fitbit_data_type == "summary") and tuples shaped like
    SLEEP_INTRADAY_COLUMNS to records_intraday (when "intraday"), then
    returns both lists.

    Fix: the start/end strings were sliced with [:18], which cut off the last
    digit of the seconds field ("...T23:58:30" became "...T23:58:3"); [:19]
    keeps the full "YYYY-MM-DDTHH:MM:SS" prefix.
    """
    sleep_record_type = "classic"
    d_start_datetime = datetime.strptime(record["startTime"][:19], "%Y-%m-%dT%H:%M:%S")
    d_end_datetime = datetime.strptime(record["endTime"][:19], "%Y-%m-%dT%H:%M:%S")
    # Summary data
    if fitbit_data_type == "summary":
        row_summary = (device_id, record["efficiency"],
                       record["minutesAfterWakeup"], record["minutesAsleep"], record["minutesAwake"], record["minutesToFallAsleep"], record["timeInBed"],
                       d_is_main_sleep, sleep_record_type,
                       d_start_datetime, d_end_datetime,
                       0,
                       record["awakeCount"], record["awakeDuration"], record["awakeningsCount"],
                       record["restlessCount"], record["restlessDuration"])
        records_summary.append(row_summary)
    # Intraday data: v1 only carries "HH:MM:SS" per minute, so reconstruct the
    # date from the episode's start/end dates.
    if fitbit_data_type == "intraday":
        start_date = d_start_datetime.date()
        end_date = d_end_datetime.date()
        is_before_midnight = True
        curr_date = start_date
        for data in record["minuteData"]:
            d_time = datetime.strptime(data["dateTime"], '%H:%M:%S').time()
            # For overnight episodes, switch to end_date once we cross midnight.
            if is_before_midnight and d_time.hour == 0:
                curr_date = end_date
                is_before_midnight = False  # switch only once
            d_datetime = datetime.combine(curr_date, d_time)
            # API 1.2 stores original_level as strings, so we convert the
            # numeric levels of API 1 too (1: "asleep", 2: "restless", 3: "awake").
            d_original_level = SLEEP_CODE2LEVEL[int(data["value"]) - 1]
            row_intraday = (type_episode_id, 60,
                            d_original_level, -1, d_is_main_sleep, sleep_record_type,
                            d_datetime, 0, 0)
            records_intraday.append(row_intraday)
    return records_summary, records_intraday
# Parse one record for sleep API version 1.2
def parseOneRecordForV12(record, device_id, type_episode_id, d_is_main_sleep, records_summary, records_intraday, fitbit_data_type):
    """Parse one sleep API v1.2 record ("classic" or "stages") into rows.

    Appends tuples shaped like SLEEP_SUMMARY_COLUMNS_V1_2 to records_summary
    (when fitbit_data_type == "summary") and tuples shaped like
    SLEEP_INTRADAY_COLUMNS to records_intraday (when "intraday"), then
    returns both lists.

    Fixes: [:19] keeps the full "YYYY-MM-DDTHH:MM:SS" prefix — the previous
    [:18] slice dropped the last digit of the seconds field and corrupted the
    summary start/end times; stdlib datetime.fromisoformat replaces dateutil
    for Fitbit's "YYYY-MM-DDTHH:MM:SS.mmm" strings.
    """
    sleep_record_type = record['type']
    d_start_datetime = datetime.strptime(record["startTime"][:19], "%Y-%m-%dT%H:%M:%S")
    d_end_datetime = datetime.strptime(record["endTime"][:19], "%Y-%m-%dT%H:%M:%S")
    # Summary data
    if fitbit_data_type == "summary":
        row_summary = (device_id, record["efficiency"],
                       record["minutesAfterWakeup"], record["minutesAsleep"], record["minutesAwake"], record["minutesToFallAsleep"], record["timeInBed"],
                       d_is_main_sleep, sleep_record_type,
                       d_start_datetime, d_end_datetime,
                       0)
        records_summary.append(row_summary)
    # Intraday data
    if fitbit_data_type == "intraday":
        if sleep_record_type == "classic":
            # "classic" entries already carry their own variable durations.
            for data in record["levels"]["data"]:
                d_datetime = datetime.fromisoformat(data["dateTime"])
                row_intraday = (type_episode_id, data["seconds"],
                                data["level"], -1, d_is_main_sleep, sleep_record_type,
                                d_datetime, 0, 0)
                records_intraday.append(row_intraday)
        else:
            # Sleep type "stages": merge long and short interval streams into
            # fixed 30-second windows first.
            for data in mergeLongAndShortData(record["levels"]):
                row_intraday = (type_episode_id, 30,
                                data[1], -1, d_is_main_sleep, sleep_record_type,
                                data[0], 0, 0)
                records_intraday.append(row_intraday)
    return records_summary, records_intraday
def parseSleepData(sleep_data, fitbit_data_type):
    """Parse a dataframe of raw Fitbit sleep JSON into summary or intraday rows.

    sleep_data: dataframe with "device_id" and "fitbit_data" (JSON strings).
    fitbit_data_type: "summary" or "intraday", selecting the output shape.
    Summary columns depend on the API version of the last record parsed
    (v1 records add awake/restless count columns).
    """
    summary_columns = SLEEP_SUMMARY_COLUMNS_V1_2
    if sleep_data.empty:
        if fitbit_data_type == "summary":
            return pd.DataFrame(columns=summary_columns)
        elif fitbit_data_type == "intraday":
            return pd.DataFrame(columns=SLEEP_INTRADAY_COLUMNS)
    device_id = sleep_data["device_id"].iloc[0]
    records_summary, records_intraday = [], []
    # Each JSON payload can hold several sleep episodes; number them globally.
    episode_counter = 0
    for raw_json in sleep_data.fitbit_data:
        for record in json.loads(raw_json)["sleep"]:
            # Nap (0) vs main sleep (1).
            is_main = 1 if record["isMainSleep"] else 0
            if "awakeCount" in record:
                # Only sleep API v1 records carry "awakeCount".
                summary_columns = SLEEP_SUMMARY_COLUMNS_V1
                records_summary, records_intraday = parseOneRecordForV1(record, device_id, episode_counter, is_main, records_summary, records_intraday, fitbit_data_type)
            else:
                summary_columns = SLEEP_SUMMARY_COLUMNS_V1_2
                records_summary, records_intraday = parseOneRecordForV12(record, device_id, episode_counter, is_main, records_summary, records_intraday, fitbit_data_type)
            episode_counter += 1
    if fitbit_data_type == "summary":
        parsed_data = pd.DataFrame(data=records_summary, columns=summary_columns)
    elif fitbit_data_type == "intraday":
        parsed_data = pd.DataFrame(data=records_intraday, columns=SLEEP_INTRADAY_COLUMNS)
    return parsed_data
def mergeSleepEpisodes(sleep_data, cols_for_groupby):
    """Collapse consecutive same-level sleep rows into single episode rows.

    Rows sharing the values of cols_for_groupby become one row keeping the
    first start_timestamp and the last end_timestamp. An empty input yields
    an empty episode frame with the full episode column set.
    """
    empty_columns = ["type_episode_id", "level_episode_id", "level", "unified_level", "is_main_sleep", "type", "start_timestamp", "end_timestamp"]
    if sleep_data.empty:
        return pd.DataFrame(columns=empty_columns)
    grouped = sleep_data.groupby(by=cols_for_groupby)
    episodes = grouped[["start_timestamp"]].first()
    episodes["end_timestamp"] = grouped["end_timestamp"].last()
    return episodes.reset_index(drop=False)
# ---------------------------------------------------------------------------
# Snakemake entry point: parse raw Fitbit sleep data (JSON or plain text) into
# a "summary" (one row per sleep episode) or "intraday" (one row per level
# episode) CSV, stamping rows with UNIX-millisecond timestamps localized to
# the study timezone.
# ---------------------------------------------------------------------------
timezone = snakemake.params["timezone"]
column_format = snakemake.params["column_format"]  # "JSON" or "PLAIN_TEXT"
fitbit_data_type = snakemake.params["fitbit_data_type"]  # "summary" or "intraday"
sleep_episode_timestamp = snakemake.params["sleep_episode_timestamp"]  # "start" or "end"

# Participant's Fitbit date range; END_DATE is made exclusive by adding a day.
with open(snakemake.input["participant_file"], "r", encoding="utf-8") as f:
    participant_file = yaml.safe_load(f)
local_start_date = pd.Timestamp(participant_file["FITBIT"]["START_DATE"])
local_end_date = pd.Timestamp(participant_file["FITBIT"]["END_DATE"]) + pd.DateOffset(1)

if column_format == "JSON":
    json_raw = pd.read_csv(snakemake.input["raw_data"])
    parsed_data = parseSleepData(json_raw, fitbit_data_type)
elif column_format == "PLAIN_TEXT":
    # Plain-text data is already parsed; load it with timezone-naive datetimes.
    if fitbit_data_type == "summary":
        parsed_data = pd.read_csv(snakemake.input["raw_data"], parse_dates=["local_start_date_time", "local_end_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
    elif fitbit_data_type == "intraday":
        parsed_data = pd.read_csv(snakemake.input["raw_data"], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
else:
    raise ValueError("column_format can only be one of ['JSON', 'PLAIN_TEXT'].")

# Drop duplicates
parsed_data.drop_duplicates(inplace=True)

if parsed_data.shape[0] > 0 and fitbit_data_type == "summary":
    if sleep_episode_timestamp != "start" and sleep_episode_timestamp != "end":
        raise ValueError("SLEEP_EPISODE_TIMESTAMP can only be one of ['start', 'end'].")
    # Column name to be considered as the event datetime
    datetime_column = "local_" + sleep_episode_timestamp + "_date_time"
    # Keep only episodes inside the participant's [start, end) date range.
    if not pd.isnull(local_start_date) and not pd.isnull(local_end_date):
        parsed_data = parsed_data.loc[(parsed_data[datetime_column] >= local_start_date) & (parsed_data[datetime_column] < local_end_date)]
    # Sort by "local_start_date_time" column
    parsed_data.sort_values(by="local_start_date_time", ascending=True, inplace=True)
    # Localize to the study timezone and convert to UNIX ms; nonexistent local
    # times become NaT and those rows are removed by the dropna below.
    parsed_data["timestamp"] = parsed_data[datetime_column].dt.tz_localize(timezone, ambiguous=False, nonexistent="NaT").dropna().astype(np.int64) // 10**6
    parsed_data.dropna(subset=['timestamp'], inplace=True)
    parsed_data.drop(["local_start_date_time", "local_end_date_time"], axis = 1, inplace=True)

if parsed_data.shape[0] > 0 and fitbit_data_type == "intraday":
    # Keep only rows inside the participant's [start, end) date range.
    if not pd.isnull(local_start_date) and not pd.isnull(local_end_date):
        parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)]
    # Sort by "local_date_time" column
    parsed_data.sort_values(by="local_date_time", ascending=True, inplace=True)
    parsed_data["start_timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone, ambiguous=False, nonexistent="NaT").dropna().astype(np.int64) // 10**6
    parsed_data.dropna(subset=['start_timestamp'], inplace=True)
    # Inclusive end of the window in ms: start + duration*1000 - 1
    # ("duration" holds seconds — the parser writes 60, 30, or data["seconds"]).
    parsed_data["end_timestamp"] = parsed_data["start_timestamp"] + ((parsed_data["duration"] - 1) * 1000) + 999
    # unified_level: 0 for awake-like levels, 1 for asleep-like levels,
    # across both "classic" and "stages" vocabularies.
    parsed_data["unified_level"] = np.where(parsed_data["level"].isin(["awake", "restless", "wake"]), 0, 1)
    # Put consecutive rows with the same "level" field together and merge episodes
    parsed_data.insert(2, "level_episode_id", (parsed_data[["type_episode_id", "level"]] != parsed_data[["type_episode_id", "level"]].shift()).any(axis=1).cumsum())
    parsed_data = mergeSleepEpisodes(parsed_data, ["type_episode_id", "level_episode_id", "level", "unified_level", "is_main_sleep", "type"])

parsed_data.to_csv(snakemake.output[0], index=False)

View File

@ -10,7 +10,7 @@ FITBIT_HEARTRATE_SUMMARY:
HEARTRATE_DAILY_CALORIESPEAK: FLAG_TO_MUTATE
MUTATION:
COLUMN_MAPPINGS:
JSON_FITBIT_COLUMN: fitbit_data # text column with JSON objects
JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
SCRIPTS: # List any python or r scripts that mutate your raw data
- src/data/streams/mutations/fitbit/parse_heartrate_summary_json.py
@ -23,7 +23,7 @@ FITBIT_HEARTRATE_INTRADAY:
HEARTRATE_ZONE: FLAG_TO_MUTATE
MUTATION:
COLUMN_MAPPINGS:
JSON_FITBIT_COLUMN: fitbit_data # text column with JSON objects
JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
SCRIPTS: # List any python or r scripts that mutate your raw data
- src/data/streams/mutations/fitbit/parse_heartrate_intraday_json.py
@ -44,10 +44,26 @@ FITBIT_SLEEP_SUMMARY:
TYPE: FLAG_TO_MUTATE
MUTATION:
COLUMN_MAPPINGS:
JSON_FITBIT_COLUMN: fitbit_data # text column with JSON objects
JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
SCRIPTS: # List any python or r scripts that mutate your raw data
- src/data/streams/mutations/fitbit/parse_sleep_summary_json.py
FITBIT_SLEEP_INTRADAY:
RAPIDS_COLUMN_MAPPINGS:
TIMESTAMP: FLAG_TO_MUTATE
DEVICE_ID: device_id
LOCAL_DATE_TIME: FLAG_TO_MUTATE
TYPE_EPISODE_ID: FLAG_TO_MUTATE
DURATION: FLAG_TO_MUTATE
IS_MAIN_SLEEP: FLAG_TO_MUTATE
TYPE: FLAG_TO_MUTATE
LEVEL: FLAG_TO_MUTATE
MUTATION:
COLUMN_MAPPINGS:
JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
SCRIPTS: # List any python or r scripts that mutate your raw data
- src/data/streams/mutations/fitbit/parse_sleep_intraday_json.py
FITBIT_STEPS_SUMMARY:
RAPIDS_COLUMN_MAPPINGS:
TIMESTAMP: FLAG_TO_MUTATE
@ -56,7 +72,7 @@ FITBIT_STEPS_SUMMARY:
STEPS: FLAG_TO_MUTATE
MUTATION:
COLUMN_MAPPINGS:
JSON_FITBIT_COLUMN: fitbit_data # text column with JSON objects
JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
SCRIPTS: # List any python or r scripts that mutate your raw data
- src/data/streams/mutations/fitbit/parse_steps_summary_json.py
@ -68,6 +84,6 @@ FITBIT_STEPS_INTRADAY:
STEPS: FLAG_TO_MUTATE
MUTATION:
COLUMN_MAPPINGS:
JSON_FITBIT_COLUMN: fitbit_data # text column with JSON objects
JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
SCRIPTS: # List any python or r scripts that mutate your raw data
- src/data/streams/mutations/fitbit/parse_steps_intraday_json.py

View File

@ -0,0 +1,142 @@
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import dateutil.parser
SLEEP_CODE2LEVEL = ["asleep", "restless", "awake"]
SLEEP_INTRADAY_COLUMNS = ("device_id",
"type_episode_id",
"duration",
# For "classic" type, original_level is one of {"awake", "restless", "asleep"}
# For "stages" type, original_level is one of {"wake", "deep", "light", "rem"}
"level",
# one of {0, 1} where 0: nap, 1: main sleep
"is_main_sleep",
# one of {"classic", "stages"}
"type",
"local_date_time",
"timestamp")
def mergeLongAndShortData(data_intraday):
    """Expand Fitbit "stages" long/short interval data into 30-second rows.

    data_intraday: dict with "data" (level intervals) and optionally
    "shortData" (brief wake intervals); each entry has "dateTime" (ISO-8601
    string), "level", and "seconds". Returns a list of [datetime, level]
    pairs in 30-second steps; any step also covered by shortData is
    relabelled "wake".

    Fixes: builds plain lists instead of the deprecated (removed in
    pandas >= 2.0) DataFrame.append-in-a-loop pattern; tolerates records
    without a "shortData" key; uses stdlib datetime.fromisoformat instead of
    dateutil for Fitbit's "YYYY-MM-DDTHH:MM:SS.mmm" strings.
    """
    window_length = 30  # seconds per expanded row

    def expand(intervals):
        # One (datetime, level) pair per 30-second window of each interval.
        rows = []
        for interval in intervals:
            start = datetime.fromisoformat(interval["dateTime"])
            for i in range(interval["seconds"] // window_length):
                rows.append((start + timedelta(seconds=i * window_length), interval["level"]))
        return rows

    long_rows = expand(data_intraday["data"])
    # "shortData" may be absent in some records; treat it as empty then.
    short_times = {dt for dt, _ in expand(data_intraday.get("shortData", []))}
    # Windows that overlap a short (wake) interval are overridden to "wake".
    return [[dt, "wake" if dt in short_times else level] for dt, level in long_rows]
# Parse one record for sleep API version 1
def parseOneRecordForV1(record, device_id, d_is_main_sleep, records_intraday, type_episode_id):
    """Parse one sleep API v1 record into per-minute intraday rows.

    Appends one tuple per entry of record["minuteData"] to records_intraday
    (shape matches SLEEP_INTRADAY_COLUMNS) and returns the list. v1 records
    only carry "HH:MM:SS" wall-clock times per minute, so the date is
    reconstructed from the episode's start/end dates.

    Fix: the start/end strings were sliced with [:18], which cut off the last
    digit of the seconds field ("...T23:58:30" became "...T23:58:3"); [:19]
    keeps the full "YYYY-MM-DDTHH:MM:SS" prefix.
    """
    sleep_record_type = "classic"
    d_start_datetime = datetime.strptime(record["startTime"][:19], "%Y-%m-%dT%H:%M:%S")
    d_end_datetime = datetime.strptime(record["endTime"][:19], "%Y-%m-%dT%H:%M:%S")
    # Intraday data
    start_date = d_start_datetime.date()
    end_date = d_end_datetime.date()
    is_before_midnight = True
    curr_date = start_date
    for data in record["minuteData"]:
        d_time = datetime.strptime(data["dateTime"], '%H:%M:%S').time()
        # For overnight episodes, switch to end_date once we cross midnight.
        if is_before_midnight and d_time.hour == 0:
            curr_date = end_date
            is_before_midnight = False  # switch only once
        d_datetime = datetime.combine(curr_date, d_time)
        # API 1.2 stores original_level as strings, so we convert the numeric
        # levels of API 1 too (1: "asleep", 2: "restless", 3: "awake").
        d_original_level = SLEEP_CODE2LEVEL[int(data["value"]) - 1]
        row_intraday = (device_id, type_episode_id, 60,
                        d_original_level, d_is_main_sleep, sleep_record_type,
                        d_datetime, 0)
        records_intraday.append(row_intraday)
    return records_intraday
# Parse one record for sleep API version 1.2
def parseOneRecordForV12(record, device_id, d_is_main_sleep, records_intraday, type_episode_id):
    """Parse one sleep API v1.2 record ("classic" or "stages") into intraday rows.

    Appends tuples shaped like SLEEP_INTRADAY_COLUMNS to records_intraday and
    returns the list. "classic" entries keep Fitbit's own variable durations;
    "stages" entries are expanded into fixed 30-second windows via
    mergeLongAndShortData.

    Fix: uses stdlib datetime.fromisoformat instead of the third-party
    dateutil parser for Fitbit's "YYYY-MM-DDTHH:MM:SS.mmm" strings.
    """
    sleep_record_type = record['type']
    if sleep_record_type == "classic":
        for data in record["levels"]["data"]:
            d_datetime = datetime.fromisoformat(data["dateTime"])
            row_intraday = (device_id, type_episode_id, data["seconds"],
                            data["level"], d_is_main_sleep, sleep_record_type,
                            d_datetime, 0)
            records_intraday.append(row_intraday)
    else:
        # Sleep type "stages": merge long and short interval streams first.
        for data in mergeLongAndShortData(record["levels"]):
            row_intraday = (device_id, type_episode_id, 30,
                            data[1], d_is_main_sleep, sleep_record_type,
                            data[0], 0)
            records_intraday.append(row_intraday)
    return records_intraday
def parseSleepData(sleep_data):
    """Parse a dataframe of raw Fitbit sleep JSON into intraday level rows.

    sleep_data: dataframe with "device_id" and "json_fitbit_column" (JSON
    strings). Payloads without a "sleep" key are skipped. Returns a dataframe
    shaped like SLEEP_INTRADAY_COLUMNS (empty input yields an empty frame).
    """
    if sleep_data.empty:
        return pd.DataFrame(columns=SLEEP_INTRADAY_COLUMNS)
    device_id = sleep_data["device_id"].iloc[0]
    records_intraday = []
    # Each JSON payload can hold several sleep episodes; number them globally.
    episode_counter = 0
    for raw_json in sleep_data.json_fitbit_column:
        payload = json.loads(raw_json)
        for record in payload.get("sleep", []):
            # Nap (0) vs main sleep (1).
            is_main = 1 if record["isMainSleep"] else 0
            # Only sleep API v1 records carry "awakeCount".
            parser = parseOneRecordForV1 if "awakeCount" in record else parseOneRecordForV12
            records_intraday = parser(record, device_id, is_main, records_intraday, episode_counter)
            episode_counter += 1
    return pd.DataFrame(data=records_intraday, columns=SLEEP_INTRADAY_COLUMNS)
def main(json_raw, stream_parameters):
    """Mutation entry point called by RAPIDS' data stream pipeline.

    json_raw: dataframe with a `json_fitbit_column` of raw Fitbit JSON strings.
    stream_parameters: stream configuration (unused by this script).
    Returns the parsed intraday sleep dataframe (SLEEP_INTRADAY_COLUMNS) with
    a placeholder `timestamp` and string-formatted `local_date_time`.
    """
    parsed_data = parseSleepData(json_raw)
    parsed_data["timestamp"] = 0 # this column is added at readable_datetime.R because we need to take into account multiple timezones
    # Serialize datetimes so downstream R code reads a plain datetime string.
    parsed_data['local_date_time'] = parsed_data['local_date_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
    return(parsed_data)

View File

@ -156,7 +156,7 @@ pull_phone_data <- function(){
mutated_data <- mutate_data(mutation_scripts, renamed_data, data_configuration)
if(!setequal(expected_columns, colnames(mutated_data)))
stop(paste0("The mutated data for ", device, " does not have the columns RAPIDS expects. The container script returned [", paste(colnames(mutated_data), collapse=","),"] but RAPIDS expected [",paste(expected_columns, collapse=","), "]. One ore more mutation scripts in [", sensor,"][MUTATION][SCRIPTS] are adding extra columns or removing or not adding the ones expected"))
stop(paste0("The mutated data for ", device, " does not have the columns RAPIDS expects. The mutation script returned [", paste(colnames(mutated_data), collapse=","),"] but RAPIDS expected [",paste(expected_columns, collapse=","), "]. One or more mutation scripts in [", sensor,"][MUTATION][SCRIPTS] are adding extra columns or removing or not adding the ones expected"))
participant_data <- rbind(participant_data, mutated_data)
}

View File

@ -110,7 +110,7 @@ pull_wearable_data_main <- function(){
mutated_data <- mutate_data(mutation_scripts, renamed_data, data_configuration)
if(!setequal(expected_columns, colnames(mutated_data)))
stop(paste0("The mutated data for ", device, " does not have the columns RAPIDS expects. The container script returned [", paste(colnames(mutated_data), collapse=","),"] but RAPIDS expected [",paste(expected_columns, collapse=","), "]. One ore more mutation scripts in [", sensor,"][MUTATION][SCRIPTS] are adding extra columns or removing or not adding the ones expected"))
stop(paste0("The mutated data for ", device, " does not have the columns RAPIDS expects. The mutation script returned [", paste(colnames(mutated_data), collapse=","),"] but RAPIDS expected [",paste(expected_columns, collapse=","), "]. One or more mutation scripts in [", sensor,"][MUTATION][SCRIPTS] are adding extra columns or removing or not adding the ones expected"))
participant_data <- rbind(participant_data, mutated_data)
}

View File

@ -159,6 +159,16 @@ FITBIT_SLEEP_SUMMARY:
- IS_MAIN_SLEEP
- TYPE
FITBIT_SLEEP_INTRADAY:
- TIMESTAMP
- DEVICE_ID
- LOCAL_DATE_TIME
- TYPE_EPISODE_ID
- DURATION
- IS_MAIN_SLEEP
- TYPE
- LEVEL
FITBIT_STEPS_SUMMARY:
- TIMESTAMP
- DEVICE_ID