diff --git a/Snakefile b/Snakefile index 6b753b00..db95ad5e 100644 --- a/Snakefile +++ b/Snakefile @@ -279,13 +279,6 @@ for provider in config["FITBIT_STEPS_INTRADAY"]["PROVIDERS"].keys(): files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") -# for provider in config["FITBIT_CALORIES"]["PROVIDERS"].keys(): -# if config["FITBIT_CALORIES"]["PROVIDERS"][provider]["COMPUTE"]: -# files_to_compute.extend(expand("data/raw/{pid}/fitbit_calories_{fitbit_data_type}_raw.csv", pid=config["PIDS"], fitbit_data_type=(["json"] if config["FITBIT_CALORIES"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"]))) -# files_to_compute.extend(expand("data/raw/{pid}/fitbit_calories_{fitbit_data_type}_parsed.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) -# files_to_compute.extend(expand("data/raw/{pid}/fitbit_calories_{fitbit_data_type}_parsed_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) -# files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"])) -# files_to_compute.append("data/processed/features/all_participants/all_sensor_features.csv") for provider in config["EMPATICA_ACCELEROMETER"]["PROVIDERS"].keys(): if config["EMPATICA_ACCELEROMETER"]["PROVIDERS"][provider]["COMPUTE"]: diff --git a/config.yaml b/config.yaml index 1c3abcc2..65a6d52a 100644 --- a/config.yaml +++ b/config.yaml @@ -448,19 +448,6 @@ FITBIT_STEPS_INTRADAY: SRC_FOLDER: "rapids" # inside src/features/fitbit_steps_intraday SRC_LANGUAGE: "python" -# FITBIT_CALORIES: -# TABLE_FORMAT: JSON # JSON or CSV. If your JSON or CSV data are files change [DEVICE_DATA][FITBIT][SOURCE][TYPE] to FILES -# TABLE: -# JSON: fitbit_calories -# CSV: -# SUMMARY: calories_summary -# INTRADAY: calories_intraday -# PROVIDERS: -# RAPIDS: -# COMPUTE: False -# FEATURES: [] - - ######################################################################################################################## # EMPATICA # ######################################################################################################################## diff --git a/docs/datastreams/fitbitjson-mysql.md b/docs/datastreams/fitbitjson-mysql.md index 6b61741e..e0e0ad78 100644 --- a/docs/datastreams/fitbitjson-mysql.md +++ b/docs/datastreams/fitbitjson-mysql.md @@ -146,6 +146,49 @@ If you want RAPIDS to process Fitbit sensor data using this stream, you will nee |a748ee1a-1d0b-4ae9-9074-279a2b6ba524 |{"sleep":[{"dateOfSleep":"2020-10-11","duration":41640000,"efficiency":89,"endTime":"2020-10-11T11:47:00.000","infoCode":0,"isMainSleep":true,"levels":{"data":[{"dateTime":"2020-10-11T00:12:30.000","level":"wake","seconds":450},{"dateTime":"2020-10-11T00:20:00.000","level":"light","seconds":870},{"dateTime":"2020-10-11T00:34:30.000","level":"wake","seconds":780},...], "summary":{"deep":{"count":4,"minutes":52,"thirtyDayAvgMinutes":62},"light":{"count":32,"minutes":442,"thirtyDayAvgMinutes":364},"rem":{"count":6,"minutes":68,"thirtyDayAvgMinutes":58},"wake":{"count":29,"minutes":132,"thirtyDayAvgMinutes":94}}},"logId":26589710670,"minutesAfterWakeup":1,"minutesAsleep":562,"minutesAwake":132,"minutesToFallAsleep":0,"startTime":"2020-10-11T00:12:30.000","timeInBed":694,"type":"stages"}],"summary":{"stages":{"deep":52,"light":442,"rem":68,"wake":132},"totalMinutesAsleep":562,"totalSleepRecords":1,"totalTimeInBed":694}} |a748ee1a-1d0b-4ae9-9074-279a2b6ba524 
|{"sleep":[{"dateOfSleep":"2020-10-12","duration":28980000,"efficiency":93,"endTime":"2020-10-12T09:34:30.000","infoCode":0,"isMainSleep":true,"levels":{"data":[{"dateTime":"2020-10-12T01:31:00.000","level":"wake","seconds":600},{"dateTime":"2020-10-12T01:41:00.000","level":"light","seconds":60},{"dateTime":"2020-10-12T01:42:00.000","level":"deep","seconds":2340},...], "summary":{"deep":{"count":4,"minutes":63,"thirtyDayAvgMinutes":59},"light":{"count":27,"minutes":257,"thirtyDayAvgMinutes":364},"rem":{"count":5,"minutes":94,"thirtyDayAvgMinutes":58},"wake":{"count":24,"minutes":69,"thirtyDayAvgMinutes":95}}},"logId":26589710673,"minutesAfterWakeup":0,"minutesAsleep":415,"minutesAwake":68,"minutesToFallAsleep":0,"startTime":"2020-10-12T01:31:00.000","timeInBed":483,"type":"stages"}],"summary":{"stages":{"deep":63,"light":257,"rem":94,"wake":69},"totalMinutesAsleep":415,"totalSleepRecords":1,"totalTimeInBed":483}} +??? info "FITBIT_SLEEP_INTRADAY" + + **RAPIDS_COLUMN_MAPPINGS** + + | RAPIDS column | Stream column | + |-----------------|-----------------| + | TIMESTAMP | FLAG_TO_MUTATE | + | LOCAL_DATE_TIME | FLAG_TO_MUTATE | + | DEVICE_ID | device_id | + | TYPE_EPISODE_ID | FLAG_TO_MUTATE | + | DURATION | FLAG_TO_MUTATE | + | IS_MAIN_SLEEP | FLAG_TO_MUTATE | + | TYPE | FLAG_TO_MUTATE | + | LEVEL | FLAG_TO_MUTATE | + + **MUTATION** + + - **COLUMN_MAPPINGS** + + | Script column | Stream column | + |-----------------|-----------------| + | JSON_FITBIT_COLUMN | fitbit_data | + + - **SCRIPTS** + + ```bash + src/data/streams/mutations/fitbit/parse_sleep_intraday_json.py + ``` + + !!! note + + Fitbit API has two versions for sleep data, v1 and v1.2, we support both. + + All columns except `DEVICE_ID` are parsed from `JSON_FITBIT_COLUMN`. `JSON_FITBIT_COLUMN` is a string column containing the JSON objects returned by Fitbit's API. See an example of the raw data RAPIDS expects for this data stream: + + ??? 
example "Example of the expected raw data" + + |device_id |fitbit_data | + |---------------------------------------- |--------------------------------------------------------- | + |a748ee1a-1d0b-4ae9-9074-279a2b6ba524 |{"sleep":[{"dateOfSleep":"2020-10-10","duration":3600000,"efficiency":92,"endTime":"2020-10-10T16:37:00.000","infoCode":2,"isMainSleep":false,"levels":{"data":[{"dateTime":"2020-10-10T15:36:30.000","level":"restless","seconds":60},{"dateTime":"2020-10-10T15:37:30.000","level":"asleep","seconds":660},{"dateTime":"2020-10-10T15:48:30.000","level":"restless","seconds":60},...], "summary":{"asleep":{"count":0,"minutes":56},"awake":{"count":0,"minutes":0},"restless":{"count":3,"minutes":4}}},"logId":26315914306,"minutesAfterWakeup":0,"minutesAsleep":55,"minutesAwake":5,"minutesToFallAsleep":0,"startTime":"2020-10-10T15:36:30.000","timeInBed":60,"type":"classic"},{"dateOfSleep":"2020-10-10","duration":22980000,"efficiency":88,"endTime":"2020-10-10T08:10:00.000","infoCode":0,"isMainSleep":true,"levels":{"data":[{"dateTime":"2020-10-10T01:46:30.000","level":"light","seconds":420},{"dateTime":"2020-10-10T01:53:30.000","level":"deep","seconds":1230},{"dateTime":"2020-10-10T02:14:00.000","level":"light","seconds":360},...], "summary":{"deep":{"count":3,"minutes":92,"thirtyDayAvgMinutes":0},"light":{"count":29,"minutes":193,"thirtyDayAvgMinutes":0},"rem":{"count":4,"minutes":33,"thirtyDayAvgMinutes":0},"wake":{"count":28,"minutes":65,"thirtyDayAvgMinutes":0}}},"logId":26311786557,"minutesAfterWakeup":0,"minutesAsleep":318,"minutesAwake":65,"minutesToFallAsleep":0,"startTime":"2020-10-10T01:46:30.000","timeInBed":383,"type":"stages"}],"summary":{"stages":{"deep":92,"light":193,"rem":33,"wake":65},"totalMinutesAsleep":373,"totalSleepRecords":2,"totalTimeInBed":443}} + |a748ee1a-1d0b-4ae9-9074-279a2b6ba524 |{"sleep":[{"dateOfSleep":"2020-10-11","duration":41640000,"efficiency":89,"endTime":"2020-10-11T11:47:00.000","infoCode":0,"isMainSleep":true,"levels":{"data":[{"dateTime":"2020-10-11T00:12:30.000","level":"wake","seconds":450},{"dateTime":"2020-10-11T00:20:00.000","level":"light","seconds":870},{"dateTime":"2020-10-11T00:34:30.000","level":"wake","seconds":780},...], "summary":{"deep":{"count":4,"minutes":52,"thirtyDayAvgMinutes":62},"light":{"count":32,"minutes":442,"thirtyDayAvgMinutes":364},"rem":{"count":6,"minutes":68,"thirtyDayAvgMinutes":58},"wake":{"count":29,"minutes":132,"thirtyDayAvgMinutes":94}}},"logId":26589710670,"minutesAfterWakeup":1,"minutesAsleep":562,"minutesAwake":132,"minutesToFallAsleep":0,"startTime":"2020-10-11T00:12:30.000","timeInBed":694,"type":"stages"}],"summary":{"stages":{"deep":52,"light":442,"rem":68,"wake":132},"totalMinutesAsleep":562,"totalSleepRecords":1,"totalTimeInBed":694}} + |a748ee1a-1d0b-4ae9-9074-279a2b6ba524 |{"sleep":[{"dateOfSleep":"2020-10-12","duration":28980000,"efficiency":93,"endTime":"2020-10-12T09:34:30.000","infoCode":0,"isMainSleep":true,"levels":{"data":[{"dateTime":"2020-10-12T01:31:00.000","level":"wake","seconds":600},{"dateTime":"2020-10-12T01:41:00.000","level":"light","seconds":60},{"dateTime":"2020-10-12T01:42:00.000","level":"deep","seconds":2340},...], 
"summary":{"deep":{"count":4,"minutes":63,"thirtyDayAvgMinutes":59},"light":{"count":27,"minutes":257,"thirtyDayAvgMinutes":364},"rem":{"count":5,"minutes":94,"thirtyDayAvgMinutes":58},"wake":{"count":24,"minutes":69,"thirtyDayAvgMinutes":95}}},"logId":26589710673,"minutesAfterWakeup":0,"minutesAsleep":415,"minutesAwake":68,"minutesToFallAsleep":0,"startTime":"2020-10-12T01:31:00.000","timeInBed":483,"type":"stages"}],"summary":{"stages":{"deep":63,"light":257,"rem":94,"wake":69},"totalMinutesAsleep":415,"totalSleepRecords":1,"totalTimeInBed":483}} + ??? info "FITBIT_STEPS_SUMMARY" **RAPIDS_COLUMN_MAPPINGS** diff --git a/docs/datastreams/mandatory-fitbit-format.md b/docs/datastreams/mandatory-fitbit-format.md index 5b463b86..87072872 100644 --- a/docs/datastreams/mandatory-fitbit-format.md +++ b/docs/datastreams/mandatory-fitbit-format.md @@ -6,7 +6,7 @@ This is a description of the format RAPIDS needs to process data for the followi | RAPIDS column | Description | |-----------------|-----------------| - | TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged | + | TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS) | | LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` | | DEVICE_ID | A string that uniquely identifies a device | | HEARTRATE_DAILY_RESTINGHR | Daily resting heartrate | @@ -19,7 +19,7 @@ This is a description of the format RAPIDS needs to process data for the followi | RAPIDS column | Description | |-----------------|-----------------| - | TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged | + | TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS) | | LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` | | DEVICE_ID | A string that uniquely identifies a device | | HEARTRATE | Intraday heartrate | @@ -29,7 +29,7 @@ This is a description of the format RAPIDS needs to process data for the followi | RAPIDS column | Description | |-----------------|-----------------| - | TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged | + | TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS) | | LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss`, this either is a copy of LOCAL_START_DATE_TIME or LOCAL_END_DATE_TIME depending on which column is used to assign an episode to a specific day| | LOCAL_START_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` representing the start of a daily sleep episode | | LOCAL_END_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` representing the end of a daily sleep episode| @@ -43,11 +43,24 @@ This is a description of the format RAPIDS needs to process data for the followi | IS_MAIN_SLEEP | 0 if this episode is a nap, or 1 if it is a main sleep episode| | TYPE | stages or classic [sleep data](https://dev.fitbit.com/build/reference/web-api/sleep/)| +??? 
info "FITBIT_SLEEP_INTRADAY" + + | RAPIDS column | Description | + |-----------------|-----------------| + | TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS)| + | LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss`, this either is a copy of LOCAL_START_DATE_TIME or LOCAL_END_DATE_TIME depending on which column is used to assign an episode to a specific day| + | DEVICE_ID | A string that uniquely identifies a device | + | TYPE_EPISODE_ID | An id for each unique main or nap episode. Main and nap episodes have different levels, each row in this table is one of such levels, so multiple rows can have the same TYPE_EPISODE_ID| + | DURATION | Duration of the episode level in minutes| + | IS_MAIN_SLEEP | 0 if this episode level belongs to a nap, or 1 if it belongs to a main sleep episode| + | TYPE | type of level: stages or classic [sleep data](https://dev.fitbit.com/build/reference/web-api/sleep/)| + | LEVEL | For stages levels one of `wake`, `deep`, `light`, or `rem`. For classic levels one of `awake`, `restless`, and `asleep`| + ??? info "FITBIT_STEPS_SUMMARY" | RAPIDS column | Description | |-----------------|-----------------| - | TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged | + | TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS) | | LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` | | DEVICE_ID | A string that uniquely identifies a device | | STEPS | Daily step count | @@ -56,7 +69,7 @@ This is a description of the format RAPIDS needs to process data for the followi | RAPIDS column | Description | |-----------------|-----------------| - | TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged | + | TIMESTAMP | An UNIX timestamp (13 digits) when a row of data was logged (automatically created by RAPIDS) | | LOCAL_DATE_TIME | Date time string with format `yyyy-mm-dd hh:mm:ss` | | DEVICE_ID | A string that uniquely identifies a device | | STEPS | Intraday step count (usually every minute)| \ No newline at end of file diff --git a/src/data/fitbit_parse_sleep.py b/src/data/fitbit_parse_sleep.py deleted file mode 100644 index bf573437..00000000 --- a/src/data/fitbit_parse_sleep.py +++ /dev/null @@ -1,251 +0,0 @@ -import json, yaml -import pandas as pd -import numpy as np -from datetime import datetime, timedelta -import dateutil.parser - -SLEEP_CODE2LEVEL = ["asleep", "restless", "awake"] - - -SLEEP_SUMMARY_COLUMNS_V1_2 = ("device_id", "efficiency", - "minutes_after_wakeup", "minutes_asleep", "minutes_awake", "minutes_to_fall_asleep", "minutes_in_bed", - "is_main_sleep", "type", - "local_start_date_time", "local_end_date_time", - "timestamp") -SLEEP_SUMMARY_COLUMNS_V1 = SLEEP_SUMMARY_COLUMNS_V1_2 + ("count_awake", "duration_awake", "count_awakenings", "count_restless", "duration_restless") - -SLEEP_INTRADAY_COLUMNS = (# Extract "type_episode_id" field based on summary data: start from 0 - "type_episode_id", - "duration", - # For "classic" type, original_level is one of {"awake", "restless", "asleep"} - # For "stages" type, original_level is one of {"wake", "deep", "light", "rem"} - "level", - # For "classic" type, unified_level is one of {0, 1} where 0: awake {"awake" + "restless"}, 1: asleep {"asleep"} - # For "stages" type, unified_level is one of {0, 1} where 0: awake {"wake"}, 1: asleep {"deep" + "light" + "rem"} - "unified_level", - # One of {0, 1} where 0: nap, 1: main sleep - "is_main_sleep", - # One 
of {"classic", "stages"} - "type", - "local_date_time", - "start_timestamp", - "end_timestamp") - - -def mergeLongAndShortData(data_intraday): - long_data = pd.DataFrame(columns=["dateTime", "level"]) - short_data = pd.DataFrame(columns=["dateTime", "level"]) - - window_length = 30 - - for data in data_intraday["data"]: - counter = 0 - for times in range(data["seconds"] // window_length): - row = {"dateTime": dateutil.parser.parse(data["dateTime"])+timedelta(seconds=counter*window_length), "level": data["level"]} - long_data = long_data.append(row, ignore_index = True) - counter = counter + 1 - - for data in data_intraday["shortData"]: - counter = 0 - for times in range(data["seconds"] // window_length): - row = {"dateTime": dateutil.parser.parse(data["dateTime"])+timedelta(seconds=counter*window_length), "level": data["level"]} - short_data = short_data.append(row, ignore_index = True) - counter = counter + 1 - long_data.set_index("dateTime",inplace=True) - short_data.set_index("dateTime",inplace=True) - long_data["level"] = np.where(long_data.index.isin(short_data.index) == True, "wake", long_data["level"]) - - long_data.reset_index(inplace=True) - - return long_data.values.tolist() - -# Parse one record for sleep API version 1 -def parseOneRecordForV1(record, device_id, type_episode_id, d_is_main_sleep, records_summary, records_intraday, fitbit_data_type): - - sleep_record_type = "classic" - - d_start_datetime = datetime.strptime(record["startTime"][:18], "%Y-%m-%dT%H:%M:%S") - d_end_datetime = datetime.strptime(record["endTime"][:18], "%Y-%m-%dT%H:%M:%S") - - # Summary data - if fitbit_data_type == "summary": - row_summary = (device_id, record["efficiency"], - record["minutesAfterWakeup"], record["minutesAsleep"], record["minutesAwake"], record["minutesToFallAsleep"], record["timeInBed"], - d_is_main_sleep, sleep_record_type, - d_start_datetime, d_end_datetime, - 0, - record["awakeCount"], record["awakeDuration"], record["awakeningsCount"], - record["restlessCount"], record["restlessDuration"]) - - records_summary.append(row_summary) - - # Intraday data - if fitbit_data_type == "intraday": - start_date = d_start_datetime.date() - end_date = d_end_datetime.date() - is_before_midnight = True - curr_date = start_date - for data in record["minuteData"]: - # For overnight episodes, use end_date once we are over midnight - d_time = datetime.strptime(data["dateTime"], '%H:%M:%S').time() - if is_before_midnight and d_time.hour == 0: - curr_date = end_date - d_datetime = datetime.combine(curr_date, d_time) - - # API 1.2 stores original_level as strings, so we convert original_levels of API 1 to strings too - # (1: "asleep", 2: "restless", 3: "awake") - d_original_level = SLEEP_CODE2LEVEL[int(data["value"])-1] - - - row_intraday = (type_episode_id, 60, - d_original_level, -1, d_is_main_sleep, sleep_record_type, - d_datetime, 0, 0) - - records_intraday.append(row_intraday) - - return records_summary, records_intraday - -# Parse one record for sleep API version 1.2 -def parseOneRecordForV12(record, device_id, type_episode_id, d_is_main_sleep, records_summary, records_intraday, fitbit_data_type): - - sleep_record_type = record['type'] - - d_start_datetime = datetime.strptime(record["startTime"][:18], "%Y-%m-%dT%H:%M:%S") - d_end_datetime = datetime.strptime(record["endTime"][:18], "%Y-%m-%dT%H:%M:%S") - - # Summary data - if fitbit_data_type == "summary": - row_summary = (device_id, record["efficiency"], - record["minutesAfterWakeup"], record["minutesAsleep"], record["minutesAwake"], 
record["minutesToFallAsleep"], record["timeInBed"], - d_is_main_sleep, sleep_record_type, - d_start_datetime, d_end_datetime, - 0) - - records_summary.append(row_summary) - - # Intraday data - if fitbit_data_type == "intraday": - if sleep_record_type == "classic": - for data in record["levels"]["data"]: - d_datetime = dateutil.parser.parse(data["dateTime"]) - - row_intraday = (type_episode_id, data["seconds"], - data["level"], -1, d_is_main_sleep, sleep_record_type, - d_datetime, 0, 0) - records_intraday.append(row_intraday) - else: - # For sleep type "stages" - for data in mergeLongAndShortData(record["levels"]): - row_intraday = (type_episode_id, 30, - data[1], -1, d_is_main_sleep, sleep_record_type, - data[0], 0, 0) - - records_intraday.append(row_intraday) - - return records_summary, records_intraday - -def parseSleepData(sleep_data, fitbit_data_type): - SLEEP_SUMMARY_COLUMNS = SLEEP_SUMMARY_COLUMNS_V1_2 - if sleep_data.empty: - if fitbit_data_type == "summary": - return pd.DataFrame(columns=SLEEP_SUMMARY_COLUMNS) - elif fitbit_data_type == "intraday": - return pd.DataFrame(columns=SLEEP_INTRADAY_COLUMNS) - device_id = sleep_data["device_id"].iloc[0] - records_summary, records_intraday = [], [] - type_episode_id = 0 - # Parse JSON into individual records - for multi_record in sleep_data.fitbit_data: - for record in json.loads(multi_record)["sleep"]: - # Whether the sleep episode is nap (0) or main sleep (1) - d_is_main_sleep = 1 if record["isMainSleep"] else 0 - - # For sleep API version 1 - if "awakeCount" in record: - SLEEP_SUMMARY_COLUMNS = SLEEP_SUMMARY_COLUMNS_V1 - records_summary, records_intraday = parseOneRecordForV1(record, device_id, type_episode_id, d_is_main_sleep, records_summary, records_intraday, fitbit_data_type) - # For sleep API version 1.2 - else: - SLEEP_SUMMARY_COLUMNS = SLEEP_SUMMARY_COLUMNS_V1_2 - records_summary, records_intraday = parseOneRecordForV12(record, device_id, type_episode_id, d_is_main_sleep, records_summary, records_intraday, fitbit_data_type) - - type_episode_id = type_episode_id + 1 - - if fitbit_data_type == "summary": - parsed_data = pd.DataFrame(data=records_summary, columns=SLEEP_SUMMARY_COLUMNS) - elif fitbit_data_type == "intraday": - parsed_data = pd.DataFrame(data=records_intraday, columns=SLEEP_INTRADAY_COLUMNS) - - return parsed_data - -def mergeSleepEpisodes(sleep_data, cols_for_groupby): - sleep_episodes = pd.DataFrame(columns=["type_episode_id", "level_episode_id", "level", "unified_level", "is_main_sleep", "type", "start_timestamp", "end_timestamp"]) - if not sleep_data.empty: - sleep_data = sleep_data.groupby(by=cols_for_groupby) - sleep_episodes = sleep_data[["start_timestamp"]].first() - sleep_episodes["end_timestamp"] = sleep_data["end_timestamp"].last() - - sleep_episodes.reset_index(inplace=True, drop=False) - - return sleep_episodes - - - -timezone = snakemake.params["timezone"] -column_format = snakemake.params["column_format"] -fitbit_data_type = snakemake.params["fitbit_data_type"] -sleep_episode_timestamp = snakemake.params["sleep_episode_timestamp"] - -with open(snakemake.input["participant_file"], "r", encoding="utf-8") as f: - participant_file = yaml.safe_load(f) -local_start_date = pd.Timestamp(participant_file["FITBIT"]["START_DATE"]) -local_end_date = pd.Timestamp(participant_file["FITBIT"]["END_DATE"]) + pd.DateOffset(1) - -if column_format == "JSON": - json_raw = pd.read_csv(snakemake.input["raw_data"]) - parsed_data = parseSleepData(json_raw, fitbit_data_type) -elif column_format == "PLAIN_TEXT": - if 
fitbit_data_type == "summary":
-        parsed_data = pd.read_csv(snakemake.input["raw_data"], parse_dates=["local_start_date_time", "local_end_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
-    elif fitbit_data_type == "intraday":
-        parsed_data = pd.read_csv(snakemake.input["raw_data"], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
-else:
-    raise ValueError("column_format can only be one of ['JSON', 'PLAIN_TEXT'].")
-
-# Drop duplicates
-parsed_data.drop_duplicates(inplace=True)
-
-if parsed_data.shape[0] > 0 and fitbit_data_type == "summary":
-    if sleep_episode_timestamp != "start" and sleep_episode_timestamp != "end":
-        raise ValueError("SLEEP_EPISODE_TIMESTAMP can only be one of ['start', 'end'].")
-    # Column name to be considered as the event datetime
-    datetime_column = "local_" + sleep_episode_timestamp + "_date_time"
-
-    if not pd.isnull(local_start_date) and not pd.isnull(local_end_date):
-        parsed_data = parsed_data.loc[(parsed_data[datetime_column] >= local_start_date) & (parsed_data[datetime_column] < local_end_date)]
-
-    # Sort by "local_start_date_time" column
-    parsed_data.sort_values(by="local_start_date_time", ascending=True, inplace=True)
-
-    parsed_data["timestamp"] = parsed_data[datetime_column].dt.tz_localize(timezone, ambiguous=False, nonexistent="NaT").dropna().astype(np.int64) // 10**6
-    parsed_data.dropna(subset=['timestamp'], inplace=True)
-    parsed_data.drop(["local_start_date_time", "local_end_date_time"], axis = 1, inplace=True)
-
-if parsed_data.shape[0] > 0 and fitbit_data_type == "intraday":
-    if not pd.isnull(local_start_date) and not pd.isnull(local_end_date):
-        parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)]
-
-    # Sort by "local_date_time" column
-    parsed_data.sort_values(by="local_date_time", ascending=True, inplace=True)
-
-    parsed_data["start_timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone, ambiguous=False, nonexistent="NaT").dropna().astype(np.int64) // 10**6
-    parsed_data.dropna(subset=['start_timestamp'], inplace=True)
-    parsed_data["end_timestamp"] = parsed_data["start_timestamp"] + ((parsed_data["duration"] - 1) * 1000) + 999
-    parsed_data["unified_level"] = np.where(parsed_data["level"].isin(["awake", "restless", "wake"]), 0, 1)
-
-    # Put consecutive rows with the same "level" field together and merge episodes
-    parsed_data.insert(2, "level_episode_id", (parsed_data[["type_episode_id", "level"]] != parsed_data[["type_episode_id", "level"]].shift()).any(axis=1).cumsum())
-    parsed_data = mergeSleepEpisodes(parsed_data, ["type_episode_id", "level_episode_id", "level", "unified_level", "is_main_sleep", "type"])
-
-
-parsed_data.to_csv(snakemake.output[0], index=False)
diff --git a/src/data/streams/fitbitjson_mysql/format.yaml b/src/data/streams/fitbitjson_mysql/format.yaml
index ac022c05..16c3b73e 100644
--- a/src/data/streams/fitbitjson_mysql/format.yaml
+++ b/src/data/streams/fitbitjson_mysql/format.yaml
@@ -10,7 +10,7 @@ FITBIT_HEARTRATE_SUMMARY:
     HEARTRATE_DAILY_CALORIESPEAK: FLAG_TO_MUTATE
   MUTATION:
     COLUMN_MAPPINGS:
-      JSON_FITBIT_COLUMN: fitbit_data # text column with JSON objects
+      JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
     SCRIPTS: # List any python or r scripts that mutate your raw data
       - src/data/streams/mutations/fitbit/parse_heartrate_summary_json.py

@@ -23,7 +23,7 @@ FITBIT_HEARTRATE_INTRADAY:
     HEARTRATE_ZONE: FLAG_TO_MUTATE
   MUTATION:
     COLUMN_MAPPINGS:
-      JSON_FITBIT_COLUMN: fitbit_data # text column with JSON objects
+      JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
     SCRIPTS: # List any python or r scripts that mutate your raw data
       - src/data/streams/mutations/fitbit/parse_heartrate_intraday_json.py

@@ -44,10 +44,26 @@ FITBIT_SLEEP_SUMMARY:
     TYPE: FLAG_TO_MUTATE
   MUTATION:
     COLUMN_MAPPINGS:
-      JSON_FITBIT_COLUMN: fitbit_data # text column with JSON objects
+      JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
     SCRIPTS: # List any python or r scripts that mutate your raw data
       - src/data/streams/mutations/fitbit/parse_sleep_summary_json.py

+FITBIT_SLEEP_INTRADAY:
+  RAPIDS_COLUMN_MAPPINGS:
+    TIMESTAMP: FLAG_TO_MUTATE
+    DEVICE_ID: device_id
+    LOCAL_DATE_TIME: FLAG_TO_MUTATE
+    TYPE_EPISODE_ID: FLAG_TO_MUTATE
+    DURATION: FLAG_TO_MUTATE
+    IS_MAIN_SLEEP: FLAG_TO_MUTATE
+    TYPE: FLAG_TO_MUTATE
+    LEVEL: FLAG_TO_MUTATE
+  MUTATION:
+    COLUMN_MAPPINGS:
+      JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
+    SCRIPTS: # List any python or r scripts that mutate your raw data
+      - src/data/streams/mutations/fitbit/parse_sleep_intraday_json.py
+
 FITBIT_STEPS_SUMMARY:
   RAPIDS_COLUMN_MAPPINGS:
     TIMESTAMP: FLAG_TO_MUTATE
@@ -56,7 +72,7 @@ FITBIT_STEPS_SUMMARY:
     STEPS: FLAG_TO_MUTATE
   MUTATION:
     COLUMN_MAPPINGS:
-      JSON_FITBIT_COLUMN: fitbit_data # text column with JSON objects
+      JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
     SCRIPTS: # List any python or r scripts that mutate your raw data
       - src/data/streams/mutations/fitbit/parse_steps_summary_json.py

@@ -68,6 +84,6 @@ FITBIT_STEPS_INTRADAY:
     STEPS: FLAG_TO_MUTATE
   MUTATION:
     COLUMN_MAPPINGS:
-      JSON_FITBIT_COLUMN: fitbit_data # text column with JSON objects
+      JSON_FITBIT_COLUMN: fitbit_data # string column with JSON objects
     SCRIPTS: # List any python or r scripts that mutate your raw data
       - src/data/streams/mutations/fitbit/parse_steps_intraday_json.py
diff --git a/src/data/streams/mutations/fitbit/parse_sleep_intraday_json.py b/src/data/streams/mutations/fitbit/parse_sleep_intraday_json.py
new file mode 100644
index 00000000..5f07c1eb
--- /dev/null
+++ b/src/data/streams/mutations/fitbit/parse_sleep_intraday_json.py
@@ -0,0 +1,142 @@
+import json
+import pandas as pd
+import numpy as np
+from datetime import datetime, timedelta
+import dateutil.parser
+
+SLEEP_CODE2LEVEL = ["asleep", "restless", "awake"]
+
+SLEEP_INTRADAY_COLUMNS = ("device_id",
+                          "type_episode_id",
+                          "duration",
+                          # For "classic" type, original_level is one of {"awake", "restless", "asleep"}
+                          # For "stages" type, original_level is one of {"wake", "deep", "light", "rem"}
+                          "level",
+                          # one of {0, 1} where 0: nap, 1: main sleep
+                          "is_main_sleep",
+                          # one of {"classic", "stages"}
+                          "type",
+                          "local_date_time",
+                          "timestamp")
+
+def mergeLongAndShortData(data_intraday):
+    long_data = pd.DataFrame(columns=["dateTime", "level"])
+    short_data = pd.DataFrame(columns=["dateTime", "level"])
+
+    window_length = 30
+
+    for data in data_intraday["data"]:
+        counter = 0
+        for times in range(data["seconds"] // window_length):
+            row = {"dateTime": dateutil.parser.parse(data["dateTime"])+timedelta(seconds=counter*window_length), "level": data["level"]}
+            long_data = long_data.append(row, ignore_index = True)
+            counter = counter + 1
+
+    for data in data_intraday["shortData"]:
+        counter = 0
+        for times in range(data["seconds"] // window_length):
+            row = {"dateTime": dateutil.parser.parse(data["dateTime"])+timedelta(seconds=counter*window_length), "level": data["level"]}
+            short_data = short_data.append(row, ignore_index = True)
+            counter = counter + 1
+    long_data.set_index("dateTime",inplace=True)
+    short_data.set_index("dateTime",inplace=True)
+    long_data["level"] = np.where(long_data.index.isin(short_data.index) == True, "wake", long_data["level"])
+
+    long_data.reset_index(inplace=True)
+
+    return long_data.values.tolist()
+
+# Parse one record for sleep API version 1
+def parseOneRecordForV1(record, device_id, d_is_main_sleep, records_intraday, type_episode_id):
+
+    sleep_record_type = "classic"
+
+    d_start_datetime = datetime.strptime(record["startTime"][:18], "%Y-%m-%dT%H:%M:%S")
+    d_end_datetime = datetime.strptime(record["endTime"][:18], "%Y-%m-%dT%H:%M:%S")
+
+    # Intraday data
+    start_date = d_start_datetime.date()
+    end_date = d_end_datetime.date()
+    is_before_midnight = True
+    curr_date = start_date
+    for data in record["minuteData"]:
+        # For overnight episodes, use end_date once we are over midnight
+        d_time = datetime.strptime(data["dateTime"], '%H:%M:%S').time()
+        if is_before_midnight and d_time.hour == 0:
+            curr_date = end_date
+        d_datetime = datetime.combine(curr_date, d_time)
+
+        # API 1.2 stores original_level as strings, so we convert original_levels of API 1 to strings too
+        # (1: "asleep", 2: "restless", 3: "awake")
+        d_original_level = SLEEP_CODE2LEVEL[int(data["value"])-1]
+
+
+        row_intraday = (device_id, type_episode_id, 60,
+                        d_original_level, d_is_main_sleep, sleep_record_type,
+                        d_datetime, 0)
+
+        records_intraday.append(row_intraday)
+
+    return records_intraday
+
+# Parse one record for sleep API version 1.2
+def parseOneRecordForV12(record, device_id, d_is_main_sleep, records_intraday, type_episode_id):
+
+    sleep_record_type = record['type']
+
+    if sleep_record_type == "classic":
+        for data in record["levels"]["data"]:
+            d_datetime = dateutil.parser.parse(data["dateTime"])
+
+            row_intraday = (device_id, type_episode_id, data["seconds"],
+                            data["level"], d_is_main_sleep, sleep_record_type,
+                            d_datetime, 0)
+            records_intraday.append(row_intraday)
+    else:
+        # For sleep type "stages"
+        for data in mergeLongAndShortData(record["levels"]):
+            row_intraday = (device_id, type_episode_id, 30,
+                            data[1], d_is_main_sleep, sleep_record_type,
+                            data[0], 0)
+
+            records_intraday.append(row_intraday)
+
+    return records_intraday
+
+
+def parseSleepData(sleep_data):
+    if sleep_data.empty:
+        return pd.DataFrame(columns=SLEEP_INTRADAY_COLUMNS)
+    device_id = sleep_data["device_id"].iloc[0]
+    records_intraday = []
+    type_episode_id = 0
+    # Parse JSON into individual records
+    for multi_record in sleep_data.json_fitbit_column:
+        sleep_record = json.loads(multi_record)
+        if "sleep" in sleep_record:
+            for record in json.loads(multi_record)["sleep"]:
+                # Whether the sleep episode is nap (0) or main sleep (1)
+                d_is_main_sleep = 1 if record["isMainSleep"] else 0
+
+                # For sleep API version 1
+                if "awakeCount" in record:
+                    records_intraday = parseOneRecordForV1(record, device_id, d_is_main_sleep, records_intraday, type_episode_id)
+                # For sleep API version 1.2
+                else:
+                    records_intraday = parseOneRecordForV12(record, device_id, d_is_main_sleep, records_intraday, type_episode_id)
+
+                type_episode_id = type_episode_id + 1
+
+    parsed_data = pd.DataFrame(data=records_intraday, columns=SLEEP_INTRADAY_COLUMNS)
+
+    return parsed_data
+
+
+def main(json_raw, stream_parameters):
+    parsed_data = parseSleepData(json_raw)
+    parsed_data["timestamp"] = 0 # this column is added at readable_datetime.R because we need to take into account multiple timezones
+    parsed_data['local_date_time'] = parsed_data['local_date_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
+
+    return(parsed_data)
diff --git a/src/data/streams/pull_phone_data.R b/src/data/streams/pull_phone_data.R
index c1d7997a..0126a8bd 100644
--- a/src/data/streams/pull_phone_data.R
+++ b/src/data/streams/pull_phone_data.R
@@ -156,7 +156,7 @@ pull_phone_data <- function(){
     mutated_data <- mutate_data(mutation_scripts, renamed_data, data_configuration)

     if(!setequal(expected_columns, colnames(mutated_data)))
-      stop(paste0("The mutated data for ", device, " does not have the columns RAPIDS expects. The container script returned [", paste(colnames(mutated_data), collapse=","),"] but RAPIDS expected [",paste(expected_columns, collapse=","), "]. One ore more mutation scripts in [", sensor,"][MUTATION][SCRIPTS] are adding extra columns or removing or not adding the ones expected"))
+      stop(paste0("The mutated data for ", device, " does not have the columns RAPIDS expects. The mutation script returned [", paste(colnames(mutated_data), collapse=","),"] but RAPIDS expected [",paste(expected_columns, collapse=","), "]. One or more mutation scripts in [", sensor,"][MUTATION][SCRIPTS] are adding extra columns or removing or not adding the ones expected"))

     participant_data <- rbind(participant_data, mutated_data)
   }
diff --git a/src/data/streams/pull_wearable_data.R b/src/data/streams/pull_wearable_data.R
index 1c1b69dd..09bf5fa3 100644
--- a/src/data/streams/pull_wearable_data.R
+++ b/src/data/streams/pull_wearable_data.R
@@ -110,7 +110,7 @@ pull_wearable_data_main <- function(){
     mutated_data <- mutate_data(mutation_scripts, renamed_data, data_configuration)

     if(!setequal(expected_columns, colnames(mutated_data)))
-      stop(paste0("The mutated data for ", device, " does not have the columns RAPIDS expects. The container script returned [", paste(colnames(mutated_data), collapse=","),"] but RAPIDS expected [",paste(expected_columns, collapse=","), "]. One ore more mutation scripts in [", sensor,"][MUTATION][SCRIPTS] are adding extra columns or removing or not adding the ones expected"))
+      stop(paste0("The mutated data for ", device, " does not have the columns RAPIDS expects. The mutation script returned [", paste(colnames(mutated_data), collapse=","),"] but RAPIDS expected [",paste(expected_columns, collapse=","), "]. One or more mutation scripts in [", sensor,"][MUTATION][SCRIPTS] are adding extra columns or removing or not adding the ones expected"))

     participant_data <- rbind(participant_data, mutated_data)
   }
diff --git a/src/data/streams/rapids_columns.yaml b/src/data/streams/rapids_columns.yaml
index 99fc2377..d6e5ac36 100644
--- a/src/data/streams/rapids_columns.yaml
+++ b/src/data/streams/rapids_columns.yaml
@@ -159,6 +159,16 @@ FITBIT_SLEEP_SUMMARY:
   - IS_MAIN_SLEEP
   - TYPE

+FITBIT_SLEEP_INTRADAY:
+  - TIMESTAMP
+  - DEVICE_ID
+  - LOCAL_DATE_TIME
+  - TYPE_EPISODE_ID
+  - DURATION
+  - IS_MAIN_SLEEP
+  - TYPE
+  - LEVEL
+
 FITBIT_STEPS_SUMMARY:
   - TIMESTAMP
   - DEVICE_ID
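For anyone who wants to exercise the new mutation script outside of the Snakemake pipeline, the following is a minimal smoke-test sketch; it is not part of this changeset. The file path and the `main(json_raw, stream_parameters)` entry point come from the diff above, and the input columns `device_id` and `json_fitbit_column` mirror the renamed data the stream pipeline hands to mutation scripts; the sample record itself is invented and trimmed to only the fields the parser reads (a v1.2 "classic" episode), and loading the module by path is just one convenient way to run it.

```python
# Hypothetical smoke test for parse_sleep_intraday_json.py (not part of this PR).
import importlib.util
import json
import pandas as pd

# Load the mutation script directly from the repo (assumes repo root as working directory).
spec = importlib.util.spec_from_file_location(
    "parse_sleep_intraday_json",
    "src/data/streams/mutations/fitbit/parse_sleep_intraday_json.py")
mutation = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mutation)

# One v1.2 "classic" sleep record, trimmed to the fields the parser touches.
sample_json = {
    "sleep": [{
        "isMainSleep": True,
        "type": "classic",
        "levels": {"data": [
            {"dateTime": "2020-10-10T15:36:30.000", "level": "restless", "seconds": 60},
            {"dateTime": "2020-10-10T15:37:30.000", "level": "asleep", "seconds": 660}]}
    }]
}

# Shape the input like the renamed stream data: device_id plus json_fitbit_column.
json_raw = pd.DataFrame([{
    "device_id": "a748ee1a-1d0b-4ae9-9074-279a2b6ba524",
    "json_fitbit_column": json.dumps(sample_json)}])

parsed = mutation.main(json_raw, stream_parameters=None)
# Expect one row per level entry; timestamp stays 0 until readable_datetime.R fills it in.
print(parsed[["type_episode_id", "duration", "level", "is_main_sleep", "type", "local_date_time", "timestamp"]])
```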