Fix the bug of screen duration features for different segments

pull/103/head
Meng Li 2020-09-18 20:25:29 -04:00
parent 09d373a31b
commit f806cb44ac
10 changed files with 271 additions and 129 deletions

View File

@ -72,16 +72,20 @@ if config["BATTERY"]["COMPUTE"]:
files_to_compute.extend(expand("data/processed/{pid}/battery_deltas.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/processed/{pid}/battery_{day_segment}.csv", pid = config["PIDS"], day_segment = config["BATTERY"]["DAY_SEGMENTS"]))
if config["SCREEN"]["COMPUTE"]:
if config["SCREEN"]["DB_TABLE"] in config["PHONE_VALID_SENSED_BINS"]["DB_TABLES"]:
files_to_compute.extend(expand("data/interim/{pid}/phone_sensed_bins.csv", pid=config["PIDS"]))
else:
raise ValueError("Error: Add your screen table (and as many sensor tables as you have) to [PHONE_VALID_SENSED_BINS][DB_TABLES] in config.yaml. This is necessary to compute phone_sensed_bins (bins of time when the smartphone was sensing data)")
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["SCREEN"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["SCREEN"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime_unified.csv", pid=config["PIDS"], sensor=config["SCREEN"]["DB_TABLE"]))
files_to_compute.extend(expand("data/processed/{pid}/screen_deltas.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/processed/{pid}/screen_{day_segment}.csv", pid = config["PIDS"], day_segment = config["SCREEN"]["DAY_SEGMENTS"]))
for provider in config["SCREEN"]["PROVIDERS"].keys():
if config["SCREEN"]["PROVIDERS"][provider]["COMPUTE"]:
if config["SCREEN"]["DB_TABLE"] in config["PHONE_VALID_SENSED_BINS"]["DB_TABLES"]:
files_to_compute.extend(expand("data/interim/{pid}/phone_sensed_bins.csv", pid=config["PIDS"]))
else:
raise ValueError("Error: Add your screen table (and as many sensor tables as you have) to [PHONE_VALID_SENSED_BINS][DB_TABLES] in config.yaml. This is necessary to compute phone_sensed_bins (bins of time when the smartphone was sensing data)")
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["SCREEN"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["SCREEN"]["DB_TABLE"]))
files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime_unified.csv", pid=config["PIDS"], sensor=config["SCREEN"]["DB_TABLE"]))
files_to_compute.extend(expand("data/interim/{pid}/screen_episodes.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/screen_episodes_resampled.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/screen_episodes_resampled_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/{sensor_key}_features/{sensor_key}_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["SCREEN"]["PROVIDERS"][provider]["SRC_LANGUAGE"], provider_key=provider, sensor_key="SCREEN".lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/{sensor_key}.csv", pid=config["PIDS"], sensor_key="SCREEN".lower()))
for provider in config["LIGHT"]["PROVIDERS"].keys():
if config["LIGHT"]["PROVIDERS"][provider]["COMPUTE"]:

View File

@ -127,14 +127,17 @@ BATTERY:
FEATURES: ["countdischarge", "sumdurationdischarge", "countcharge", "sumdurationcharge", "avgconsumptionrate", "maxconsumptionrate"]
SCREEN:
COMPUTE: False
DB_TABLE: screen
DAY_SEGMENTS: *day_segments
REFERENCE_HOUR_FIRST_USE: 0
IGNORE_EPISODES_SHORTER_THAN: 0 # in minutes, set to 0 to disable
IGNORE_EPISODES_LONGER_THAN: 0 # in minutes, set to 0 to disable
FEATURES_DELTAS: ["countepisode", "episodepersensedminutes", "sumduration", "maxduration", "minduration", "avgduration", "stdduration", "firstuseafter"]
EPISODE_TYPES: ["unlock"]
PROVIDERS:
RAPIDS:
COMPUTE: False
REFERENCE_HOUR_FIRST_USE: 0
IGNORE_EPISODES_SHORTER_THAN: 0 # in minutes, set to 0 to disable
IGNORE_EPISODES_LONGER_THAN: 0 # in minutes, set to 0 to disable
FEATURES: ["countepisode", "sumduration", "maxduration", "minduration", "avgduration", "stdduration", "firstuseafter"] # "episodepersensedminutes" needs to be added later
EPISODE_TYPES: ["unlock"]
SRC_FOLDER: "rapids" # inside src/features/screen
SRC_LANGUAGE: "python"
LIGHT:
DB_TABLE: light

View File

@ -62,13 +62,38 @@ rule battery_deltas:
script:
"../src/features/battery_deltas.R"
rule screen_deltas:
rule screen_episodes:
input:
screen = expand("data/raw/{{pid}}/{sensor}_with_datetime_unified.csv", sensor=config["SCREEN"]["DB_TABLE"])
output:
"data/processed/{pid}/screen_deltas.csv"
"data/interim/{pid}/screen_episodes.csv"
script:
"../src/features/screen_deltas.R"
"../src/features/screen/episodes/screen_episodes.R"
rule resample_episodes:
input:
"data/interim/{pid}/{sensor}_episodes.csv"
params:
sensor = "{sensor}"
output:
"data/interim/{pid}/{sensor}_episodes_resampled.csv"
script:
"../src/features/utils/resample_episodes.py"
rule resample_screen_episodes_with_datetime:
input:
sensor_input = "data/interim/{pid}/screen_episodes_resampled.csv",
day_segments = "data/interim/day_segments/{pid}_day_segments.csv"
params:
timezones = None,
fixed_timezone = config["READABLE_DATETIME"]["FIXED_TIMEZONE"],
day_segments_type = config["DAY_SEGMENTS"]["TYPE"],
include_past_periodic_segments = config["DAY_SEGMENTS"]["INCLUDE_PAST_PERIODIC_SEGMENTS"]
output:
"data/interim/{pid}/screen_episodes_resampled_with_datetime.csv"
script:
"../src/data/readable_datetime.R"
rule google_activity_recognition_deltas:
input:
@ -156,22 +181,29 @@ rule battery_features:
script:
"../src/features/battery_features.py"
rule screen_features:
rule screen_r_features:
input:
screen_deltas = "data/processed/{pid}/screen_deltas.csv",
phone_sensed_bins = "data/interim/{pid}/phone_sensed_bins.csv"
screen_episodes = "data/interim/{pid}/screen_episodes_resampled_with_datetime.csv",
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
day_segment = "{day_segment}",
reference_hour_first_use = config["SCREEN"]["REFERENCE_HOUR_FIRST_USE"],
features_deltas = config["SCREEN"]["FEATURES_DELTAS"],
episode_types = config["SCREEN"]["EPISODE_TYPES"],
ignore_episodes_shorter_than = config["SCREEN"]["IGNORE_EPISODES_SHORTER_THAN"],
ignore_episodes_longer_than = config["SCREEN"]["IGNORE_EPISODES_LONGER_THAN"],
bin_size = config["PHONE_VALID_SENSED_BINS"]["BIN_SIZE"]
provider = lambda wildcards: config["SCREEN"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
output:
"data/processed/{pid}/screen_{day_segment}.csv"
"data/interim/{pid}/screen_features/screen_r_{provider_key}.csv"
script:
"../src/features/screen_features.py"
"../src/features/screen/screen_entry.R"
rule screen_python_features:
input:
screen_episodes = "data/interim/{pid}/screen_episodes_resampled_with_datetime.csv",
day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
params:
provider = lambda wildcards: config["SCREEN"]["PROVIDERS"][wildcards.provider_key],
provider_key = "{provider_key}"
output:
"data/interim/{pid}/screen_features/screen_python_{provider_key}.csv"
script:
"../src/features/screen/screen_entry.py"
rule light_r_features:
input:

View File

@ -45,12 +45,8 @@ get_screen_episodes <- function(screen){
summarise(episode = "unlock",
screen_sequence = toString(screen_status),
time_diff = (last(timestamp) - first(timestamp)) / (1000 * 60),
local_start_date_time = first(local_date_time),
local_end_date_time = last(local_date_time),
local_start_date = first(local_date),
local_end_date = last(local_date),
local_start_day_segment = first(local_day_segment),
local_end_day_segment = last(local_day_segment)) %>%
start_timestamp = first(timestamp),
end_timestamp = last(timestamp)) %>%
filter(str_detect(screen_sequence,
paste0("^(",
paste(c(3), collapse = "|"), # Filter sequences that start with 3 (UNLOCK) AND
@ -61,13 +57,10 @@ get_screen_episodes <- function(screen){
if(nrow(screen) < 2){
episodes <- data.frame(episode = character(),
screen_sequence = character(),
time_diff = numeric(),
local_start_date_time = character(),
local_end_date_time = character(),
local_start_date = character(),
local_end_date = character(),
local_start_day_segment = character(),
local_end_day_segment = character())
start_timestamp = character(),
end_timestamp = character())
} else {
episodes <- get_screen_episodes(screen)
}

View File

@ -0,0 +1,68 @@
import pandas as pd
import itertools
def getEpisodeDurationFeatures(screen_data, day_segment, episode, features, reference_hour_first_use):
screen_data_episode = screen_data[screen_data["episode"] == episode]
duration_helper = pd.DataFrame()
if "countepisode" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].count().rename(columns = {"time_diff": "screen_rapids_countepisode" + episode})], axis = 1)
if "sumduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].sum().rename(columns = {"time_diff": "screen_rapids_sumduration" + episode})], axis = 1)
if "maxduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].max().rename(columns = {"time_diff": "screen_rapids_maxduration" + episode})], axis = 1)
if "minduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].min().rename(columns = {"time_diff": "screen_rapids_minduration" + episode})], axis = 1)
if "avgduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].mean().rename(columns = {"time_diff":"screen_rapids_avgduration" + episode})], axis = 1)
if "stdduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode.groupby(["local_segment"])[["time_diff"]].std().rename(columns = {"time_diff":"screen_rapids_stdduration" + episode})], axis = 1)
if "firstuseafter" + "{0:0=2d}".format(reference_hour_first_use) in features:
screen_data_episode_after_hour = screen_data_episode.copy()
screen_data_episode_after_hour["hour"] = pd.to_datetime(screen_data_episode["local_start_date_time"]).dt.hour
screen_data_episode_after_hour = screen_data_episode_after_hour[screen_data_episode_after_hour["hour"] >= reference_hour_first_use]
duration_helper = pd.concat([duration_helper, pd.DataFrame(screen_data_episode_after_hour.groupby(["local_segment"])[["local_start_date_time"]].min().local_start_date_time.apply(lambda x: (x.to_pydatetime().hour - reference_hour_first_use) * 60 + x.to_pydatetime().minute + (x.to_pydatetime().second / 60))).rename(columns = {"local_start_date_time":"screen_rapids_firstuseafter" + "{0:0=2d}".format(reference_hour_first_use) + episode})], axis = 1)
return duration_helper
def rapids_features(screen_data, day_segment, provider, filter_data_by_segment, *args, **kwargs):
reference_hour_first_use = provider["REFERENCE_HOUR_FIRST_USE"]
requested_features_episodes = provider["FEATURES"]
requested_episode_types = provider["EPISODE_TYPES"]
ignore_episodes_shorter_than = provider["IGNORE_EPISODES_SHORTER_THAN"]
ignore_episodes_longer_than = provider["IGNORE_EPISODES_LONGER_THAN"]
# name of the features this function can compute
base_features_episodes = ["countepisode", "episodepersensedminutes", "sumduration", "maxduration", "minduration", "avgduration", "stdduration", "firstuseafter"]
base_episode_type = ["unlock"]
# the subset of requested features this function can compute
features_episodes_to_compute = list(set(requested_features_episodes) & set(base_features_episodes))
episode_type_to_compute = list(set(requested_episode_types) & set(base_episode_type))
features_episodes_to_compute = ["firstuseafter" + "{0:0=2d}".format(reference_hour_first_use) if feature_name == "firstuseafter" else feature_name for feature_name in features_episodes_to_compute]
features_to_compute = ["".join(feature) for feature in itertools.product(features_episodes_to_compute, episode_type_to_compute)]
screen_features = pd.DataFrame(columns=["local_segment"]+["screen_rapids_" + x for x in features_to_compute])
if not screen_data.empty:
screen_data = filter_data_by_segment(screen_data, day_segment)
if not screen_data.empty:
# chunk_episodes
screen_data = kwargs["chunk_episodes"](screen_data)
if ignore_episodes_shorter_than > 0:
screen_data = screen_data.query('@ignore_episodes_shorter_than <= time_diff')
if ignore_episodes_longer_than > 0:
screen_data = screen_data.query('time_diff <= @ignore_episodes_longer_than')
if not screen_data.empty:
screen_features = pd.DataFrame()
for episode in episode_type_to_compute:
screen_features = pd.concat([screen_features, getEpisodeDurationFeatures(screen_data, day_segment, episode, features_episodes_to_compute, reference_hour_first_use)], axis=1)
if not screen_features.empty:
screen_features = screen_features.reset_index()
return screen_features

View File

@ -1,83 +0,0 @@
import pandas as pd
import itertools
from features_utils import splitOvernightEpisodes, splitMultiSegmentEpisodes
EPOCH2HOUR = {"night": ("0_", "1_", "2_", "3_", "4_", "5_"),
"morning": ("6_", "7_", "8_", "9_", "10_", "11_"),
"afternoon": ("12_", "13_", "14_", "15_", "16_", "17_"),
"evening": ("18_", "19_", "20_", "21_", "22_", "23_")}
def getEpisodeDurationFeatures(screen_data, day_segment, episode, features, phone_sensed_bins, bin_size, reference_hour_first_use):
screen_data_episode = screen_data[screen_data["episode"] == episode]
duration_helper = pd.DataFrame()
if "countepisode" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode[["time_diff"]].groupby(["local_start_date"]).count().rename(columns = {"time_diff": "screen_" + day_segment + "_countepisode" + episode})], axis = 1)
if "episodepersensedminutes" in features:
for date, row in screen_data_episode[["time_diff"]].groupby(["local_start_date"]).count().iterrows():
try:
if day_segment == "daily":
sensed_minutes = phone_sensed_bins.loc[date, :].sum() * bin_size
else:
sensed_minutes = phone_sensed_bins.loc[date, phone_sensed_bins.columns.str.startswith(EPOCH2HOUR[day_segment])].sum() * bin_size
except:
raise ValueError("You need to include the screen sensor in the list for phone_sensed_bins.")
episode_per_sensedminutes = row["time_diff"] / (1 if sensed_minutes == 0 else sensed_minutes)
duration_helper.loc[date, "screen_" + day_segment + "_episodepersensedminutes" + episode] = episode_per_sensedminutes
if "sumduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode[["time_diff"]].groupby(["local_start_date"]).sum().rename(columns = {"time_diff": "screen_" + day_segment + "_sumduration" + episode})], axis = 1)
if "maxduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode[["time_diff"]].groupby(["local_start_date"]).max().rename(columns = {"time_diff": "screen_" + day_segment + "_maxduration" + episode})], axis = 1)
if "minduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode[["time_diff"]].groupby(["local_start_date"]).min().rename(columns = {"time_diff": "screen_" + day_segment + "_minduration" + episode})], axis = 1)
if "avgduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode[["time_diff"]].groupby(["local_start_date"]).mean().rename(columns = {"time_diff":"screen_" + day_segment + "_avgduration" + episode})], axis = 1)
if "stdduration" in features:
duration_helper = pd.concat([duration_helper, screen_data_episode[["time_diff"]].groupby(["local_start_date"]).std().rename(columns = {"time_diff":"screen_" + day_segment + "_stdduration" + episode})], axis = 1)
if "firstuseafter" + "{0:0=2d}".format(reference_hour_first_use) in features:
duration_helper = pd.concat([duration_helper, pd.DataFrame(screen_data_episode.groupby(["local_start_date"]).first()[["local_start_date_time"]].local_start_date_time.apply(lambda x: (x.to_pydatetime().hour - reference_hour_first_use) * 60 + x.to_pydatetime().minute + (x.to_pydatetime().second / 60))).rename(columns = {"local_start_date_time":"screen_" + day_segment + "_firstuseafter" + "{0:0=2d}".format(reference_hour_first_use) + episode})], axis = 1)
return duration_helper
def base_screen_features(screen_data, phone_sensed_bins, day_segment, params):
reference_hour_first_use = params["reference_hour_first_use"]
bin_size = params["bin_size"]
requested_features_deltas = params["features_deltas"]
requested_episode_types = params["episode_types"]
ignore_episodes_shorter_than = params["ignore_episodes_shorter_than"]
ignore_episodes_longer_than = params["ignore_episodes_longer_than"]
# name of the features this function can compute
base_features_deltas = ["countepisode", "episodepersensedminutes", "sumduration", "maxduration", "minduration", "avgduration", "stdduration", "firstuseafter"]
base_episode_type = ["unlock"]
# the subset of requested features this function can compute
features_deltas_to_compute = list(set(requested_features_deltas) & set(base_features_deltas))
episode_type_to_compute = list(set(requested_episode_types) & set(base_episode_type))
features_deltas_to_compute = ["firstuseafter" + "{0:0=2d}".format(reference_hour_first_use) if feature_name == "firstuseafter" else feature_name for feature_name in features_deltas_to_compute]
features_to_compute = ["".join(feature) for feature in itertools.product(features_deltas_to_compute, episode_type_to_compute)]
screen_features = pd.DataFrame(columns=["local_date"]+["screen_" + day_segment + "_" + x for x in features_to_compute])
if not screen_data.empty:
# preprocess day_segment and episodes
screen_data = splitOvernightEpisodes(screen_data, [], ["episode"])
if (not screen_data.empty) and (day_segment != "daily"):
screen_data = splitMultiSegmentEpisodes(screen_data, day_segment, [])
screen_data.set_index(["local_start_date"],inplace=True)
if ignore_episodes_shorter_than > 0:
screen_data = screen_data.query('@ignore_episodes_shorter_than <= time_diff')
if ignore_episodes_longer_than > 0:
screen_data = screen_data.query('time_diff <= @ignore_episodes_longer_than')
if not screen_data.empty:
screen_features = pd.DataFrame()
for episode in episode_type_to_compute:
screen_features = pd.concat([screen_features, getEpisodeDurationFeatures(screen_data, day_segment, episode, features_deltas_to_compute, phone_sensed_bins, bin_size, reference_hour_first_use)], axis=1)
if not screen_features.empty:
screen_features = screen_features.rename_axis("local_date").reset_index()
return screen_features

View File

@ -0,0 +1,13 @@
source("renv/activate.R")
source("src/features/utils/utils.R")
library("dplyr")
library("tidyr")
sensor_data_file <- snakemake@input[["screen_episodes"]]
day_segments_file <- snakemake@input[["day_segments_labels"]]
provider <- snakemake@params["provider"][["provider"]]
provider_key <- snakemake@params["provider_key"]
sensor_features <- fetch_provider_features(provider, provider_key, "screen", sensor_data_file, day_segments_file)
write.csv(sensor_features, snakemake@output[[1]], row.names = FALSE)

View File

@ -0,0 +1,18 @@
import pandas as pd
from importlib import import_module, util
from pathlib import Path
# import fetch_provider_features from src/features/utils/utils.py
spec = util.spec_from_file_location("util", str(Path(snakemake.scriptdir).parent / "utils" / "utils.py"))
mod = util.module_from_spec(spec)
spec.loader.exec_module(mod)
fetch_provider_features = getattr(mod, "fetch_provider_features")
screen_episodes_file = snakemake.input["screen_episodes"]
day_segments_file = snakemake.input["day_segments_labels"]
provider = snakemake.params["provider"]
provider_key = snakemake.params["provider_key"]
sensor_features = fetch_provider_features(provider, provider_key, "screen", screen_episodes_file, day_segments_file)
sensor_features.to_csv(snakemake.output[0], index=False)

View File

@ -0,0 +1,44 @@
import pandas as pd
def resample_screen_deltas(screen_deltas):
column_names = ("episode_id", "episode", "screen_sequence", "timestamp", "duration")
records_resampled = []
for _, row in screen_deltas.iterrows():
episode_id, episode, screen_sequence = row["episode_id"], row["episode"], row["screen_sequence"]
start_timestamp, end_timestamp = row["start_timestamp"], row["end_timestamp"]
for timestamp in range(start_timestamp, end_timestamp, 1000 * 60):
records_resampled.append((episode_id, episode, screen_sequence, timestamp, min(1, (end_timestamp - timestamp) / (1000 * 60))))
return records_resampled, column_names
def resample_battery_deltas(battery_deltas):
column_names = ("battery_diff", "timestamp")
records_resampled = []
for _, row in battery_deltas.iterrows():
start_timestamp, end_timestamp = row["start_timestamp"], row["end_timestamp"]
battery_diff = row["battery_diff"] / row["time_diff"]
for timestamp in range(start_timestamp, end_timestamp, 1000 * 60):
records_resampled.append((battery_diff, timestamp))
return records_resampled, column_names
deltas = pd.read_csv(snakemake.input[0])
sensor = snakemake.params["sensor"]
if sensor == "battery":
records_resampled, column_names = resample_battery_deltas(deltas)
if sensor == "screen":
records_resampled, column_names = resample_screen_deltas(deltas)
deltas_resampled = pd.DataFrame(data=records_resampled, columns=column_names)
deltas_resampled.to_csv(snakemake.output[0], index=False)

View File

@ -7,6 +7,56 @@ def filter_data_by_segment(data, day_segment):
data["local_segment"] = data["assigned_segments"].str.extract(segment_regex, expand=True)
return(data.dropna(subset = ["local_segment"]))
def chunk_episodes(sensor_episodes):
import pytz, copy
import pandas as pd
from datetime import datetime
# avoid warning messages: SettingWithCopyWarning
sensor_episodes = sensor_episodes.copy()
# convert string to datetime with local timezone
sensor_episodes["start_datetime"] = pd.to_datetime(sensor_episodes["local_segment"].str[-39:-20], format="%Y-%m-%d#%H:%M:%S")
sensor_episodes["start_datetime"] = pd.concat([data["start_datetime"].dt.tz_localize(tz) for tz, data in sensor_episodes.groupby("local_timezone")])
sensor_episodes["end_datetime"] = pd.to_datetime(sensor_episodes["local_segment"].str[-19:], format="%Y-%m-%d#%H:%M:%S")
sensor_episodes["end_datetime"] = pd.concat([data["end_datetime"].dt.tz_localize(tz) for tz, data in sensor_episodes.groupby("local_timezone")])
# unix timestamp in milliseconds
sensor_episodes["start_timestamp"] = sensor_episodes["start_datetime"].apply(lambda dt: dt.timestamp() * 1000)
sensor_episodes["end_timestamp"] = sensor_episodes["end_datetime"].apply(lambda dt: dt.timestamp() * 1000)
# compute chunked timestamp
sensor_episodes["chunked_start_timestamp"] = sensor_episodes[["timestamp", "start_timestamp"]].max(axis=1)
sensor_episodes["timestamp_plus_duration"] = sensor_episodes["timestamp"] + sensor_episodes["duration"] * 1000 * 60
sensor_episodes["chunked_end_timestamp"] = sensor_episodes[["timestamp_plus_duration", "end_timestamp"]].min(axis=1)
# time_diff: intersection of current row and segment
sensor_episodes["time_diff"] = (sensor_episodes["chunked_end_timestamp"] - sensor_episodes["chunked_start_timestamp"]) / (1000 * 60)
# compute chunked datetime
sensor_episodes["chunked_start_datetime"] = pd.to_datetime(sensor_episodes["chunked_start_timestamp"], unit="ms", utc=True)
sensor_episodes["chunked_start_datetime"] = pd.concat([data["chunked_start_datetime"].dt.tz_convert(tz) for tz, data in sensor_episodes.groupby("local_timezone")])
sensor_episodes["chunked_end_datetime"] = pd.to_datetime(sensor_episodes["chunked_end_timestamp"], unit="ms", utc=True)
sensor_episodes["chunked_end_datetime"] = pd.concat([data["chunked_end_datetime"].dt.tz_convert(tz) for tz, data in sensor_episodes.groupby("local_timezone")])
# merge episodes
sensor_episodes_grouped = sensor_episodes.groupby(["episode_id", "episode", "screen_sequence"])
merged_sensor_episodes = sensor_episodes_grouped[["time_diff"]].sum()
merged_sensor_episodes["local_segment"] = sensor_episodes_grouped["local_segment"].first()
merged_sensor_episodes["start_timestamp"] = sensor_episodes_grouped["chunked_start_timestamp"].first()
merged_sensor_episodes["end_timestamp"] = sensor_episodes_grouped["chunked_end_timestamp"].last()
merged_sensor_episodes["local_start_date_time"] = sensor_episodes_grouped["chunked_start_datetime"].first()
merged_sensor_episodes["local_end_date_time"] = sensor_episodes_grouped["chunked_end_datetime"].last()
merged_sensor_episodes.reset_index(inplace=True)
return merged_sensor_episodes
def fetch_provider_features(provider, provider_key, config_key, sensor_data_file, day_segments_file):
import pandas as pd
from importlib import import_module, util
@ -24,7 +74,7 @@ def fetch_provider_features(provider, provider_key, config_key, sensor_data_file
for day_segment in day_segments_labels["label"]:
print("{} Processing {} {} {}".format(rapids_log_tag, config_key, provider_key, day_segment))
features = feature_function(sensor_data, day_segment, provider, filter_data_by_segment=filter_data_by_segment)
features = feature_function(sensor_data, day_segment, provider, filter_data_by_segment=filter_data_by_segment, chunk_episodes=chunk_episodes)
sensor_features = sensor_features.merge(features, how="outer")
else:
for feature in provider["FEATURES"]:
@ -36,4 +86,4 @@ def fetch_provider_features(provider, provider_key, config_key, sensor_data_file
for i in range(segment_colums.shape[1]):
sensor_features.insert(1 + i, segment_colums.columns[i], segment_colums[segment_colums.columns[i]])
return sensor_features
return sensor_features