Merge branch 'day_segments' of https://github.com/carissalow/rapids into day_segments

pull/103/head
JulioV 2020-11-04 13:14:23 -05:00
commit 40f7ef4935
6 changed files with 165 additions and 118 deletions

View File

@ -169,6 +169,8 @@ for provider in config["FITBIT_STEPS"]["PROVIDERS"].keys():
files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_raw.csv", pid=config["PIDS"], fitbit_data_type=(["json"] if config["FITBIT_STEPS"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"]))) files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_raw.csv", pid=config["PIDS"], fitbit_data_type=(["json"] if config["FITBIT_STEPS"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"])))
files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_parsed.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_parsed.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"]))
files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_parsed_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"])) files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_{fitbit_data_type}_parsed_with_datetime.csv", pid=config["PIDS"], fitbit_data_type=["summary", "intraday"]))
files_to_compute.extend(expand("data/interim/{pid}/fitbit_steps_features/fitbit_steps_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["FITBIT_STEPS"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/fitbit_steps.csv", pid=config["PIDS"]))
for provider in config["FITBIT_CALORIES"]["PROVIDERS"].keys(): for provider in config["FITBIT_CALORIES"]["PROVIDERS"].keys():
if config["FITBIT_CALORIES"]["PROVIDERS"][provider]["COMPUTE"]: if config["FITBIT_CALORIES"]["PROVIDERS"][provider]["COMPUTE"]:

View File

@ -272,7 +272,6 @@ FITBIT_HEARTRATE:
SRC_FOLDER: "rapids" # inside src/features/fitbit_heartrate SRC_FOLDER: "rapids" # inside src/features/fitbit_heartrate
SRC_LANGUAGE: "python" SRC_LANGUAGE: "python"
FITBIT_STEPS: FITBIT_STEPS:
TABLE_FORMAT: JSON # JSON or CSV. If your JSON or CSV data are files change [SENSOR_DATA][FITBIT][SOURCE][TYPE] to FILES TABLE_FORMAT: JSON # JSON or CSV. If your JSON or CSV data are files change [SENSOR_DATA][FITBIT][SOURCE][TYPE] to FILES
TABLE: TABLE:
@ -286,16 +285,19 @@ FITBIT_STEPS:
FIXED: FIXED:
START: "23:00" START: "23:00"
END: "07:00" END: "07:00"
PROVIDERS: PROVIDERS:
RAPIDS: RAPIDS:
COMPUTE: False COMPUTE: False
FEATURES: FEATURES:
ALL_STEPS: ["sumallsteps", "maxallsteps", "minallsteps", "avgallsteps", "stdallsteps"] SUMMARY: ["maxsumsteps", "minsumsteps", "avgsumsteps", "mediansumsteps", "stdsumsteps"]
INTRADAY:
STEPS: ["sum", "max", "min", "avg", "std"]
SEDENTARY_BOUT: ["countepisode", "sumduration", "maxduration", "minduration", "avgduration", "stdduration"] SEDENTARY_BOUT: ["countepisode", "sumduration", "maxduration", "minduration", "avgduration", "stdduration"]
ACTIVE_BOUT: ["countepisode", "sumduration", "maxduration", "minduration", "avgduration", "stdduration"] ACTIVE_BOUT: ["countepisode", "sumduration", "maxduration", "minduration", "avgduration", "stdduration"]
THRESHOLD_ACTIVE_BOUT: 10 # steps THRESHOLD_ACTIVE_BOUT: 10 # steps
INCLUDE_ZERO_STEP_ROWS: False INCLUDE_ZERO_STEP_ROWS: False
SRC_FOLDER: "rapids" # inside src/features/fitbit_steps
SRC_LANGUAGE: "python"
FITBIT_SLEEP: FITBIT_SLEEP:
TABLE_FORMAT: JSON # JSON or CSV. If your JSON or CSV data are files change [SENSOR_DATA][FITBIT][SOURCE][TYPE] to FILES TABLE_FORMAT: JSON # JSON or CSV. If your JSON or CSV data are files change [SENSOR_DATA][FITBIT][SOURCE][TYPE] to FILES

View File

@ -229,6 +229,19 @@ rule fitbit_heartrate_python_features:
script: script:
"../src/features/entry.py" "../src/features/entry.py"
rule fitbit_steps_python_features:
input:
sensor_data = expand("data/raw/{{pid}}/fitbit_steps_{fitbit_data_type}_parsed_with_datetime.csv", fitbit_data_type=["summary", "intraday"]),
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["FITBIT_STEPS"]["PROVIDERS"][wildcards.provider_key.upper()],
provider_key = "{provider_key}",
sensor_key = "fitbit_steps"
output:
"data/interim/{pid}/fitbit_steps_features/fitbit_steps_python_{provider_key}.csv"
script:
"../src/features/entry.py"
# rule fitbit_heartrate_features: # rule fitbit_heartrate_features:
# input: # input:
# heartrate_summary_data = "data/raw/{pid}/fitbit_heartrate_summary_with_datetime.csv", # heartrate_summary_data = "data/raw/{pid}/fitbit_heartrate_summary_with_datetime.csv",

View File

@ -4,15 +4,7 @@ import numpy as np
from datetime import datetime, timezone from datetime import datetime, timezone
from math import trunc from math import trunc
STEPS_SUMMARY_COLUMNS = ("device_id", STEPS_COLUMNS = ("device_id", "steps", "local_date_time", "timestamp")
"steps_rapids_intradaycountallsteps",
"local_date_time",
"timestamp")
STEPS_INTRADAY_COLUMNS = ("device_id",
"steps",
"local_date_time",
"timestamp")
def parseStepsData(steps_data): def parseStepsData(steps_data):
@ -48,7 +40,7 @@ def parseStepsData(steps_data):
records_intraday.append(row_intraday) records_intraday.append(row_intraday)
return pd.DataFrame(data=records_summary, columns=STEPS_SUMMARY_COLUMNS), pd.DataFrame(data=records_intraday, columns=STEPS_INTRADAY_COLUMNS) return pd.DataFrame(data=records_summary, columns=STEPS_COLUMNS), pd.DataFrame(data=records_intraday, columns=STEPS_COLUMNS)
table_format = snakemake.params["table_format"] table_format = snakemake.params["table_format"]
timezone = snakemake.params["timezone"] timezone = snakemake.params["timezone"]

View File

@ -1,103 +0,0 @@
import pandas as pd
import numpy as np
def getBouts(step_data, time_interval):
# resample the data into time_interval minute bins, set "isactivebout" column to be NA if it is missing
resampled_step_minute = pd.DataFrame(step_data.resample(str(time_interval) + "T", on="local_date_time")["isactivebout"].sum(min_count=1))
# group rows by consecutive values of "isactivebout" column
group = pd.DataFrame(resampled_step_minute["isactivebout"] != resampled_step_minute["isactivebout"].shift()).cumsum().rename(columns={"isactivebout": "group_idx"})
# combine resampled_acc_minute and group column
resampled_step_minute = pd.concat([resampled_step_minute, group], axis=1)
# drop rows where "isactivebout" column is missing and reset the index
resampled_step_minute.dropna(subset=["isactivebout"], inplace=True)
resampled_step_minute.reset_index(inplace=True)
resampled_step_minute.loc[:, "local_date"] = resampled_step_minute["local_date_time"].dt.date
# duration column contains the number of minutes (rows) of active and sedentary bout
bouts = resampled_step_minute.groupby(["isactivebout", "group_idx", "local_date"]).count().rename(columns={"local_date_time": "duration"}).reset_index()
bouts["duration"] = bouts["duration"] * time_interval
return bouts
def statsFeatures(step_data, day_segment, features_to_compute, features_type, step_features):
if features_type == "allsteps":
col_name = "steps"
elif features_type == "durationsedentarybout" or features_type == "durationactivebout":
col_name = "duration"
else:
raise ValueError("features_type can only be one of ['allsteps', 'durationsedentarybout', 'durationactivebout'].")
if "count" + features_type.replace("duration", "episode") in features_to_compute:
step_features["step_" + day_segment + "_count" + features_type.replace("duration", "episode")] = step_data.groupby(["local_date"])[col_name].count()
if "sum" + features_type in features_to_compute:
step_features["step_" + day_segment + "_sum" + features_type] = step_data.groupby(["local_date"])[col_name].sum()
if "max" + features_type in features_to_compute:
step_features["step_" + day_segment + "_max" + features_type] = step_data.groupby(["local_date"])[col_name].max()
if "min" + features_type in features_to_compute:
step_features["step_" + day_segment + "_min" + features_type] = step_data.groupby(["local_date"])[col_name].min()
if "avg" + features_type in features_to_compute:
step_features["step_" + day_segment + "_avg" + features_type] = step_data.groupby(["local_date"])[col_name].mean()
if "median" + features_type in features_to_compute:
step_features["step_" + day_segment + "_median" + features_type] = step_data.groupby(["local_date"])[col_name].median()
if "std" + features_type in features_to_compute:
step_features["step_" + day_segment + "_std" + features_type] = step_data.groupby(["local_date"])[col_name].std()
return step_features
def base_fitbit_step_features(step_data, day_segment, requested_features, threshold_active_bout, include_zero_step_rows):
requested_features_allsteps = requested_features["features_all_steps"]
requested_features_sedentarybout = requested_features["features_sedentary_bout"]
requested_features_activebout = requested_features["features_active_bout"]
# name of the features this function can compute
base_features_allsteps = ["sumallsteps", "maxallsteps", "minallsteps", "avgallsteps", "stdallsteps"]
base_features_sedentarybout = ["countepisodesedentarybout", "sumdurationsedentarybout", "maxdurationsedentarybout", "mindurationsedentarybout", "avgdurationsedentarybout", "stddurationsedentarybout"]
base_features_activebout = ["countepisodeactivebout", "sumdurationactivebout", "maxdurationactivebout", "mindurationactivebout", "avgdurationactivebout", "stddurationactivebout"]
# the subset of requested features this function can compute
features_to_compute_allsteps = list(set(requested_features_allsteps) & set(base_features_allsteps))
features_to_compute_sedentarybout = list(set(requested_features_sedentarybout) & set(base_features_sedentarybout))
features_to_compute_activebout = list(set(requested_features_activebout) & set(base_features_activebout))
features_to_compute = features_to_compute_allsteps + features_to_compute_sedentarybout + features_to_compute_activebout
step_features = pd.DataFrame(columns=["local_date"] + ["step_" + day_segment + "_" + x for x in features_to_compute])
if not step_data.empty:
if day_segment != "daily":
step_data =step_data[step_data["local_day_segment"] == day_segment]
if not step_data.empty:
step_features = pd.DataFrame()
# statistics features of step count
step_features = statsFeatures(step_data, day_segment, features_to_compute_allsteps, "allsteps", step_features)
# calculate time interval between two records in minutes
time_interval = step_data["local_date_time"].diff().min().total_seconds() / 60
# sedentary bout: less than THRESHOLD_ACTIVE_BOUT (default: 10) steps in a minute
# active bout: greater or equal to THRESHOLD_ACTIVE_BOUT (default: 10) steps in a minute
isactivebout = np.where(step_data["steps"] < int(threshold_active_bout) * time_interval, 0, 1)
step_data = step_data.assign(isactivebout = isactivebout)
bouts = getBouts(step_data, time_interval)
# statistics features of sedentary bout
sedentary_bout = bouts[bouts["isactivebout"] == 0]
step_features = statsFeatures(sedentary_bout, day_segment, features_to_compute_sedentarybout, "durationsedentarybout", step_features)
# statistics features of active bout
active_bout = bouts[bouts["isactivebout"] == 1]
step_features = statsFeatures(active_bout, day_segment, features_to_compute_activebout, "durationactivebout", step_features)
# exclude data when the total step count is ZERO during the whole epoch
if not include_zero_step_rows:
step_features["sumallsteps_aux"] = step_data.groupby(["local_date"])["steps"].sum()
step_features = step_features.query("sumallsteps_aux != 0")
del step_features["sumallsteps_aux"]
step_features = step_features.reset_index()
return step_features

View File

@ -0,0 +1,141 @@
import pandas as pd
import numpy as np
def statsFeatures(steps_data, features_to_compute, features_type, steps_features):
if features_type == "steps" or features_type == "sumsteps":
col_name = "steps"
elif features_type == "durationsedentarybout" or features_type == "durationactivebout":
col_name = "duration"
else:
raise ValueError("features_type can only be one of ['steps', 'sumsteps', 'durationsedentarybout', 'durationactivebout'].")
if ("summarycount" if features_type == "sumsteps" else "intradaycount") + features_type.replace("duration", "episode") in features_to_compute:
steps_features["steps_rapids_" + ("summarycount" if features_type == "sumsteps" else "intradaycount") + features_type.replace("duration", "episode")] = steps_data.groupby(["local_segment"])[col_name].count()
if ("summarysum" if features_type == "sumsteps" else "intradaysum") + features_type in features_to_compute:
steps_features["steps_rapids_" + ("summarysum" if features_type == "sumsteps" else "intradaysum") + features_type] = steps_data.groupby(["local_segment"])[col_name].sum()
if ("summarymax" if features_type == "sumsteps" else "intradaymax") + features_type in features_to_compute:
steps_features["steps_rapids_" + ("summarymax" if features_type == "sumsteps" else "intradaymax") + features_type] = steps_data.groupby(["local_segment"])[col_name].max()
if ("summarymin" if features_type == "sumsteps" else "intradaymin") + features_type in features_to_compute:
steps_features["steps_rapids_" + ("summarymin" if features_type == "sumsteps" else "intradaymin") + features_type] = steps_data.groupby(["local_segment"])[col_name].min()
if ("summaryavg" if features_type == "sumsteps" else "intradayavg") + features_type in features_to_compute:
steps_features["steps_rapids_" + ("summaryavg" if features_type == "sumsteps" else "intradayavg") + features_type] = steps_data.groupby(["local_segment"])[col_name].mean()
if ("summarymedian" if features_type == "sumsteps" else "intradaymedian") + features_type in features_to_compute:
steps_features["steps_rapids_" + ("summarymedian" if features_type == "sumsteps" else "intradaymedian") + features_type] = steps_data.groupby(["local_segment"])[col_name].median()
if ("summarystd" if features_type == "sumsteps" else "intradaystd") + features_type in features_to_compute:
steps_features["steps_rapids_" + ("summarystd" if features_type == "sumsteps" else "intradaystd") + features_type] = steps_data.groupby(["local_segment"])[col_name].std()
return steps_features
def getBouts(steps_data):
# put consecutive rows into the same group if they have the same values of "isactivebout", "local_timezone", and "local_segment"
steps_data["group_idx"] = (steps_data[["isactivebout", "local_timezone", "local_segment"]].shift() != steps_data[["isactivebout", "local_timezone", "local_segment"]]).any(axis=1).cumsum()
# get bouts: duration column contains the number of minutes (rows) of sedentary and active activity for each episode
grouped = steps_data.groupby("group_idx")
bouts = grouped["local_segment"].agg(duration="count")
bouts[["local_segment", "isactivebout"]] = grouped[["local_segment", "isactivebout"]].first()
return bouts
def extractStepsFeaturesFromSummaryData(steps_summary_data, summary_features_to_compute):
steps_summary_features = pd.DataFrame()
# statistics features of daily steps count
steps_summary_features = statsFeatures(steps_summary_data, summary_features_to_compute, "sumsteps", steps_summary_features)
steps_summary_features.reset_index(inplace=True)
return steps_summary_features
def extractStepsFeaturesFromIntradayData(steps_intraday_data, threshold_active_bout, intraday_features_to_compute_steps, intraday_features_to_compute_sedentarybout, intraday_features_to_compute_activebout, steps_intraday_features):
steps_intraday_features = pd.DataFrame()
# statistics features of steps count
steps_intraday_features = statsFeatures(steps_intraday_data, intraday_features_to_compute_steps, "steps", steps_intraday_features)
# sedentary bout: less than THRESHOLD_ACTIVE_BOUT (default: 10) steps in a minute
# active bout: greater or equal to THRESHOLD_ACTIVE_BOUT (default: 10) steps in a minute
isactivebout = np.where(steps_intraday_data["steps"] < int(threshold_active_bout), 0, 1)
steps_intraday_data = steps_intraday_data.assign(isactivebout = isactivebout)
bouts = getBouts(steps_intraday_data)
# statistics features of sedentary bout
sedentary_bout = bouts[bouts["isactivebout"] == 0]
steps_intraday_features = statsFeatures(sedentary_bout, intraday_features_to_compute_sedentarybout, "durationsedentarybout", steps_intraday_features)
# statistics features of active bout
active_bout = bouts[bouts["isactivebout"] == 1]
steps_intraday_features = statsFeatures(active_bout, intraday_features_to_compute_activebout, "durationactivebout", steps_intraday_features)
steps_intraday_features.reset_index(inplace=True)
return steps_intraday_features
def rapids_features(sensor_data_files, day_segment, provider, filter_data_by_segment, *args, **kwargs):
threshold_active_bout = provider["THRESHOLD_ACTIVE_BOUT"]
include_zero_step_rows = provider["INCLUDE_ZERO_STEP_ROWS"]
steps_summary_data = pd.read_csv(sensor_data_files["sensor_data"][0])
steps_intraday_data = pd.read_csv(sensor_data_files["sensor_data"][1])
requested_summary_features = ["summary" + x for x in provider["FEATURES"]["SUMMARY"]]
requested_intraday_features = provider["FEATURES"]["INTRADAY"]
requested_intraday_features_steps = ["intraday" + x + "steps" for x in requested_intraday_features["STEPS"]]
requested_intraday_features_sedentarybout = ["intraday" + x + "sedentarybout" for x in requested_intraday_features["SEDENTARY_BOUT"]]
requested_intraday_features_activebout = ["intraday" + x + "activebout" for x in requested_intraday_features["ACTIVE_BOUT"]]
# name of the features this function can compute
base_summary_features = ["summarymaxsumsteps", "summaryminsumsteps", "summaryavgsumsteps", "summarymediansumsteps", "summarystdsumsteps"]
base_intraday_features_steps = ["intradaysumsteps", "intradaymaxsteps", "intradayminsteps", "intradayavgsteps", "intradaystdsteps"]
base_intraday_features_sedentarybout = ["intradaycountepisodesedentarybout", "intradaysumdurationsedentarybout", "intradaymaxdurationsedentarybout", "intradaymindurationsedentarybout", "intradayavgdurationsedentarybout", "intradaystddurationsedentarybout"]
base_intraday_features_activebout = ["intradaycountepisodeactivebout", "intradaysumdurationactivebout", "intradaymaxdurationactivebout", "intradaymindurationactivebout", "intradayavgdurationactivebout", "intradaystddurationactivebout"]
# the subset of requested features this function can compute
intraday_features_to_compute_steps = list(set(requested_intraday_features_steps) & set(base_intraday_features_steps))
intraday_features_to_compute_sedentarybout = list(set(requested_intraday_features_sedentarybout) & set(base_intraday_features_sedentarybout))
intraday_features_to_compute_activebout = list(set(requested_intraday_features_activebout) & set(base_intraday_features_activebout))
summary_features_to_compute = list(set(requested_summary_features) & set(base_summary_features))
intraday_features_to_compute = intraday_features_to_compute_steps + intraday_features_to_compute_sedentarybout + intraday_features_to_compute_activebout
# extract features from summary data
steps_summary_features = pd.DataFrame(columns=["local_segment"] + ["steps_rapids_" + x for x in summary_features_to_compute])
if not steps_summary_data.empty:
steps_summary_data = filter_data_by_segment(steps_summary_data, day_segment)
if not steps_summary_data.empty:
# only keep the segments start at 00:00:00 and end at 23:59:59
datetime_start_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2} 00:00:00"
datetime_end_regex = "[0-9]{4}[\\-|\\/][0-9]{2}[\\-|\\/][0-9]{2} 23:59:59"
segment_regex = "{}#{},{}".format(day_segment, datetime_start_regex, datetime_end_regex)
steps_summary_data = steps_summary_data[steps_summary_data["local_segment"].str.match(segment_regex)]
if not steps_summary_data.empty:
steps_summary_features = extractStepsFeaturesFromSummaryData(steps_summary_data, summary_features_to_compute)
# extract features from intraday features
steps_intraday_features = pd.DataFrame(columns=["local_segment"] + ["steps_rapids_" + x for x in intraday_features_to_compute])
if not steps_intraday_data.empty:
steps_intraday_data = filter_data_by_segment(steps_intraday_data, day_segment)
if not steps_intraday_data.empty:
steps_intraday_features = extractStepsFeaturesFromIntradayData(steps_intraday_data, threshold_active_bout, intraday_features_to_compute_steps, intraday_features_to_compute_sedentarybout, intraday_features_to_compute_activebout, steps_intraday_features)
# merge summary features and intraday features
steps_features = steps_intraday_features.merge(steps_summary_features, on=["local_segment"], how="outer")
# exclude rows when the total step count is ZERO during the whole day
if not include_zero_step_rows:
steps_features.index = steps_features["local_segment"].apply(lambda segment: segment.split("#")[1][:10])
steps_features["dailycountstep"] = steps_intraday_data.groupby(["local_date"])["steps"].sum()
steps_features = steps_features.query("dailycountstep != 0")
del steps_features["dailycountstep"]
steps_features.reset_index(drop=True, inplace=True)
return steps_features