Move deduplicate_episodes() function into chunk_episodes() function; rename "time_diff" with duration

pull/103/head
Meng Li 2020-09-29 18:05:25 -04:00
parent 39f6e3841c
commit bccc9a0697
3 changed files with 30 additions and 27 deletions

View File

@ -3,6 +3,8 @@ from datetime import datetime, timedelta, time
def rapids_features(battery_data, day_segment, provider, filter_data_by_segment, *args, **kwargs):
chunk_episodes = kwargs["chunk_episodes"]
# name of the features this function can compute
base_features_names = ["countdischarge", "sumdurationdischarge", "countcharge", "sumdurationcharge", "avgconsumptionrate", "maxconsumptionrate"]
# the subset of requested features this function can compute
@ -12,19 +14,18 @@ def rapids_features(battery_data, day_segment, provider, filter_data_by_segment,
battery_features = pd.DataFrame(columns=["local_segment"] + ["battery_rapids_" + x for x in features_to_compute])
if not battery_data.empty:
battery_data = filter_data_by_segment(battery_data, day_segment)
battery_data = kwargs["deduplicate_episodes"](battery_data)
if not battery_data.empty:
# chunk_episodes
battery_data = kwargs["chunk_episodes"](battery_data)
battery_data = chunk_episodes(battery_data)
if not battery_data.empty:
battery_data["episode_id"] = ((battery_data.battery_status != battery_data.battery_status.shift()) | (battery_data.start_timestamp - battery_data.end_timestamp.shift() > 1)).cumsum()
grouped = battery_data.groupby(by=["local_segment", "episode_id", "battery_status"])
battery_episodes= grouped[["time_diff"]].sum()
battery_episodes= grouped[["duration"]].sum()
battery_episodes["battery_diff"] = grouped["battery_level"].first() - grouped["battery_level"].last()
battery_episodes["battery_consumption_rate"] = battery_episodes["battery_diff"] / battery_episodes["time_diff"]
battery_episodes["battery_consumption_rate"] = battery_episodes["battery_diff"] / battery_episodes["duration"]
battery_episodes.reset_index(inplace=True)
# for discharge episodes
@ -33,7 +34,7 @@ def rapids_features(battery_data, day_segment, provider, filter_data_by_segment,
if "countdischarge" in features_to_compute:
battery_discharge_features["battery_rapids_countdischarge"] = battery_discharge_episodes.groupby(["local_segment"])["episode_id"].count()
if "sumdurationdischarge" in features_to_compute:
battery_discharge_features["battery_rapids_sumdurationdischarge"] = battery_discharge_episodes.groupby(["local_segment"])["time_diff"].sum()
battery_discharge_features["battery_rapids_sumdurationdischarge"] = battery_discharge_episodes.groupby(["local_segment"])["duration"].sum()
if "avgconsumptionrate" in features_to_compute:
battery_discharge_features["battery_rapids_avgconsumptionrate"] = battery_discharge_episodes.groupby(["local_segment"])["battery_consumption_rate"].mean()
if "maxconsumptionrate" in features_to_compute:
@ -45,7 +46,7 @@ def rapids_features(battery_data, day_segment, provider, filter_data_by_segment,
if "countcharge" in features_to_compute:
battery_charge_features["battery_rapids_countcharge"] = battery_charge_episodes.groupby(["local_segment"])["episode_id"].count()
if "sumdurationcharge" in features_to_compute:
battery_charge_features["battery_rapids_sumdurationcharge"] = battery_charge_episodes.groupby(["local_segment"])["time_diff"].sum()
battery_charge_features["battery_rapids_sumdurationcharge"] = battery_charge_episodes.groupby(["local_segment"])["duration"].sum()
# combine discharge features and charge features; fill the missing values with ZERO
battery_features = pd.concat([battery_discharge_features, battery_charge_features], axis=1, sort=True).fillna(0)

View File

@ -5,17 +5,17 @@ def getEpisodeDurationFeatures(screen_data, day_segment, episode, features, refe
screen_data_episode = screen_data[screen_data["episode"] == episode]
duration_helper = pd.DataFrame()
if "countepisode" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].count().rename(columns = {"time_diff": "screen_rapids_countepisode" + episode})], axis = 1)
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["duration"]].count().rename(columns = {"duration": "screen_rapids_countepisode" + episode})], axis = 1)
if "sumduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].sum().rename(columns = {"time_diff": "screen_rapids_sumduration" + episode})], axis = 1)
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["duration"]].sum().rename(columns = {"duration": "screen_rapids_sumduration" + episode})], axis = 1)
if "maxduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].max().rename(columns = {"time_diff": "screen_rapids_maxduration" + episode})], axis = 1)
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["duration"]].max().rename(columns = {"duration": "screen_rapids_maxduration" + episode})], axis = 1)
if "minduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].min().rename(columns = {"time_diff": "screen_rapids_minduration" + episode})], axis = 1)
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["duration"]].min().rename(columns = {"duration": "screen_rapids_minduration" + episode})], axis = 1)
if "avgduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].mean().rename(columns = {"time_diff":"screen_rapids_avgduration" + episode})], axis = 1)
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["duration"]].mean().rename(columns = {"duration":"screen_rapids_avgduration" + episode})], axis = 1)
if "stdduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].std().rename(columns = {"time_diff":"screen_rapids_stdduration" + episode})], axis = 1)
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["duration"]].std().rename(columns = {"duration":"screen_rapids_stdduration" + episode})], axis = 1)
if "firstuseafter" + "{0:0=2d}".format(reference_hour_first_use) in features:
screen_data_episode_after_hour = screen_data_episode.copy()
screen_data_episode_after_hour["hour"] = pd.to_datetime(screen_data_episode["local_start_date_time"]).dt.hour
@ -32,6 +32,7 @@ def rapids_features(screen_data, day_segment, provider, filter_data_by_segment,
requested_episode_types = provider["EPISODE_TYPES"]
ignore_episodes_shorter_than = provider["IGNORE_EPISODES_SHORTER_THAN"]
ignore_episodes_longer_than = provider["IGNORE_EPISODES_LONGER_THAN"]
chunk_episodes = kwargs["chunk_episodes"]
# name of the features this function can compute
base_features_episodes = ["countepisode", "episodepersensedminutes", "sumduration", "maxduration", "minduration", "avgduration", "stdduration", "firstuseafter"]
@ -47,15 +48,14 @@ def rapids_features(screen_data, day_segment, provider, filter_data_by_segment,
if not screen_data.empty:
screen_data = filter_data_by_segment(screen_data, day_segment)
screen_data = kwargs["deduplicate_episodes"](screen_data)
if not screen_data.empty:
# chunk_episodes
screen_data = kwargs["chunk_episodes"](screen_data)
screen_data = chunk_episodes(screen_data)
if ignore_episodes_shorter_than > 0:
screen_data = screen_data.query('@ignore_episodes_shorter_than <= time_diff')
screen_data = screen_data.query('@ignore_episodes_shorter_than <= duration')
if ignore_episodes_longer_than > 0:
screen_data = screen_data.query('time_diff <= @ignore_episodes_longer_than')
screen_data = screen_data.query('duration <= @ignore_episodes_longer_than')
if not screen_data.empty:
screen_features = pd.DataFrame()

View File

@ -19,18 +19,20 @@ def filter_data_by_segment(data, day_segment):
# Then, we check if the segments of start_timestamp are the same as the segments of end_timestamp:
# if they are the same (only fall into one segment), we will discard the second row;
# otherwise (fall into two segments), we will keep both.
def deduplicate_episodes(sensor_episodes):
def chunk_episodes(sensor_episodes):
import copy
import pandas as pd
# Deduplicate episodes
# Drop rows where segments of start_timestamp and end_timestamp are the same
sensor_episodes = sensor_episodes.drop_duplicates(subset=["start_timestamp", "end_timestamp", "local_segment"], keep="first")
# Delete useless columns
for drop_col in ["utc_date_time", "local_date_time", "local_date", "local_time", "local_hour", "local_minute"]:
del sensor_episodes[drop_col]
return sensor_episodes
def chunk_episodes(sensor_episodes):
import pandas as pd
# Avoid SettingWithCopyWarning
sensor_episodes = sensor_episodes.copy()
# Unix timestamp for current segment in milliseconds
sensor_episodes[["segment_start_timestamp", "segment_end_timestamp"]] = sensor_episodes["timestamps_segment"].str.split(",", expand=True)
@ -39,8 +41,8 @@ def chunk_episodes(sensor_episodes):
sensor_episodes["chunked_start_timestamp"] = sensor_episodes[["start_timestamp", "segment_start_timestamp"]].max(axis=1)
sensor_episodes["chunked_end_timestamp"] = sensor_episodes[["end_timestamp", "segment_end_timestamp"]].min(axis=1)
# Compute time_diff: intersection of current row and segment
sensor_episodes["time_diff"] = (sensor_episodes["chunked_end_timestamp"] - sensor_episodes["chunked_start_timestamp"]) / (1000 * 60)
# Compute duration: intersection of current row and segment
sensor_episodes["duration"] = (sensor_episodes["chunked_end_timestamp"] - sensor_episodes["chunked_start_timestamp"]) / (1000 * 60)
# Compute chunked datetime
sensor_episodes["chunked_start_datetime"] = pd.to_datetime(sensor_episodes["chunked_start_timestamp"], unit="ms", utc=True)
@ -50,10 +52,10 @@ def chunk_episodes(sensor_episodes):
sensor_episodes["chunked_end_datetime"] = pd.concat([data["chunked_end_datetime"].dt.tz_convert(tz) for tz, data in sensor_episodes.groupby("local_timezone")])
# Merge episodes
cols_for_groupby = [col for col in sensor_episodes.columns if col not in ["local_timezone", "timestamps_segment", "timestamp", "assigned_segments", "start_datetime", "end_datetime", "start_timestamp", "end_timestamp", "time_diff", "segment_start_timestamp", "segment_end_timestamp", "chunked_start_timestamp", "chunked_end_timestamp", "chunked_start_datetime", "chunked_end_datetime"]]
cols_for_groupby = [col for col in sensor_episodes.columns if col not in ["local_timezone", "timestamps_segment", "timestamp", "assigned_segments", "start_datetime", "end_datetime", "start_timestamp", "end_timestamp", "duration", "segment_start_timestamp", "segment_end_timestamp", "chunked_start_timestamp", "chunked_end_timestamp", "chunked_start_datetime", "chunked_end_datetime"]]
sensor_episodes_grouped = sensor_episodes.groupby(by=cols_for_groupby)
merged_sensor_episodes = sensor_episodes_grouped[["time_diff"]].sum()
merged_sensor_episodes = sensor_episodes_grouped[["duration"]].sum()
merged_sensor_episodes["start_timestamp"] = sensor_episodes_grouped["chunked_start_timestamp"].first()
merged_sensor_episodes["end_timestamp"] = sensor_episodes_grouped["chunked_end_timestamp"].last()
@ -82,7 +84,7 @@ def fetch_provider_features(provider, provider_key, config_key, sensor_data_file
for day_segment in day_segments_labels["label"]:
print("{} Processing {} {} {}".format(rapids_log_tag, config_key, provider_key, day_segment))
features = feature_function(sensor_data, day_segment, provider, filter_data_by_segment=filter_data_by_segment, deduplicate_episodes=deduplicate_episodes, chunk_episodes=chunk_episodes)
features = feature_function(sensor_data, day_segment, provider, filter_data_by_segment=filter_data_by_segment, chunk_episodes=chunk_episodes)
sensor_features = sensor_features.merge(features, how="outer")
else:
for feature in provider["FEATURES"]: