diff --git a/Snakefile b/Snakefile
index dc023282..e95d14ab 100644
--- a/Snakefile
+++ b/Snakefile
@@ -104,10 +104,12 @@ for provider in config["LIGHT"]["PROVIDERS"].keys():
         files_to_compute.extend(expand("data/interim/{pid}/{sensor_key}_features/{sensor_key}_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["LIGHT"]["PROVIDERS"][provider]["SRC_LANGUAGE"], provider_key=provider, sensor_key="LIGHT".lower()))
         files_to_compute.extend(expand("data/processed/features/{pid}/{sensor_key}.csv", pid=config["PIDS"], sensor_key="LIGHT".lower()))
 
-if config["ACCELEROMETER"]["COMPUTE"]:
-    files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["ACCELEROMETER"]["DB_TABLE"]))
-    files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["ACCELEROMETER"]["DB_TABLE"]))
-    files_to_compute.extend(expand("data/processed/{pid}/accelerometer_{day_segment}.csv", pid = config["PIDS"], day_segment = config["ACCELEROMETER"]["DAY_SEGMENTS"]))
+for provider in config["ACCELEROMETER"]["PROVIDERS"].keys(): # one pass per accelerometer provider declared in config.yaml
+    if config["ACCELEROMETER"]["PROVIDERS"][provider]["COMPUTE"]: # only providers with COMPUTE enabled request outputs
+        files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["ACCELEROMETER"]["DB_TABLE"]))
+        files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["ACCELEROMETER"]["DB_TABLE"]))
+        files_to_compute.extend(expand("data/interim/{pid}/{sensor_key}_features/{sensor_key}_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["ACCELEROMETER"]["PROVIDERS"][provider]["SRC_LANGUAGE"], provider_key=provider, sensor_key="ACCELEROMETER".lower())) # per-provider file; its name carries the provider's SRC_LANGUAGE and key
+        files_to_compute.extend(expand("data/processed/features/{pid}/{sensor_key}.csv", pid=config["PIDS"], sensor_key="ACCELEROMETER".lower())) # same path for every provider of this sensor
 
 for provider in config["APPLICATIONS_FOREGROUND"]["PROVIDERS"].keys():
     if config["APPLICATIONS_FOREGROUND"]["PROVIDERS"][provider]["COMPUTE"]:
diff --git a/config.yaml b/config.yaml
index a852c126..1127e521 100644
--- a/config.yaml
+++ b/config.yaml
@@ -158,16 +158,23 @@ LIGHT:
       SRC_FOLDER: "rapids" # inside src/features/light
       SRC_LANGUAGE: "python"
 
-
 ACCELEROMETER:
-  COMPUTE: False
   DB_TABLE: accelerometer
-  DAY_SEGMENTS: *day_segments
-  FEATURES:
-    MAGNITUDE: ["maxmagnitude", "minmagnitude", "avgmagnitude", "medianmagnitude", "stdmagnitude"]
-    EXERTIONAL_ACTIVITY_EPISODE: ["sumduration", "maxduration", "minduration", "avgduration", "medianduration", "stdduration"]
-    NONEXERTIONAL_ACTIVITY_EPISODE: ["sumduration", "maxduration", "minduration", "avgduration", "medianduration", "stdduration"]
-    VALID_SENSED_MINUTES: False
+  PROVIDERS:
+    RAPIDS:
+      COMPUTE: False
+      FEATURES: ["maxmagnitude", "minmagnitude", "avgmagnitude", "medianmagnitude", "stdmagnitude"] # magnitude statistics; names the provider does not know are ignored
+      SRC_FOLDER: "rapids" # inside src/features/accelerometer
+      SRC_LANGUAGE: "python"
+
+    PANDA:
+      COMPUTE: False
+      VALID_SENSED_MINUTES: False # when true, the provider also outputs acc_panda_validsensedminutes
+      FEATURES: # statistic names; the provider appends the episode type, e.g. sumdurationexertionalactivityepisode
+        exertional_activity_episode: ["sumduration", "maxduration", "minduration", "avgduration", "medianduration", "stdduration"]
+        nonexertional_activity_episode: ["sumduration", "maxduration", "minduration", "avgduration", "medianduration", "stdduration"]
+      SRC_FOLDER: "panda" # inside src/features/accelerometer
+      SRC_LANGUAGE: "python"
 
 APPLICATIONS_FOREGROUND:
   DB_TABLE: applications_foreground
diff --git a/rules/features.smk b/rules/features.smk
index 2e8424ad..7945d62d 100644
--- a/rules/features.smk
+++ b/rules/features.smk
@@ -28,19 +28,31 @@ rule resample_episodes_with_datetime:
     script:
         "../src/data/readable_datetime.R"
 
-rule accelerometer_features:
+rule accelerometer_r_features:
     input:
-        expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["ACCELEROMETER"]["DB_TABLE"]),
+        sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["ACCELEROMETER"]["DB_TABLE"])[0], # [0] takes the first expanded path, so the input is a single file name
+        day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
     params:
-        day_segment = "{day_segment}",
-        magnitude = config["ACCELEROMETER"]["FEATURES"]["MAGNITUDE"],
-        exertional_activity_episode = config["ACCELEROMETER"]["FEATURES"]["EXERTIONAL_ACTIVITY_EPISODE"],
-        nonexertional_activity_episode = config["ACCELEROMETER"]["FEATURES"]["NONEXERTIONAL_ACTIVITY_EPISODE"],
-        valid_sensed_minutes = config["ACCELEROMETER"]["FEATURES"]["VALID_SENSED_MINUTES"],
+        provider = lambda wildcards: config["ACCELEROMETER"]["PROVIDERS"][wildcards.provider_key], # settings of the provider named by the provider_key wildcard
+        provider_key = "{provider_key}",
+        sensor_key = "accelerometer"
     output:
-        "data/processed/{pid}/accelerometer_{day_segment}.csv"
+        "data/interim/{pid}/accelerometer_features/accelerometer_r_{provider_key}.csv"
     script:
-        "../src/features/accelerometer_features.py"
+        "../src/features/entry.R"
+
+rule accelerometer_python_features:
+    input:
+        sensor_data = expand("data/raw/{{pid}}/{sensor}_with_datetime.csv", sensor=config["ACCELEROMETER"]["DB_TABLE"])[0], # [0] takes the first expanded path, so the input is a single file name
+        day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
+    params:
+        provider = lambda wildcards: config["ACCELEROMETER"]["PROVIDERS"][wildcards.provider_key], # settings of the provider named by the provider_key wildcard
+        provider_key = "{provider_key}",
+        sensor_key = "accelerometer"
+    output:
+        "data/interim/{pid}/accelerometer_features/accelerometer_python_{provider_key}.csv"
+    script:
+        "../src/features/entry.py"
 
 rule activity_recognition_episodes:
     input:
diff --git a/src/features/accelerometer/accelerometer_base.py b/src/features/accelerometer/accelerometer_base.py
deleted file mode 100644
index 230584ce..00000000
--- a/src/features/accelerometer/accelerometer_base.py
+++ /dev/null
@@ -1,111 +0,0 @@
-import pandas as pd
-import numpy as np
-
-def getActivityEpisodes(acc_minute):
-    # rebuild local date time for resampling
-    acc_minute["local_datetime"] = pd.to_datetime(acc_minute["local_date"].dt.strftime("%Y-%m-%d") + \
-                    " " + acc_minute["local_hour"].apply(str) + ":" + acc_minute["local_minute"].apply(str) + ":00")
-
-    # resample the data into 1 minute bins, set
"isexertionalactivity" column to be NA if it is missing - resampled_acc_minute = pd.DataFrame(acc_minute.resample("1T", on="local_datetime")["isexertionalactivity"].sum(min_count=1)) - - # group rows by consecutive values of "isexertionalactivity" column - group = pd.DataFrame(resampled_acc_minute["isexertionalactivity"] != resampled_acc_minute["isexertionalactivity"].shift()).cumsum().rename(columns={"isexertionalactivity": "group_idx"}) - - # combine resampled_acc_minute and group column - resampled_acc_minute = pd.concat([resampled_acc_minute, group], axis=1) - - # drop rows where "isexertionalactivity" column is missing and reset the index - resampled_acc_minute.dropna(subset=["isexertionalactivity"], inplace=True) - resampled_acc_minute.reset_index(inplace=True) - resampled_acc_minute.loc[:, "local_date"] = resampled_acc_minute["local_datetime"].dt.date - - # duration column contains the number of minutes (rows) of exertional and nonexertional activity for each episode - activity_episode = resampled_acc_minute.groupby(["isexertionalactivity", "group_idx", "local_date"]).count().rename(columns={"local_datetime": "duration"}).reset_index() - - return activity_episode - -def dropRowsWithCertainThreshold(data, threshold): - data_grouped = data.groupby(["local_date", "local_hour", "local_minute"]).count() - drop_dates = data_grouped[data_grouped["timestamp"] == threshold].index - data.set_index(["local_date", "local_hour", "local_minute"], inplace = True) - if not drop_dates.empty: - data.drop(drop_dates, axis = 0, inplace = True) - return data.reset_index() - -def statsFeatures(acc_data, day_segment, features_to_compute, features_type, acc_features): - if features_type == "magnitude": - col_name = features_type - elif features_type == "durationexertionalactivityepisode" or features_type == "durationnonexertionalactivityepisode": - col_name = "duration" - else: - raise ValueError("features_type can only be one of ['magnitude', 'durationexertionalactivityepisode', 
'durationnonexertionalactivityepisode'].") - - if "sum" + features_type in features_to_compute: - acc_features["acc_" + day_segment + "_sum" + features_type] = acc_data.groupby(["local_date"])[col_name].sum() - if "max" + features_type in features_to_compute: - acc_features["acc_" + day_segment + "_max" + features_type] = acc_data.groupby(["local_date"])[col_name].max() - if "min" + features_type in features_to_compute: - acc_features["acc_" + day_segment + "_min" + features_type] = acc_data.groupby(["local_date"])[col_name].min() - if "avg" + features_type in features_to_compute: - acc_features["acc_" + day_segment + "_avg" + features_type] = acc_data.groupby(["local_date"])[col_name].mean() - if "median" + features_type in features_to_compute: - acc_features["acc_" + day_segment + "_median" + features_type] = acc_data.groupby(["local_date"])[col_name].median() - if "std" + features_type in features_to_compute: - acc_features["acc_" + day_segment + "_std" + features_type] = acc_data.groupby(["local_date"])[col_name].std() - - return acc_features - - - -def base_accelerometer_features(acc_data, day_segment, requested_features, valid_sensed_minutes): - # name of the features this function can compute - base_features_names_magnitude = ["maxmagnitude", "minmagnitude", "avgmagnitude", "medianmagnitude", "stdmagnitude"] - base_features_names_exertionalactivityepisode = ["sumdurationexertionalactivityepisode", "maxdurationexertionalactivityepisode", "mindurationexertionalactivityepisode", "avgdurationexertionalactivityepisode", "mediandurationexertionalactivityepisode", "stddurationexertionalactivityepisode"] - base_features_names_nonexertionalactivityepisode = ["sumdurationnonexertionalactivityepisode", "maxdurationnonexertionalactivityepisode", "mindurationnonexertionalactivityepisode", "avgdurationnonexertionalactivityepisode", "mediandurationnonexertionalactivityepisode", "stddurationnonexertionalactivityepisode"] - # the subset of requested features this function 
can compute
-    features_to_compute_magnitude = list(set(requested_features["magnitude"]) & set(base_features_names_magnitude))
-    features_to_compute_exertionalactivityepisode = list(set(requested_features["exertional_activity_episode"]) & set(base_features_names_exertionalactivityepisode))
-    features_to_compute_nonexertionalactivityepisode = list(set(requested_features["nonexertional_activity_episode"]) & set(base_features_names_nonexertionalactivityepisode))
-
-    features_to_compute = features_to_compute_magnitude + features_to_compute_exertionalactivityepisode + features_to_compute_nonexertionalactivityepisode + (["validsensedminutes"] if valid_sensed_minutes else [])
-
-    acc_features = pd.DataFrame(columns=["local_date"] + ["acc_" + day_segment + "_" + x for x in features_to_compute])
-    if not acc_data.empty:
-        if day_segment != "daily":
-            acc_data = acc_data[acc_data["local_day_segment"] == day_segment]
-
-        if not acc_data.empty:
-            acc_features = pd.DataFrame()
-            # get magnitude related features: magnitude = sqrt(x^2+y^2+z^2)
-            magnitude = acc_data.apply(lambda row: np.sqrt(row["double_values_0"] ** 2 + row["double_values_1"] ** 2 + row["double_values_2"] ** 2), axis=1)
-            acc_data = acc_data.assign(magnitude = magnitude.values)
-            acc_features = statsFeatures(acc_data, day_segment, features_to_compute_magnitude, "magnitude", acc_features)
-
-
-            # get extertional activity features
-            # reference: https://jamanetwork.com/journals/jamasurgery/fullarticle/2753807
-
-            # drop rows where we only have one row per minute (no variance)
-            acc_data = dropRowsWithCertainThreshold(acc_data, 1)
-            if not acc_data.empty:
-                # check if the participant performs exertional activity for each minute
-                acc_minute = pd.DataFrame()
-                acc_minute["isexertionalactivity"] = (acc_data.groupby(["local_date", "local_hour", "local_minute"])["double_values_0"].var() + acc_data.groupby(["local_date", "local_hour", "local_minute"])["double_values_1"].var() + acc_data.groupby(["local_date", "local_hour", "local_minute"])["double_values_2"].var()).apply(lambda x: 1 if x > 0.15 * (9.807 ** 2) else 0)
-                acc_minute.reset_index(inplace=True)
-
-                if valid_sensed_minutes:
-                    acc_features["acc_" + day_segment + "_validsensedminutes"] = acc_minute.groupby(["local_date"])["isexertionalactivity"].count()
-
-                activity_episode = getActivityEpisodes(acc_minute)
-                exertionalactivity_episodes = activity_episode[activity_episode["isexertionalactivity"] == 1]
-                acc_features = statsFeatures(exertionalactivity_episodes, day_segment, features_to_compute_exertionalactivityepisode, "durationexertionalactivityepisode", acc_features)
-
-                nonexertionalactivity_episodes = activity_episode[activity_episode["isexertionalactivity"] == 0]
-                acc_features = statsFeatures(nonexertionalactivity_episodes, day_segment, features_to_compute_nonexertionalactivityepisode, "durationnonexertionalactivityepisode", acc_features)
-
-                acc_features[[colname for colname in acc_features.columns if "std" not in colname]] = acc_features[[colname for colname in acc_features.columns if "std" not in colname]].fillna(0)
-
-            acc_features = acc_features.reset_index()
-
-    return acc_features
diff --git a/src/features/accelerometer/panda/main.py b/src/features/accelerometer/panda/main.py
new file mode 100644
index 00000000..0e6a50cd
--- /dev/null
+++ b/src/features/accelerometer/panda/main.py
@@ -0,0 +1,111 @@
+import pandas as pd
+import numpy as np
+
+def dropRowsWithCertainThreshold(data, threshold):
+    """Return the rows of data whose minute holds more than threshold timestamps.
+
+    A minute is a distinct (local_timezone, local_segment, local_date, local_hour, local_minute) group.
+    """
+    data_grouped = data.groupby(["local_timezone", "local_segment", "local_date", "local_hour", "local_minute"])
+    data_cleaned = data_grouped.filter(lambda x: x["timestamp"].count() > threshold)
+    return data_cleaned
+
+def getActivityEpisodes(acc_minute):
+    """Merge consecutive sensed minutes into activity episodes.
+
+    Adds the columns local_datetime, rows_interval and group_idx to acc_minute in place. Returns a
+    DataFrame indexed by group_idx with one row per episode: its duration (number of minute rows),
+    local_segment and isexertionalactivity.
+    """
+    # rebuild local date time for resampling
+    acc_minute["local_datetime"] = pd.to_datetime(acc_minute["local_date"] + \
+                    " " + acc_minute["local_hour"].apply(str) + ":" + acc_minute["local_minute"].apply(str) + ":00")
+
+    # compute time interval between consecutive rows in minutes
+    acc_minute["rows_interval"] = round(acc_minute["local_datetime"].diff().dt.total_seconds() / 60, 0)
+
+    # put consecutive rows into the same group if (1) the interval between two rows is 1 minute and (2) have the same values of "isexertionalactivity", "local_timezone", and "local_segment"
+    acc_minute["group_idx"] = ((acc_minute[["isexertionalactivity", "local_timezone", "local_segment"]].shift() != acc_minute[["isexertionalactivity", "local_timezone", "local_segment"]]).any(axis=1) | (acc_minute["rows_interval"] != 1)).cumsum()
+
+    # get activity episodes: duration column contains the number of minutes (rows) of exertional and nonexertional activity for each episode
+    grouped = acc_minute.groupby("group_idx")
+    activity_episodes = grouped["local_segment"].agg(duration="count")
+    activity_episodes[["local_segment", "isexertionalactivity"]] = grouped[["local_segment", "isexertionalactivity"]].first()
+
+    return activity_episodes
+
+def statsFeatures(acc_data, features_to_compute, features_type, acc_features):
+    """Add the requested duration statistics of acc_data, per local_segment, to acc_features.
+
+    A statistic is computed when its prefix + features_type is in features_to_compute and is stored
+    in the column "acc_panda_" + prefix + features_type. Returns acc_features.
+    """
+    # feature name prefix -> pandas aggregation
+    stat_functions = {"sum": "sum", "max": "max", "min": "min", "avg": "mean", "median": "median", "std": "std"}
+    durations = acc_data.groupby(["local_segment"])["duration"]
+    for prefix, function in stat_functions.items():
+        if prefix + features_type in features_to_compute:
+            acc_features["acc_panda_" + prefix + features_type] = durations.agg(function)
+
+    return acc_features
+
+
+
+def panda_features(sensor_data_files, day_segment, provider, filter_data_by_segment, *args, **kwargs):
+    """Compute exertional and non-exertional activity episode features per local_segment.
+
+    Reads the CSV at sensor_data_files["sensor_data"], filters it with
+    filter_data_by_segment(acc_data, day_segment) and returns a DataFrame with a local_segment
+    column plus the acc_panda_* columns requested in provider["FEATURES"] and, when
+    provider["VALID_SENSED_MINUTES"] is true, acc_panda_validsensedminutes.
+    """
+
+    acc_data = pd.read_csv(sensor_data_files["sensor_data"])
+    requested_features = provider["FEATURES"]
+    valid_sensed_minutes = provider["VALID_SENSED_MINUTES"]
+    # name of the features this function can compute
+    base_features_names_exertionalactivityepisode = ["sumdurationexertionalactivityepisode", "maxdurationexertionalactivityepisode", "mindurationexertionalactivityepisode", "avgdurationexertionalactivityepisode", "mediandurationexertionalactivityepisode", "stddurationexertionalactivityepisode"]
+    base_features_names_nonexertionalactivityepisode = ["sumdurationnonexertionalactivityepisode", "maxdurationnonexertionalactivityepisode", "mindurationnonexertionalactivityepisode", "avgdurationnonexertionalactivityepisode", "mediandurationnonexertionalactivityepisode", "stddurationnonexertionalactivityepisode"]
+    # the subset of requested features this function can compute
+    features_to_compute_exertionalactivityepisode = list(set([x + "exertionalactivityepisode" for x in requested_features["exertional_activity_episode"]]) & set(base_features_names_exertionalactivityepisode))
+    features_to_compute_nonexertionalactivityepisode = list(set([ x + "nonexertionalactivityepisode" for x in requested_features["nonexertional_activity_episode"]]) & set(base_features_names_nonexertionalactivityepisode))
+
+    features_to_compute = features_to_compute_exertionalactivityepisode + features_to_compute_nonexertionalactivityepisode + (["validsensedminutes"] if valid_sensed_minutes else [])
+
+    acc_features = pd.DataFrame(columns=["local_segment"] + ["acc_panda_" + x for x in features_to_compute])
+    if not acc_data.empty:
+        acc_data = filter_data_by_segment(acc_data, day_segment)
+
+        if not acc_data.empty:
+            # drop rows where we only have one row per minute (no variance)
+            acc_data = dropRowsWithCertainThreshold(acc_data, 1)
+
+            if not acc_data.empty:
+                # check if the participant performs exertional activity for each minute
+                minute_groups = acc_data.groupby(["local_timezone", "local_segment", "local_date", "local_hour", "local_minute"])
+                acc_minute = pd.DataFrame()
+                acc_minute["isexertionalactivity"] = (minute_groups["double_values_0"].var() + minute_groups["double_values_1"].var() + minute_groups["double_values_2"].var()).apply(lambda x: 1 if x > 0.15 * (9.807 ** 2) else 0)
+                acc_minute.reset_index(inplace=True)
+
+                # index the features by every segment that has sensed minutes: assigning columns to an
+                # empty frame would adopt the index of the first Series and silently drop other segments
+                acc_features = pd.DataFrame(index=acc_minute.groupby(["local_segment"]).size().index)
+                if valid_sensed_minutes:
+                    acc_features["acc_panda_validsensedminutes"] = acc_minute.groupby(["local_segment"])["isexertionalactivity"].count()
+
+                activity_episodes = getActivityEpisodes(acc_minute)
+                # compute exertional episodes features
+                exertionalactivity_episodes = activity_episodes[activity_episodes["isexertionalactivity"] == 1]
+                acc_features = statsFeatures(exertionalactivity_episodes, features_to_compute_exertionalactivityepisode, "durationexertionalactivityepisode", acc_features)
+                # compute non-exertional episodes features
+                nonexertionalactivity_episodes = activity_episodes[activity_episodes["isexertionalactivity"] == 0]
+                acc_features = statsFeatures(nonexertionalactivity_episodes, features_to_compute_nonexertionalactivityepisode, "durationnonexertionalactivityepisode", acc_features)
+
+                # missing values become 0 in every column whose name does not contain "std"
+                non_std_columns = [colname for colname in acc_features.columns if "std" not in colname]
+                if non_std_columns:
+                    acc_features[non_std_columns] = acc_features[non_std_columns].fillna(0)
+
+                acc_features = acc_features.reset_index()
+
+    return acc_features
diff --git a/src/features/accelerometer/rapids/main.py b/src/features/accelerometer/rapids/main.py
new file mode 100644
index 00000000..09920343
--- /dev/null
+++ b/src/features/accelerometer/rapids/main.py
@@ -0,0 +1,36 @@
+import pandas as pd
+import numpy as np
+
+def rapids_features(sensor_data_files, day_segment, provider, filter_data_by_segment, *args, **kwargs):
+    """Compute magnitude statistics of the accelerometer signal per local_segment.
+
+    Reads the CSV at sensor_data_files["sensor_data"], filters it with
+    filter_data_by_segment(acc_data, day_segment) and returns a DataFrame with a local_segment
+    column plus one acc_rapids_* column per feature in provider["FEATURES"] that is known here.
+    """
+
+    acc_data = pd.read_csv(sensor_data_files["sensor_data"])
+    requested_features = provider["FEATURES"]
+    # name of the features this function can compute -> pandas aggregation that produces each one
+    stat_functions = {"maxmagnitude": "max", "minmagnitude": "min", "avgmagnitude": "mean", "medianmagnitude": "median", "stdmagnitude": "std"}
+    # the subset of requested features this function can compute, in a fixed order so columns are deterministic
+    features_to_compute = [x for x in stat_functions if x in requested_features]
+
+    acc_features = pd.DataFrame(columns=["local_segment"] + ["acc_rapids_" + x for x in features_to_compute])
+    if not acc_data.empty:
+        acc_data = filter_data_by_segment(acc_data, day_segment)
+
+        if not acc_data.empty:
+            # get magnitude related features: magnitude = sqrt(x^2+y^2+z^2), computed on whole columns rather than row by row
+            magnitude = np.sqrt(acc_data["double_values_0"] ** 2 + acc_data["double_values_1"] ** 2 + acc_data["double_values_2"] ** 2)
+            acc_data = acc_data.assign(magnitude = magnitude.values)
+            magnitude_by_segment = acc_data.groupby(["local_segment"])["magnitude"]
+
+            # index by segment first so the local_segment column exists even when no feature is requested
+            acc_features = pd.DataFrame(index=magnitude_by_segment.size().index)
+            for feature in features_to_compute:
+                acc_features["acc_rapids_" + feature] = magnitude_by_segment.agg(stat_functions[feature])
+
+            acc_features = acc_features.reset_index()
+
+    return acc_features
diff --git a/src/features/accelerometer_features.py b/src/features/accelerometer_features.py
deleted file mode 100644
index f4461c1d..00000000
--- a/src/features/accelerometer_features.py
+++ /dev/null
@@ -1,22 +0,0 @@
-import numpy as np
-import pandas as pd
-from accelerometer.accelerometer_base import base_accelerometer_features
-
-
-acc_data = pd.read_csv(snakemake.input[0], parse_dates=["local_date_time", "local_date"])
-day_segment = snakemake.params["day_segment"]
-
-requested_features = {}
-requested_features["magnitude"] = snakemake.params["magnitude"]
-requested_features["exertional_activity_episode"] = [feature + "exertionalactivityepisode" for feature in snakemake.params["exertional_activity_episode"]]
-requested_features["nonexertional_activity_episode"] = [feature + "nonexertionalactivityepisode" for feature in snakemake.params["nonexertional_activity_episode"]]
-
-valid_sensed_minutes = snakemake.params["valid_sensed_minutes"]
-
-acc_features = pd.DataFrame(columns=["local_date"])
-
-acc_features = acc_features.merge(base_accelerometer_features(acc_data, day_segment, requested_features, valid_sensed_minutes), on="local_date", how="outer")
-
-assert np.sum([len(x) for x in requested_features.values()]) + (1 if valid_sensed_minutes else 0) + 1 == acc_features.shape[1], "The number of features in the output dataframe (=" + str(acc_features.shape[1]) + ") does not match the expected value (=" + str(np.sum([len(x) for x in requested_features.values()]) + (1 if valid_sensed_minutes else 0)) + " + 1). Verify your accelerometer feature extraction functions"
-
-acc_features.to_csv(snakemake.output[0], index=False)
\ No newline at end of file