diff --git a/Snakefile b/Snakefile index 2a251901..f28a86e0 100644 --- a/Snakefile +++ b/Snakefile @@ -65,12 +65,15 @@ if config["ACTIVITY_RECOGNITION"]["COMPUTE"]: files_to_compute.extend(expand("data/processed/{pid}/{sensor}_deltas.csv", pid=pids, sensor=table)) files_to_compute.extend(expand("data/processed/{pid}/activity_recognition_{day_segment}.csv",pid=config["PIDS"], day_segment = config["ACTIVITY_RECOGNITION"]["DAY_SEGMENTS"])) -if config["BATTERY"]["COMPUTE"]: - files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["BATTERY"]["DB_TABLE"])) - files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["BATTERY"]["DB_TABLE"])) - files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime_unified.csv", pid=config["PIDS"], sensor=config["BATTERY"]["DB_TABLE"])) - files_to_compute.extend(expand("data/processed/{pid}/battery_deltas.csv", pid=config["PIDS"])) - files_to_compute.extend(expand("data/processed/{pid}/battery_{day_segment}.csv", pid = config["PIDS"], day_segment = config["BATTERY"]["DAY_SEGMENTS"])) +for provider in config["BATTERY"]["PROVIDERS"].keys(): + if config["BATTERY"]["PROVIDERS"][provider]["COMPUTE"]: + files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["BATTERY"]["DB_TABLE"])) + files_to_compute.extend(expand("data/interim/{pid}/battery_episodes.csv", pid=config["PIDS"])) + files_to_compute.extend(expand("data/interim/{pid}/battery_episodes_resampled.csv", pid=config["PIDS"])) + files_to_compute.extend(expand("data/interim/{pid}/battery_episodes_resampled_with_datetime.csv", pid=config["PIDS"])) + files_to_compute.extend(expand("data/interim/{pid}/{sensor_key}_features/{sensor_key}_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["SCREEN"]["PROVIDERS"][provider]["SRC_LANGUAGE"], provider_key=provider, sensor_key="BATTERY".lower())) + files_to_compute.extend(expand("data/processed/features/{pid}/{sensor_key}.csv", pid=config["PIDS"], sensor_key="BATTERY".lower())) + for provider in config["SCREEN"]["PROVIDERS"].keys(): if config["SCREEN"]["PROVIDERS"][provider]["COMPUTE"]: diff --git a/config.yaml b/config.yaml index 0b937e3e..b14909be 100644 --- a/config.yaml +++ b/config.yaml @@ -121,10 +121,13 @@ ACTIVITY_RECOGNITION: FEATURES: ["count","mostcommonactivity","countuniqueactivities","activitychangecount","sumstationary","summobile","sumvehicle"] BATTERY: - COMPUTE: False DB_TABLE: battery - DAY_SEGMENTS: *day_segments - FEATURES: ["countdischarge", "sumdurationdischarge", "countcharge", "sumdurationcharge", "avgconsumptionrate", "maxconsumptionrate"] + PROVIDERS: + RAPIDS: + COMPUTE: False + FEATURES: ["countdischarge", "sumdurationdischarge", "countcharge", "sumdurationcharge", "avgconsumptionrate", "maxconsumptionrate"] + SRC_FOLDER: "rapids" # inside src/features/battery + SRC_LANGUAGE: "python" SCREEN: DB_TABLE: screen diff --git a/rules/features.smk b/rules/features.smk index d45069f6..d3323260 100644 --- a/rules/features.smk +++ b/rules/features.smk @@ -73,16 +73,14 @@ rule screen_episodes: rule resample_episodes: input: "data/interim/{pid}/{sensor}_episodes.csv" - params: - sensor = "{sensor}" output: "data/interim/{pid}/{sensor}_episodes_resampled.csv" script: "../src/features/utils/resample_episodes.R" -rule resample_screen_episodes_with_datetime: +rule resample_episodes_with_datetime: input: - sensor_input = "data/interim/{pid}/screen_episodes_resampled.csv", + sensor_input = "data/interim/{pid}/{sensor}_episodes_resampled.csv", day_segments = "data/interim/day_segments/{pid}_day_segments.csv" params: timezones = None, @@ -90,7 +88,7 @@ rule resample_screen_episodes_with_datetime: day_segments_type = config["DAY_SEGMENTS"]["TYPE"], include_past_periodic_segments = config["DAY_SEGMENTS"]["INCLUDE_PAST_PERIODIC_SEGMENTS"] output: - "data/interim/{pid}/screen_episodes_resampled_with_datetime.csv" + "data/interim/{pid}/{sensor}_episodes_resampled_with_datetime.csv" script: "../src/data/readable_datetime.R" @@ -170,16 +168,29 @@ rule activity_features: script: "../src/features/activity_recognition.py" -rule battery_features: +rule battery_r_features: input: - "data/interim/{pid}/battery_episodes.csv" + battery_episodes = "data/interim/{pid}/battery_episodes_resampled_with_datetime.csv", + day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv" params: - day_segment = "{day_segment}", - features = config["BATTERY"]["FEATURES"] + provider = lambda wildcards: config["BATTERY"]["PROVIDERS"][wildcards.provider_key], + provider_key = "{provider_key}" output: - "data/processed/{pid}/battery_{day_segment}.csv" + "data/interim/{pid}/battery_features/battery_r_{provider_key}.csv" script: - "../src/features/battery_features.py" + "../src/features/battery/battery_entry.R" + +rule battery_python_features: + input: + battery_episodes = "data/interim/{pid}/battery_episodes_resampled_with_datetime.csv", + day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv" + params: + provider = lambda wildcards: config["BATTERY"]["PROVIDERS"][wildcards.provider_key], + provider_key = "{provider_key}" + output: + "data/interim/{pid}/battery_features/battery_python_{provider_key}.csv" + script: + "../src/features/battery/battery_entry.py" rule screen_r_features: input: diff --git a/src/features/battery/battery_base.py b/src/features/battery/battery_base.py deleted file mode 100644 index c6cdedf6..00000000 --- a/src/features/battery/battery_base.py +++ /dev/null @@ -1,48 +0,0 @@ -import pandas as pd -from datetime import datetime, timedelta, time -from features_utils import splitOvernightEpisodes, splitMultiSegmentEpisodes - - -def base_battery_features(battery_data, day_segment, requested_features): - # name of the features this function can compute - base_features_names = ["countdischarge", "sumdurationdischarge", "countcharge", "sumdurationcharge", "avgconsumptionrate", "maxconsumptionrate"] - # the subset of requested features this function can compute - features_to_compute = list(set(requested_features) & set(base_features_names)) - - battery_features = pd.DataFrame(columns=["local_date"] + ["battery_" + day_segment + "_" + x for x in features_to_compute]) - if not battery_data.empty: - battery_data = splitOvernightEpisodes(battery_data, ["battery_diff"], []) - - if day_segment != "daily": - battery_data = splitMultiSegmentEpisodes(battery_data, day_segment, ["battery_diff"]) - - if not battery_data.empty: - battery_data["battery_consumption_rate"] = battery_data["battery_diff"] / battery_data["time_diff"] - - # for battery_data_discharge: - battery_data_discharge = battery_data[battery_data["battery_diff"] > 0] - battery_discharge_features = pd.DataFrame() - if "countdischarge" in features_to_compute: - battery_discharge_features["battery_"+day_segment+"_countdischarge"] = battery_data_discharge.groupby(["local_start_date"])["local_start_date"].count() - if "sumdurationdischarge" in features_to_compute: - battery_discharge_features["battery_"+day_segment+"_sumdurationdischarge"] = battery_data_discharge.groupby(["local_start_date"])["time_diff"].sum() - if "avgconsumptionrate" in features_to_compute: - battery_discharge_features["battery_"+day_segment+"_avgconsumptionrate"] = battery_data_discharge.groupby(["local_start_date"])["battery_consumption_rate"].mean() - if "maxconsumptionrate" in features_to_compute: - battery_discharge_features["battery_"+day_segment+"_maxconsumptionrate"] = battery_data_discharge.groupby(["local_start_date"])["battery_consumption_rate"].max() - - # for battery_data_charge: - battery_data_charge = battery_data[battery_data["battery_diff"] <= 0] - battery_charge_features = pd.DataFrame() - if "countcharge" in features_to_compute: - battery_charge_features["battery_"+day_segment+"_countcharge"] = battery_data_charge.groupby(["local_start_date"])["local_start_date"].count() - if "sumdurationcharge" in features_to_compute: - battery_charge_features["battery_"+day_segment+"_sumdurationcharge"] = battery_data_charge.groupby(["local_start_date"])["time_diff"].sum() - - # combine discharge features and charge features; fill the missing values with ZERO - battery_features = pd.concat([battery_discharge_features, battery_charge_features], axis=1, sort=True).fillna(0) - - battery_features.index.rename("local_date", inplace=True) - battery_features = battery_features.reset_index() - - return battery_features diff --git a/src/features/battery/battery_entry.R b/src/features/battery/battery_entry.R new file mode 100644 index 00000000..f86dcd91 --- /dev/null +++ b/src/features/battery/battery_entry.R @@ -0,0 +1,13 @@ +source("renv/activate.R") +source("src/features/utils/utils.R") +library("dplyr") +library("tidyr") + +sensor_data_file <- snakemake@input[["battery_episodes"]] +day_segments_file <- snakemake@input[["day_segments_labels"]] +provider <- snakemake@params["provider"][["provider"]] +provider_key <- snakemake@params["provider_key"] + +sensor_features <- fetch_provider_features(provider, provider_key, "battery", sensor_data_file, day_segments_file) + +write.csv(sensor_features, snakemake@output[[1]], row.names = FALSE) \ No newline at end of file diff --git a/src/features/battery/battery_entry.py b/src/features/battery/battery_entry.py new file mode 100644 index 00000000..c38db5d6 --- /dev/null +++ b/src/features/battery/battery_entry.py @@ -0,0 +1,18 @@ +import pandas as pd +from importlib import import_module, util +from pathlib import Path + +# import fetch_provider_features from src/features/utils/utils.py +spec = util.spec_from_file_location("util", str(Path(snakemake.scriptdir).parent / "utils" / "utils.py")) +mod = util.module_from_spec(spec) +spec.loader.exec_module(mod) +fetch_provider_features = getattr(mod, "fetch_provider_features") + +battery_episodes_file = snakemake.input["battery_episodes"] +day_segments_file = snakemake.input["day_segments_labels"] +provider = snakemake.params["provider"] +provider_key = snakemake.params["provider_key"] + +sensor_features = fetch_provider_features(provider, provider_key, "battery", battery_episodes_file, day_segments_file) + +sensor_features.to_csv(snakemake.output[0], index=False) \ No newline at end of file diff --git a/src/features/battery/rapids/main.py b/src/features/battery/rapids/main.py new file mode 100644 index 00000000..391f7fd7 --- /dev/null +++ b/src/features/battery/rapids/main.py @@ -0,0 +1,56 @@ +import pandas as pd +from datetime import datetime, timedelta, time + +def rapids_features(battery_data, day_segment, provider, filter_data_by_segment, *args, **kwargs): + + # name of the features this function can compute + base_features_names = ["countdischarge", "sumdurationdischarge", "countcharge", "sumdurationcharge", "avgconsumptionrate", "maxconsumptionrate"] + # the subset of requested features this function can compute + requested_features = provider["FEATURES"] + features_to_compute = list(set(requested_features) & set(base_features_names)) + + battery_features = pd.DataFrame(columns=["local_segment"] + ["battery_rapids_" + x for x in features_to_compute]) + if not battery_data.empty: + battery_data = filter_data_by_segment(battery_data, day_segment) + battery_data = kwargs["deduplicate_episodes"](battery_data) + + if not battery_data.empty: + # chunk_episodes + battery_data = kwargs["chunk_episodes"](battery_data) + + if not battery_data.empty: + + battery_data["episode_id"] = ((battery_data.battery_status != battery_data.battery_status.shift()) | (battery_data.start_timestamp - battery_data.end_timestamp.shift() > 1)).cumsum() + grouped = battery_data.groupby(by=["local_segment", "episode_id", "battery_status"]) + battery_episodes= grouped[["time_diff"]].sum() + battery_episodes["battery_diff"] = grouped["battery_level"].first() - grouped["battery_level"].last() + battery_episodes["battery_consumption_rate"] = battery_episodes["battery_diff"] / battery_episodes["time_diff"] + battery_episodes.reset_index(inplace=True) + + # for discharge episodes + battery_discharge_episodes = battery_episodes[(battery_episodes["battery_status"] == 3) | (battery_episodes["battery_status"] == 4)] + battery_discharge_features = pd.DataFrame() + if "countdischarge" in features_to_compute: + battery_discharge_features["battery_rapids_countdischarge"] = battery_discharge_episodes.groupby(["local_segment"])["episode_id"].count() + if "sumdurationdischarge" in features_to_compute: + battery_discharge_features["battery_rapids_sumdurationdischarge"] = battery_discharge_episodes.groupby(["local_segment"])["time_diff"].sum() + if "avgconsumptionrate" in features_to_compute: + battery_discharge_features["battery_rapids_avgconsumptionrate"] = battery_discharge_episodes.groupby(["local_segment"])["battery_consumption_rate"].mean() + if "maxconsumptionrate" in features_to_compute: + battery_discharge_features["battery_rapids_maxconsumptionrate"] = battery_discharge_episodes.groupby(["local_segment"])["battery_consumption_rate"].max() + + # for charge episodes + battery_charge_episodes = battery_episodes[(battery_episodes["battery_status"] == 2) | (battery_episodes["battery_status"] == 5)] + battery_charge_features = pd.DataFrame() + if "countcharge" in features_to_compute: + battery_charge_features["battery_rapids_countcharge"] = battery_charge_episodes.groupby(["local_segment"])["episode_id"].count() + if "sumdurationcharge" in features_to_compute: + battery_charge_features["battery_rapids_sumdurationcharge"] = battery_charge_episodes.groupby(["local_segment"])["time_diff"].sum() + + # combine discharge features and charge features; fill the missing values with ZERO + battery_features = pd.concat([battery_discharge_features, battery_charge_features], axis=1, sort=True).fillna(0) + + battery_features.index.rename("local_segment", inplace=True) + battery_features = battery_features.reset_index() + + return battery_features diff --git a/src/features/screen/episodes/screen_episodes.R b/src/features/screen/episodes/screen_episodes.R index 914bbab4..d5fabf52 100644 --- a/src/features/screen/episodes/screen_episodes.R +++ b/src/features/screen/episodes/screen_episodes.R @@ -44,7 +44,6 @@ get_screen_episodes <- function(screen){ filter( (screen_status == 3 & lead(screen_status) == 0) | (screen_status == 0 & lag(screen_status) == 3) ) %>% summarise(episode = "unlock", screen_sequence = toString(screen_status), - time_diff = (last(timestamp) - first(timestamp)) / (1000 * 60), start_timestamp = first(timestamp), end_timestamp = last(timestamp)) %>% filter(str_detect(screen_sequence, @@ -58,7 +57,6 @@ get_screen_episodes <- function(screen){ if(nrow(screen) < 2){ episodes <- data.frame(episode = character(), screen_sequence = character(), - time_diff = numeric(), start_timestamp = character(), end_timestamp = character()) } else { diff --git a/src/features/screen/rapids/main.py b/src/features/screen/rapids/main.py index ec106039..d6039a65 100644 --- a/src/features/screen/rapids/main.py +++ b/src/features/screen/rapids/main.py @@ -47,6 +47,7 @@ def rapids_features(screen_data, day_segment, provider, filter_data_by_segment, if not screen_data.empty: screen_data = filter_data_by_segment(screen_data, day_segment) + screen_data = kwargs["deduplicate_episodes"](screen_data) if not screen_data.empty: # chunk_episodes screen_data = kwargs["chunk_episodes"](screen_data) diff --git a/src/features/utils/resample_episodes.py b/src/features/utils/resample_episodes.py deleted file mode 100644 index 00ad427c..00000000 --- a/src/features/utils/resample_episodes.py +++ /dev/null @@ -1,44 +0,0 @@ -import pandas as pd - - -def resample_screen_deltas(screen_deltas): - - column_names = ("episode_id", "episode", "screen_sequence", "timestamp", "duration") - records_resampled = [] - - for _, row in screen_deltas.iterrows(): - episode_id, episode, screen_sequence = row["episode_id"], row["episode"], row["screen_sequence"] - start_timestamp, end_timestamp = row["start_timestamp"], row["end_timestamp"] - - for timestamp in range(start_timestamp, end_timestamp, 1000 * 60): - records_resampled.append((episode_id, episode, screen_sequence, timestamp, min(1, (end_timestamp - timestamp) / (1000 * 60)))) - - return records_resampled, column_names - - -def resample_battery_deltas(battery_deltas): - column_names = ("battery_diff", "timestamp") - records_resampled = [] - - for _, row in battery_deltas.iterrows(): - start_timestamp, end_timestamp = row["start_timestamp"], row["end_timestamp"] - battery_diff = row["battery_diff"] / row["time_diff"] - - for timestamp in range(start_timestamp, end_timestamp, 1000 * 60): - records_resampled.append((battery_diff, timestamp)) - - return records_resampled, column_names - - -deltas = pd.read_csv(snakemake.input[0]) -sensor = snakemake.params["sensor"] - -if sensor == "battery": - records_resampled, column_names = resample_battery_deltas(deltas) - -if sensor == "screen": - records_resampled, column_names = resample_screen_deltas(deltas) - - -deltas_resampled = pd.DataFrame(data=records_resampled, columns=column_names) -deltas_resampled.to_csv(snakemake.output[0], index=False) diff --git a/src/features/utils/utils.py b/src/features/utils/utils.py index f8fa9758..8726174a 100644 --- a/src/features/utils/utils.py +++ b/src/features/utils/utils.py @@ -13,45 +13,47 @@ def filter_data_by_segment(data, day_segment): data[["local_segment","timestamps_segment"]] = data["local_segment"].str.split(pat =";",n=1, expand=True) return(data) +# Each minute could fall into two segments. +# Firstly, we generate two rows for each resampled minute via resample_episodes rule: +# the first row's timestamp column is the start_timestamp, while the second row's timestamp column is the end_timestamp. +# Then, we check if the segments of start_timestamp are the same as the segments of end_timestamp: +# if they are the same (only fall into one segment), we will discard the second row; +# otherwise (fall into two segments), we will keep both. +def deduplicate_episodes(sensor_episodes): + # Drop rows where segments of start_timestamp and end_timestamp are the same + sensor_episodes = sensor_episodes.drop_duplicates(subset=["start_timestamp", "end_timestamp", "local_segment"], keep="first") + + # Delete useless columns + for drop_col in ["utc_date_time", "local_date_time", "local_date", "local_time", "local_hour", "local_minute"]: + del sensor_episodes[drop_col] + + return sensor_episodes + def chunk_episodes(sensor_episodes): - import pytz, copy import pandas as pd - from datetime import datetime - # avoid warning messages: SettingWithCopyWarning - sensor_episodes = sensor_episodes.copy() + # Unix timestamp for current segment in milliseconds + sensor_episodes[["segment_start_timestamp", "segment_end_timestamp"]] = sensor_episodes["timestamps_segment"].str.split(",", expand=True) - # convert string to datetime with local timezone - sensor_episodes["start_datetime"] = pd.to_datetime(sensor_episodes["local_segment"].str[-39:-20], format="%Y-%m-%d#%H:%M:%S") - sensor_episodes["start_datetime"] = pd.concat([data["start_datetime"].dt.tz_localize(tz) for tz, data in sensor_episodes.groupby("local_timezone")]) + # Compute chunked timestamp + sensor_episodes["chunked_start_timestamp"] = sensor_episodes[["start_timestamp", "segment_start_timestamp"]].max(axis=1) + sensor_episodes["chunked_end_timestamp"] = sensor_episodes[["end_timestamp", "segment_end_timestamp"]].min(axis=1) - sensor_episodes["end_datetime"] = pd.to_datetime(sensor_episodes["local_segment"].str[-19:], format="%Y-%m-%d#%H:%M:%S") - sensor_episodes["end_datetime"] = pd.concat([data["end_datetime"].dt.tz_localize(tz) for tz, data in sensor_episodes.groupby("local_timezone")]) - - # unix timestamp in milliseconds - sensor_episodes["start_timestamp"] = sensor_episodes["start_datetime"].apply(lambda dt: dt.timestamp() * 1000) - sensor_episodes["end_timestamp"] = sensor_episodes["end_datetime"].apply(lambda dt: dt.timestamp() * 1000) - - # compute chunked timestamp - sensor_episodes["chunked_start_timestamp"] = sensor_episodes[["timestamp", "start_timestamp"]].max(axis=1) - - sensor_episodes["timestamp_plus_duration"] = sensor_episodes["timestamp"] + sensor_episodes["duration"] * 1000 * 60 - sensor_episodes["chunked_end_timestamp"] = sensor_episodes[["timestamp_plus_duration", "end_timestamp"]].min(axis=1) - - # time_diff: intersection of current row and segment + # Compute time_diff: intersection of current row and segment sensor_episodes["time_diff"] = (sensor_episodes["chunked_end_timestamp"] - sensor_episodes["chunked_start_timestamp"]) / (1000 * 60) - # compute chunked datetime + # Compute chunked datetime sensor_episodes["chunked_start_datetime"] = pd.to_datetime(sensor_episodes["chunked_start_timestamp"], unit="ms", utc=True) sensor_episodes["chunked_start_datetime"] = pd.concat([data["chunked_start_datetime"].dt.tz_convert(tz) for tz, data in sensor_episodes.groupby("local_timezone")]) sensor_episodes["chunked_end_datetime"] = pd.to_datetime(sensor_episodes["chunked_end_timestamp"], unit="ms", utc=True) sensor_episodes["chunked_end_datetime"] = pd.concat([data["chunked_end_datetime"].dt.tz_convert(tz) for tz, data in sensor_episodes.groupby("local_timezone")]) - # merge episodes - sensor_episodes_grouped = sensor_episodes.groupby(["episode_id", "episode", "screen_sequence"]) + # Merge episodes + cols_for_groupby = [col for col in sensor_episodes.columns if col not in ["local_timezone", "timestamps_segment", "timestamp", "assigned_segments", "start_datetime", "end_datetime", "start_timestamp", "end_timestamp", "time_diff", "segment_start_timestamp", "segment_end_timestamp", "chunked_start_timestamp", "chunked_end_timestamp", "chunked_start_datetime", "chunked_end_datetime"]] + + sensor_episodes_grouped = sensor_episodes.groupby(by=cols_for_groupby) merged_sensor_episodes = sensor_episodes_grouped[["time_diff"]].sum() - merged_sensor_episodes["local_segment"] = sensor_episodes_grouped["local_segment"].first() merged_sensor_episodes["start_timestamp"] = sensor_episodes_grouped["chunked_start_timestamp"].first() merged_sensor_episodes["end_timestamp"] = sensor_episodes_grouped["chunked_end_timestamp"].last() @@ -80,7 +82,7 @@ def fetch_provider_features(provider, provider_key, config_key, sensor_data_file for day_segment in day_segments_labels["label"]: print("{} Processing {} {} {}".format(rapids_log_tag, config_key, provider_key, day_segment)) - features = feature_function(sensor_data, day_segment, provider, filter_data_by_segment=filter_data_by_segment, chunk_episodes=chunk_episodes) + features = feature_function(sensor_data, day_segment, provider, filter_data_by_segment=filter_data_by_segment, deduplicate_episodes=deduplicate_episodes, chunk_episodes=chunk_episodes) sensor_features = sensor_features.merge(features, how="outer") else: for feature in provider["FEATURES"]: