diff --git a/Snakefile b/Snakefile index 9c91e7ae..0b1fa5c3 100644 --- a/Snakefile +++ b/Snakefile @@ -169,6 +169,8 @@ for provider in config["FITBIT_STEPS"]["PROVIDERS"].keys(): files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_raw.csv", pid=config["PIDS"], fitbit_data_type=(["json"] if config["FITBIT_STEPS"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"]))) files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_parsed.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_parsed_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) + files_to_compute.extend(expand("data/interim/{pid}/fitbit_steps_features/fitbit_steps_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["FITBIT_STEPS"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower())) + files_to_compute.extend(expand("data/processed/features/{pid}/fitbit_steps.csv", pid=config["PIDS"])) for provider in config["FITBIT_CALORIES"]["PROVIDERS"].keys(): if config["FITBIT_CALORIES"]["PROVIDERS"][provider]["COMPUTE"]: diff --git a/config.yaml b/config.yaml index f2b2effc..3506cd19 100644 --- a/config.yaml +++ b/config.yaml @@ -272,7 +272,6 @@ FITBIT_HEARTRATE: SRC_FOLDER: "rapids" # inside src/features/fitbit_heartrate SRC_LANGUAGE: "python" - FITBIT_STEPS: TABLE_FORMAT: JSON # JSON or CSV. 
If your JSON or CSV data are files change [SENSOR_DATA][FITBIT][SOURCE][TYPE] to FILES TABLE: @@ -286,16 +285,19 @@ FITBIT_STEPS: FIXED: START: "23:00" END: "07:00" - PROVIDERS: RAPIDS: COMPUTE: False FEATURES: - ALL_STEPS: ["sumallsteps", "maxallsteps", "minallsteps", "avgallsteps", "stdallsteps"] - SEDENTARY_BOUT: ["countepisode", "sumduration", "maxduration", "minduration", "avgduration", "stdduration"] - ACTIVE_BOUT: ["countepisode", "sumduration", "maxduration", "minduration", "avgduration", "stdduration"] + SUMMARY: ["maxsumsteps", "minsumsteps", "avgsumsteps", "mediansumsteps", "stdsumsteps"] + INTRADAY: + STEPS: ["sum", "max", "min", "avg", "std"] + SEDENTARY_BOUT: ["countepisode", "sumduration", "maxduration", "minduration", "avgduration", "stdduration"] + ACTIVE_BOUT: ["countepisode", "sumduration", "maxduration", "minduration", "avgduration", "stdduration"] THRESHOLD_ACTIVE_BOUT: 10 # steps INCLUDE_ZERO_STEP_ROWS: False + SRC_FOLDER: "rapids" # inside src/features/fitbit_steps + SRC_LANGUAGE: "python" FITBIT_SLEEP: TABLE_FORMAT: JSON # JSON or CSV. 
If your JSON or CSV data are files change [SENSOR_DATA][FITBIT][SOURCE][TYPE] to FILES diff --git a/rules/features.smk b/rules/features.smk index 10e8b353..5e659236 100644 --- a/rules/features.smk +++ b/rules/features.smk @@ -229,6 +229,19 @@ rule fitbit_heartrate_python_features: script: "../src/features/entry.py" +rule fitbit_steps_python_features: + input: + sensor_data = expand("data/raw/{{pid}}/fitbit_steps_{fitbit_data_type}_parsed_with_datetime.csv", fitbit_data_type=["summary", "intraday"]), + day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv" + params: + provider = lambda wildcards: config["FITBIT_STEPS"]["PROVIDERS"][wildcards.provider_key.upper()], + provider_key = "{provider_key}", + sensor_key = "fitbit_steps" + output: + "data/interim/{pid}/fitbit_steps_features/fitbit_steps_python_{provider_key}.csv" + script: + "../src/features/entry.py" + # rule fitbit_heartrate_features: # input: # heartrate_summary_data = "data/raw/{pid}/fitbit_heartrate_summary_with_datetime.csv", diff --git a/src/data/fitbit_parse_steps.py b/src/data/fitbit_parse_steps.py index f0d067f2..e7c94987 100644 --- a/src/data/fitbit_parse_steps.py +++ b/src/data/fitbit_parse_steps.py @@ -4,15 +4,7 @@ import numpy as np from datetime import datetime, timezone from math import trunc -STEPS_SUMMARY_COLUMNS = ("device_id", - "steps_rapids_intradaycountallsteps", - "local_date_time", - "timestamp") - -STEPS_INTRADAY_COLUMNS = ("device_id", - "steps", - "local_date_time", - "timestamp") +STEPS_COLUMNS = ("device_id", "steps", "local_date_time", "timestamp") def parseStepsData(steps_data): @@ -48,7 +40,7 @@ def parseStepsData(steps_data): records_intraday.append(row_intraday) - return pd.DataFrame(data=records_summary, columns=STEPS_SUMMARY_COLUMNS), pd.DataFrame(data=records_intraday, columns=STEPS_INTRADAY_COLUMNS) + return pd.DataFrame(data=records_summary, columns=STEPS_COLUMNS), pd.DataFrame(data=records_intraday, columns=STEPS_COLUMNS) table_format = 
snakemake.params["table_format"] timezone = snakemake.params["timezone"] diff --git a/src/features/fitbit_step/fitbit_step_base.py b/src/features/fitbit_step/fitbit_step_base.py deleted file mode 100644 index ba08a43a..00000000 --- a/src/features/fitbit_step/fitbit_step_base.py +++ /dev/null @@ -1,103 +0,0 @@ -import pandas as pd -import numpy as np - -def getBouts(step_data, time_interval): - # resample the data into time_interval minute bins, set "isactivebout" column to be NA if it is missing - resampled_step_minute = pd.DataFrame(step_data.resample(str(time_interval) + "T", on="local_date_time")["isactivebout"].sum(min_count=1)) - - # group rows by consecutive values of "isactivebout" column - group = pd.DataFrame(resampled_step_minute["isactivebout"] != resampled_step_minute["isactivebout"].shift()).cumsum().rename(columns={"isactivebout": "group_idx"}) - - # combine resampled_acc_minute and group column - resampled_step_minute = pd.concat([resampled_step_minute, group], axis=1) - - # drop rows where "isactivebout" column is missing and reset the index - resampled_step_minute.dropna(subset=["isactivebout"], inplace=True) - resampled_step_minute.reset_index(inplace=True) - resampled_step_minute.loc[:, "local_date"] = resampled_step_minute["local_date_time"].dt.date - - # duration column contains the number of minutes (rows) of active and sedentary bout - bouts = resampled_step_minute.groupby(["isactivebout", "group_idx", "local_date"]).count().rename(columns={"local_date_time": "duration"}).reset_index() - bouts["duration"] = bouts["duration"] * time_interval - - return bouts - -def statsFeatures(step_data, day_segment, features_to_compute, features_type, step_features): - if features_type == "allsteps": - col_name = "steps" - elif features_type == "durationsedentarybout" or features_type == "durationactivebout": - col_name = "duration" - else: - raise ValueError("features_type can only be one of ['allsteps', 'durationsedentarybout', 'durationactivebout'].") - 
- if "count" + features_type.replace("duration", "episode") in features_to_compute: - step_features["step_" + day_segment + "_count" + features_type.replace("duration", "episode")] = step_data.groupby(["local_date"])[col_name].count() - if "sum" + features_type in features_to_compute: - step_features["step_" + day_segment + "_sum" + features_type] = step_data.groupby(["local_date"])[col_name].sum() - if "max" + features_type in features_to_compute: - step_features["step_" + day_segment + "_max" + features_type] = step_data.groupby(["local_date"])[col_name].max() - if "min" + features_type in features_to_compute: - step_features["step_" + day_segment + "_min" + features_type] = step_data.groupby(["local_date"])[col_name].min() - if "avg" + features_type in features_to_compute: - step_features["step_" + day_segment + "_avg" + features_type] = step_data.groupby(["local_date"])[col_name].mean() - if "median" + features_type in features_to_compute: - step_features["step_" + day_segment + "_median" + features_type] = step_data.groupby(["local_date"])[col_name].median() - if "std" + features_type in features_to_compute: - step_features["step_" + day_segment + "_std" + features_type] = step_data.groupby(["local_date"])[col_name].std() - - return step_features - -def base_fitbit_step_features(step_data, day_segment, requested_features, threshold_active_bout, include_zero_step_rows): - requested_features_allsteps = requested_features["features_all_steps"] - requested_features_sedentarybout = requested_features["features_sedentary_bout"] - requested_features_activebout = requested_features["features_active_bout"] - - # name of the features this function can compute - base_features_allsteps = ["sumallsteps", "maxallsteps", "minallsteps", "avgallsteps", "stdallsteps"] - base_features_sedentarybout = ["countepisodesedentarybout", "sumdurationsedentarybout", "maxdurationsedentarybout", "mindurationsedentarybout", "avgdurationsedentarybout", "stddurationsedentarybout"] - 
base_features_activebout = ["countepisodeactivebout", "sumdurationactivebout", "maxdurationactivebout", "mindurationactivebout", "avgdurationactivebout", "stddurationactivebout"] - # the subset of requested features this function can compute - features_to_compute_allsteps = list(set(requested_features_allsteps) & set(base_features_allsteps)) - features_to_compute_sedentarybout = list(set(requested_features_sedentarybout) & set(base_features_sedentarybout)) - features_to_compute_activebout = list(set(requested_features_activebout) & set(base_features_activebout)) - - features_to_compute = features_to_compute_allsteps + features_to_compute_sedentarybout + features_to_compute_activebout - - step_features = pd.DataFrame(columns=["local_date"] + ["step_" + day_segment + "_" + x for x in features_to_compute]) - if not step_data.empty: - if day_segment != "daily": - step_data =step_data[step_data["local_day_segment"] == day_segment] - - if not step_data.empty: - step_features = pd.DataFrame() - - # statistics features of step count - step_features = statsFeatures(step_data, day_segment, features_to_compute_allsteps, "allsteps", step_features) - - # calculate time interval between two records in minutes - time_interval = step_data["local_date_time"].diff().min().total_seconds() / 60 - - # sedentary bout: less than THRESHOLD_ACTIVE_BOUT (default: 10) steps in a minute - # active bout: greater or equal to THRESHOLD_ACTIVE_BOUT (default: 10) steps in a minute - isactivebout = np.where(step_data["steps"] < int(threshold_active_bout) * time_interval, 0, 1) - step_data = step_data.assign(isactivebout = isactivebout) - - bouts = getBouts(step_data, time_interval) - - # statistics features of sedentary bout - sedentary_bout = bouts[bouts["isactivebout"] == 0] - step_features = statsFeatures(sedentary_bout, day_segment, features_to_compute_sedentarybout, "durationsedentarybout", step_features) - - # statistics features of active bout - active_bout = bouts[bouts["isactivebout"] == 
import re

import numpy as np
import pandas as pd

# (feature label, pandas GroupBy method) pairs, in the order the feature
# columns are appended to the output frame.
_STATISTICS = (
    ("count", "count"),
    ("sum", "sum"),
    ("max", "max"),
    ("min", "min"),
    ("avg", "mean"),
    ("median", "median"),
    ("std", "std"),
)

_STEPS_FEATURE_TYPES = ("steps", "sumsteps")
_BOUT_FEATURE_TYPES = ("durationsedentarybout", "durationactivebout")


def _segment_indexed_frame(data):
    """Return a column-less frame indexed by the sorted unique ``local_segment`` values of *data*.

    Seeding the index up front means every later column assignment is aligned
    against the full set of segments. Starting from a bare ``pd.DataFrame()``
    would instead adopt the index of whichever feature happens to be assigned
    first and silently drop segments that feature does not cover.
    """
    segments = sorted(data["local_segment"].dropna().unique())
    return pd.DataFrame(index=pd.Index(segments, name="local_segment"))


def _computable(requested, base):
    """Return the entries of *base* that were requested, keeping the order of *base*."""
    wanted = set(requested)
    return [feature for feature in base if feature in wanted]


def statsFeatures(steps_data, features_to_compute, features_type, steps_features):
    """Append per-segment statistics of one feature family to *steps_features*.

    Parameters
    ----------
    steps_data : DataFrame
        Rows to aggregate. Must contain ``local_segment`` and either ``steps``
        (for ``"steps"`` / ``"sumsteps"``) or ``duration`` (for the bout types).
    features_to_compute : list of str
        Feature names such as ``"intradaysumsteps"`` or
        ``"intradaycountepisodeactivebout"``; only names in this list are computed.
    features_type : str
        One of ``"steps"``, ``"sumsteps"``, ``"durationsedentarybout"``,
        ``"durationactivebout"``.
    steps_features : DataFrame
        Frame that receives the new ``steps_rapids_*`` columns (modified in place).

    Returns
    -------
    DataFrame
        *steps_features*, with one column per computed feature.

    Raises
    ------
    ValueError
        If *features_type* is not one of the four supported values.
    """
    if features_type in _STEPS_FEATURE_TYPES:
        col_name = "steps"
    elif features_type in _BOUT_FEATURE_TYPES:
        col_name = "duration"
    else:
        raise ValueError("features_type can only be one of ['steps', 'sumsteps', 'durationsedentarybout', 'durationactivebout'].")

    # "sumsteps" is the family computed from the daily summary table; all others are intraday.
    source = "summary" if features_type == "sumsteps" else "intraday"
    grouped = steps_data.groupby(["local_segment"])[col_name]

    for label, method in _STATISTICS:
        # the count feature of a bout family is named "...countepisode..." rather than "...countduration..."
        suffix = features_type.replace("duration", "episode") if label == "count" else features_type
        feature_name = source + label + suffix
        if feature_name in features_to_compute:
            steps_features["steps_rapids_" + feature_name] = getattr(grouped, method)()

    return steps_features


def getBouts(steps_data):
    """Collapse consecutive rows into sedentary / active bouts.

    Consecutive rows belong to the same bout while their ``isactivebout``,
    ``local_timezone`` and ``local_segment`` values all stay unchanged.
    *steps_data* is not modified.

    Returns
    -------
    DataFrame
        Indexed by ``group_idx`` with columns ``duration`` (number of rows in
        the bout), ``local_segment`` and ``isactivebout``.
    """
    # NOTE(review): "duration" is a row count; it only equals minutes if the
    # intraday data has exactly one row per minute -- confirm against the parser.
    keys = steps_data[["isactivebout", "local_timezone", "local_segment"]]
    # a new group starts on every row where any key differs from the previous row
    group_idx = (keys.shift() != keys).any(axis=1).cumsum().rename("group_idx")

    bouts = steps_data.groupby(group_idx).agg(
        duration=("local_segment", "count"),
        local_segment=("local_segment", "first"),
        isactivebout=("isactivebout", "first"),
    )

    return bouts


def extractStepsFeaturesFromSummaryData(steps_summary_data, summary_features_to_compute):
    """Compute the requested per-segment statistics of the summary ``steps`` column.

    Returns a frame with a ``local_segment`` column (one row per segment found
    in *steps_summary_data*) followed by the computed ``steps_rapids_summary*``
    columns. The ``local_segment`` column is present even when
    *summary_features_to_compute* is empty, so the result can always be merged
    on it.
    """
    steps_summary_features = _segment_indexed_frame(steps_summary_data)

    # statistics features of daily steps count
    steps_summary_features = statsFeatures(steps_summary_data, summary_features_to_compute, "sumsteps", steps_summary_features)

    return steps_summary_features.reset_index()


def extractStepsFeaturesFromIntradayData(steps_intraday_data, threshold_active_bout, intraday_features_to_compute_steps, intraday_features_to_compute_sedentarybout, intraday_features_to_compute_activebout, steps_intraday_features=None):
    """Compute step-count and bout statistics per segment from intraday rows.

    Parameters
    ----------
    steps_intraday_data : DataFrame
        Needs ``steps``, ``local_timezone`` and ``local_segment`` columns.
    threshold_active_bout : int or str
        A row with fewer steps than ``int(threshold_active_bout)`` is sedentary,
        otherwise active.
    intraday_features_to_compute_steps, intraday_features_to_compute_sedentarybout, intraday_features_to_compute_activebout : list of str
        Feature names to compute for each family.
    steps_intraday_features : ignored
        Kept only so existing callers that pass a sixth argument keep working;
        the result is always built from scratch.

    Returns
    -------
    DataFrame
        A ``local_segment`` column (one row per segment in the input) plus the
        computed ``steps_rapids_intraday*`` columns. Segments lacking a given
        bout type get NaN in that family's columns.
    """
    steps_intraday_features = _segment_indexed_frame(steps_intraday_data)

    # statistics features of steps count
    steps_intraday_features = statsFeatures(steps_intraday_data, intraday_features_to_compute_steps, "steps", steps_intraday_features)

    # sedentary bout: less than THRESHOLD_ACTIVE_BOUT (default: 10) steps in a row
    # active bout: greater or equal to THRESHOLD_ACTIVE_BOUT (default: 10) steps in a row
    isactivebout = np.where(steps_intraday_data["steps"] < int(threshold_active_bout), 0, 1)
    bouts = getBouts(steps_intraday_data.assign(isactivebout=isactivebout))

    # statistics features of sedentary bout
    sedentary_bout = bouts[bouts["isactivebout"] == 0]
    steps_intraday_features = statsFeatures(sedentary_bout, intraday_features_to_compute_sedentarybout, "durationsedentarybout", steps_intraday_features)

    # statistics features of active bout
    active_bout = bouts[bouts["isactivebout"] == 1]
    steps_intraday_features = statsFeatures(active_bout, intraday_features_to_compute_activebout, "durationactivebout", steps_intraday_features)

    return steps_intraday_features.reset_index()


def rapids_features(sensor_data_files, day_segment, provider, filter_data_by_segment, *args, **kwargs):
    """Compute the RAPIDS Fitbit steps features for one day segment.

    Parameters
    ----------
    sensor_data_files : mapping
        ``sensor_data_files["sensor_data"]`` holds two CSV sources readable by
        ``pd.read_csv``: index 0 is read as the summary table, index 1 as the
        intraday table.
    day_segment : str
        Segment label; summary rows are kept only when their ``local_segment``
        starts with ``<label>#<date> 00:00:00,<date> 23:59:59``.
    provider : mapping
        Needs ``THRESHOLD_ACTIVE_BOUT``, ``INCLUDE_ZERO_STEP_ROWS`` and
        ``FEATURES`` (``SUMMARY`` list and ``INTRADAY`` dict with ``STEPS``,
        ``SEDENTARY_BOUT`` and ``ACTIVE_BOUT`` lists).
    filter_data_by_segment : callable
        Called as ``filter_data_by_segment(data, day_segment)``; expected to
        return the rows of *data* belonging to the segment.

    Returns
    -------
    DataFrame
        One row per ``local_segment`` with the intraday columns followed by the
        summary columns (outer merge on ``local_segment``).
    """
    threshold_active_bout = provider["THRESHOLD_ACTIVE_BOUT"]
    include_zero_step_rows = provider["INCLUDE_ZERO_STEP_ROWS"]

    steps_summary_data = pd.read_csv(sensor_data_files["sensor_data"][0])
    steps_intraday_data = pd.read_csv(sensor_data_files["sensor_data"][1])

    requested_summary_features = ["summary" + x for x in provider["FEATURES"]["SUMMARY"]]
    requested_intraday_features = provider["FEATURES"]["INTRADAY"]

    requested_intraday_features_steps = ["intraday" + x + "steps" for x in requested_intraday_features["STEPS"]]
    requested_intraday_features_sedentarybout = ["intraday" + x + "sedentarybout" for x in requested_intraday_features["SEDENTARY_BOUT"]]
    requested_intraday_features_activebout = ["intraday" + x + "activebout" for x in requested_intraday_features["ACTIVE_BOUT"]]

    # name of the features this function can compute
    base_summary_features = ["summarymaxsumsteps", "summaryminsumsteps", "summaryavgsumsteps", "summarymediansumsteps", "summarystdsumsteps"]
    base_intraday_features_steps = ["intradaysumsteps", "intradaymaxsteps", "intradayminsteps", "intradayavgsteps", "intradaystdsteps"]
    base_intraday_features_sedentarybout = ["intradaycountepisodesedentarybout", "intradaysumdurationsedentarybout", "intradaymaxdurationsedentarybout", "intradaymindurationsedentarybout", "intradayavgdurationsedentarybout", "intradaystddurationsedentarybout"]
    base_intraday_features_activebout = ["intradaycountepisodeactivebout", "intradaysumdurationactivebout", "intradaymaxdurationactivebout", "intradaymindurationactivebout", "intradayavgdurationactivebout", "intradaystddurationactivebout"]

    # the subset of requested features this function can compute (in a deterministic order)
    summary_features_to_compute = _computable(requested_summary_features, base_summary_features)
    intraday_features_to_compute_steps = _computable(requested_intraday_features_steps, base_intraday_features_steps)
    intraday_features_to_compute_sedentarybout = _computable(requested_intraday_features_sedentarybout, base_intraday_features_sedentarybout)
    intraday_features_to_compute_activebout = _computable(requested_intraday_features_activebout, base_intraday_features_activebout)
    intraday_features_to_compute = intraday_features_to_compute_steps + intraday_features_to_compute_sedentarybout + intraday_features_to_compute_activebout

    # extract features from summary data
    steps_summary_features = pd.DataFrame(columns=["local_segment"] + ["steps_rapids_" + x for x in summary_features_to_compute])
    if not steps_summary_data.empty:
        steps_summary_data = filter_data_by_segment(steps_summary_data, day_segment)

        if not steps_summary_data.empty:
            # only keep the segments start at 00:00:00 and end at 23:59:59
            datetime_start_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2} 00:00:00"
            datetime_end_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2} 23:59:59"

            # the label is literal text, so escape it before embedding it in the pattern
            segment_regex = "{}#{},{}".format(re.escape(day_segment), datetime_start_regex, datetime_end_regex)
            steps_summary_data = steps_summary_data[steps_summary_data["local_segment"].str.match(segment_regex)]

            if not steps_summary_data.empty:
                steps_summary_features = extractStepsFeaturesFromSummaryData(steps_summary_data, summary_features_to_compute)

    # extract features from intraday data
    steps_intraday_features = pd.DataFrame(columns=["local_segment"] + ["steps_rapids_" + x for x in intraday_features_to_compute])
    if not steps_intraday_data.empty:
        steps_intraday_data = filter_data_by_segment(steps_intraday_data, day_segment)

        if not steps_intraday_data.empty:
            steps_intraday_features = extractStepsFeaturesFromIntradayData(steps_intraday_data, threshold_active_bout, intraday_features_to_compute_steps, intraday_features_to_compute_sedentarybout, intraday_features_to_compute_activebout)

    # merge summary features and intraday features
    steps_features = steps_intraday_features.merge(steps_summary_features, on=["local_segment"], how="outer")

    # exclude rows whose date has a total intraday step count of ZERO.
    # With no intraday rows there is no total to compare against, so every row is kept.
    if not include_zero_step_rows and not steps_intraday_data.empty:
        # NOTE(review): the per-date total is taken over the rows that survived
        # filter_data_by_segment, not over the unfiltered day -- confirm this is intended.
        daily_steps = steps_intraday_data.groupby(["local_date"])["steps"].sum()
        # the segment's start date is the first 10 characters after the "#"
        segment_dates = steps_features["local_segment"].map(lambda segment: segment.split("#")[1][:10])
        # dates without intraday rows map to NaN, and NaN != 0, so those rows are kept
        keep = segment_dates.map(daily_steps) != 0
        steps_features = steps_features.loc[keep].reset_index(drop=True)

    return steps_features