Update screen&battery episodes features with different segment format
parent 0dafdd1340
commit f1717e59e7

Snakefile

@@ -65,12 +65,15 @@ if config["ACTIVITY_RECOGNITION"]["COMPUTE"]:
         files_to_compute.extend(expand("data/processed/{pid}/{sensor}_deltas.csv", pid=pids, sensor=table))
     files_to_compute.extend(expand("data/processed/{pid}/activity_recognition_{day_segment}.csv",pid=config["PIDS"], day_segment = config["ACTIVITY_RECOGNITION"]["DAY_SEGMENTS"]))

-if config["BATTERY"]["COMPUTE"]:
-    files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["BATTERY"]["DB_TABLE"]))
-    files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime.csv", pid=config["PIDS"], sensor=config["BATTERY"]["DB_TABLE"]))
-    files_to_compute.extend(expand("data/raw/{pid}/{sensor}_with_datetime_unified.csv", pid=config["PIDS"], sensor=config["BATTERY"]["DB_TABLE"]))
-    files_to_compute.extend(expand("data/processed/{pid}/battery_deltas.csv", pid=config["PIDS"]))
-    files_to_compute.extend(expand("data/processed/{pid}/battery_{day_segment}.csv", pid = config["PIDS"], day_segment = config["BATTERY"]["DAY_SEGMENTS"]))
+for provider in config["BATTERY"]["PROVIDERS"].keys():
+    if config["BATTERY"]["PROVIDERS"][provider]["COMPUTE"]:
+        files_to_compute.extend(expand("data/raw/{pid}/{sensor}_raw.csv", pid=config["PIDS"], sensor=config["BATTERY"]["DB_TABLE"]))
+        files_to_compute.extend(expand("data/interim/{pid}/battery_episodes.csv", pid=config["PIDS"]))
+        files_to_compute.extend(expand("data/interim/{pid}/battery_episodes_resampled.csv", pid=config["PIDS"]))
+        files_to_compute.extend(expand("data/interim/{pid}/battery_episodes_resampled_with_datetime.csv", pid=config["PIDS"]))
+        files_to_compute.extend(expand("data/interim/{pid}/{sensor_key}_features/{sensor_key}_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["BATTERY"]["PROVIDERS"][provider]["SRC_LANGUAGE"], provider_key=provider, sensor_key="BATTERY".lower()))
+        files_to_compute.extend(expand("data/processed/features/{pid}/{sensor_key}.csv", pid=config["PIDS"], sensor_key="BATTERY".lower()))
+

 for provider in config["SCREEN"]["PROVIDERS"].keys():
     if config["SCREEN"]["PROVIDERS"][provider]["COMPUTE"]:

@@ -121,10 +121,13 @@ ACTIVITY_RECOGNITION:
   FEATURES: ["count","mostcommonactivity","countuniqueactivities","activitychangecount","sumstationary","summobile","sumvehicle"]

 BATTERY:
-  COMPUTE: False
   DB_TABLE: battery
-  DAY_SEGMENTS: *day_segments
-  FEATURES: ["countdischarge", "sumdurationdischarge", "countcharge", "sumdurationcharge", "avgconsumptionrate", "maxconsumptionrate"]
+  PROVIDERS:
+    RAPIDS:
+      COMPUTE: False
+      FEATURES: ["countdischarge", "sumdurationdischarge", "countcharge", "sumdurationcharge", "avgconsumptionrate", "maxconsumptionrate"]
+      SRC_FOLDER: "rapids" # inside src/features/battery
+      SRC_LANGUAGE: "python"

 SCREEN:
   DB_TABLE: screen

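Note: the restructured config block above is exactly what the new Snakefile loop iterates over. The sketch below is not part of the commit; it mirrors the YAML names with a made-up feature list to show how a provider entry is looked up and how its COMPUTE and SRC_LANGUAGE fields decide whether, and via which entry script, features are computed:

# Minimal sketch (illustrative only): resolving a BATTERY provider entry.
config = {
    "BATTERY": {
        "DB_TABLE": "battery",
        "PROVIDERS": {
            "RAPIDS": {
                "COMPUTE": True,
                "FEATURES": ["countdischarge", "sumdurationdischarge"],
                "SRC_FOLDER": "rapids",
                "SRC_LANGUAGE": "python",
            }
        },
    }
}

for provider_key, provider in config["BATTERY"]["PROVIDERS"].items():
    if provider["COMPUTE"]:
        # Mirrors the Snakefile: SRC_LANGUAGE selects battery_entry.py or battery_entry.R
        print(provider_key, provider["SRC_LANGUAGE"], provider["FEATURES"])
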
@@ -73,16 +73,14 @@ rule screen_episodes:
 rule resample_episodes:
     input:
         "data/interim/{pid}/{sensor}_episodes.csv"
-    params:
-        sensor = "{sensor}"
     output:
         "data/interim/{pid}/{sensor}_episodes_resampled.csv"
     script:
         "../src/features/utils/resample_episodes.R"

-rule resample_screen_episodes_with_datetime:
+rule resample_episodes_with_datetime:
     input:
-        sensor_input = "data/interim/{pid}/screen_episodes_resampled.csv",
+        sensor_input = "data/interim/{pid}/{sensor}_episodes_resampled.csv",
         day_segments = "data/interim/day_segments/{pid}_day_segments.csv"
     params:
         timezones = None,

@@ -90,7 +88,7 @@ rule resample_screen_episodes_with_datetime:
         day_segments_type = config["DAY_SEGMENTS"]["TYPE"],
         include_past_periodic_segments = config["DAY_SEGMENTS"]["INCLUDE_PAST_PERIODIC_SEGMENTS"]
     output:
-        "data/interim/{pid}/screen_episodes_resampled_with_datetime.csv"
+        "data/interim/{pid}/{sensor}_episodes_resampled_with_datetime.csv"
     script:
         "../src/data/readable_datetime.R"

@@ -170,16 +168,29 @@ rule activity_features:
     script:
         "../src/features/activity_recognition.py"

-rule battery_features:
+rule battery_r_features:
     input:
-        "data/interim/{pid}/battery_episodes.csv"
+        battery_episodes = "data/interim/{pid}/battery_episodes_resampled_with_datetime.csv",
+        day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
     params:
-        day_segment = "{day_segment}",
-        features = config["BATTERY"]["FEATURES"]
+        provider = lambda wildcards: config["BATTERY"]["PROVIDERS"][wildcards.provider_key],
+        provider_key = "{provider_key}"
     output:
-        "data/processed/{pid}/battery_{day_segment}.csv"
+        "data/interim/{pid}/battery_features/battery_r_{provider_key}.csv"
     script:
-        "../src/features/battery_features.py"
+        "../src/features/battery/battery_entry.R"

+rule battery_python_features:
+    input:
+        battery_episodes = "data/interim/{pid}/battery_episodes_resampled_with_datetime.csv",
+        day_segments_labels = "data/interim/day_segments/{pid}_day_segments_labels.csv"
+    params:
+        provider = lambda wildcards: config["BATTERY"]["PROVIDERS"][wildcards.provider_key],
+        provider_key = "{provider_key}"
+    output:
+        "data/interim/{pid}/battery_features/battery_python_{provider_key}.csv"
+    script:
+        "../src/features/battery/battery_entry.py"
+
 rule screen_r_features:
     input:

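Note: the two new battery rules differ only in the language tag of their output file; Snakemake picks whichever rule's output pattern matches the file requested by the Snakefile (which builds the name from each provider's SRC_LANGUAGE). A small sketch of the wildcard resolution, not part of the commit, with a hypothetical pid:

# Hypothetical wildcard resolution for the battery_python_features output:
output_template = "data/interim/{pid}/battery_features/battery_python_{provider_key}.csv"
print(output_template.format(pid="p01", provider_key="RAPIDS"))
# data/interim/p01/battery_features/battery_python_RAPIDS.csv
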
@@ -1,48 +0,0 @@
-import pandas as pd
-from datetime import datetime, timedelta, time
-from features_utils import splitOvernightEpisodes, splitMultiSegmentEpisodes
-
-
-def base_battery_features(battery_data, day_segment, requested_features):
-    # name of the features this function can compute
-    base_features_names = ["countdischarge", "sumdurationdischarge", "countcharge", "sumdurationcharge", "avgconsumptionrate", "maxconsumptionrate"]
-    # the subset of requested features this function can compute
-    features_to_compute = list(set(requested_features) & set(base_features_names))
-
-    battery_features = pd.DataFrame(columns=["local_date"] + ["battery_" + day_segment + "_" + x for x in features_to_compute])
-    if not battery_data.empty:
-        battery_data = splitOvernightEpisodes(battery_data, ["battery_diff"], [])
-
-        if day_segment != "daily":
-            battery_data = splitMultiSegmentEpisodes(battery_data, day_segment, ["battery_diff"])
-
-        if not battery_data.empty:
-            battery_data["battery_consumption_rate"] = battery_data["battery_diff"] / battery_data["time_diff"]
-
-            # for battery_data_discharge:
-            battery_data_discharge = battery_data[battery_data["battery_diff"] > 0]
-            battery_discharge_features = pd.DataFrame()
-            if "countdischarge" in features_to_compute:
-                battery_discharge_features["battery_"+day_segment+"_countdischarge"] = battery_data_discharge.groupby(["local_start_date"])["local_start_date"].count()
-            if "sumdurationdischarge" in features_to_compute:
-                battery_discharge_features["battery_"+day_segment+"_sumdurationdischarge"] = battery_data_discharge.groupby(["local_start_date"])["time_diff"].sum()
-            if "avgconsumptionrate" in features_to_compute:
-                battery_discharge_features["battery_"+day_segment+"_avgconsumptionrate"] = battery_data_discharge.groupby(["local_start_date"])["battery_consumption_rate"].mean()
-            if "maxconsumptionrate" in features_to_compute:
-                battery_discharge_features["battery_"+day_segment+"_maxconsumptionrate"] = battery_data_discharge.groupby(["local_start_date"])["battery_consumption_rate"].max()
-
-            # for battery_data_charge:
-            battery_data_charge = battery_data[battery_data["battery_diff"] <= 0]
-            battery_charge_features = pd.DataFrame()
-            if "countcharge" in features_to_compute:
-                battery_charge_features["battery_"+day_segment+"_countcharge"] = battery_data_charge.groupby(["local_start_date"])["local_start_date"].count()
-            if "sumdurationcharge" in features_to_compute:
-                battery_charge_features["battery_"+day_segment+"_sumdurationcharge"] = battery_data_charge.groupby(["local_start_date"])["time_diff"].sum()
-
-            # combine discharge features and charge features; fill the missing values with ZERO
-            battery_features = pd.concat([battery_discharge_features, battery_charge_features], axis=1, sort=True).fillna(0)
-
-            battery_features.index.rename("local_date", inplace=True)
-            battery_features = battery_features.reset_index()
-
-    return battery_features

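Note: this deleted module keyed its output by local_date and relied on splitOvernightEpisodes/splitMultiSegmentEpisodes. In the new layout those responsibilities move to filter_data_by_segment, deduplicate_episodes, and chunk_episodes in src/features/utils/utils.py, and features are keyed by local_segment instead, as the new provider files below show.
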
@@ -0,0 +1,13 @@
+source("renv/activate.R")
+source("src/features/utils/utils.R")
+library("dplyr")
+library("tidyr")
+
+sensor_data_file <- snakemake@input[["battery_episodes"]]
+day_segments_file <- snakemake@input[["day_segments_labels"]]
+provider <- snakemake@params["provider"][["provider"]]
+provider_key <- snakemake@params["provider_key"]
+
+sensor_features <- fetch_provider_features(provider, provider_key, "battery", sensor_data_file, day_segments_file)
+
+write.csv(sensor_features, snakemake@output[[1]], row.names = FALSE)

@@ -0,0 +1,18 @@
+import pandas as pd
+from importlib import import_module, util
+from pathlib import Path
+
+# import fetch_provider_features from src/features/utils/utils.py
+spec = util.spec_from_file_location("util", str(Path(snakemake.scriptdir).parent / "utils" / "utils.py"))
+mod = util.module_from_spec(spec)
+spec.loader.exec_module(mod)
+fetch_provider_features = getattr(mod, "fetch_provider_features")
+
+battery_episodes_file = snakemake.input["battery_episodes"]
+day_segments_file = snakemake.input["day_segments_labels"]
+provider = snakemake.params["provider"]
+provider_key = snakemake.params["provider_key"]
+
+sensor_features = fetch_provider_features(provider, provider_key, "battery", battery_episodes_file, day_segments_file)
+
+sensor_features.to_csv(snakemake.output[0], index=False)

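Note: battery_entry.py loads utils.py by file path because src/features is not an installed package. A standalone sketch of the same importlib pattern, not part of the commit (the path and module name are illustrative, and snakemake.scriptdir only exists inside a Snakemake-run script):

from importlib import util
from pathlib import Path

def load_module_from_path(name, path):
    # Build a module spec from an explicit file location and execute it once
    spec = util.spec_from_file_location(name, str(Path(path)))
    mod = util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod

# utils = load_module_from_path("utils", "src/features/utils/utils.py")
# fetch_provider_features = utils.fetch_provider_features
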
@@ -0,0 +1,56 @@
+import pandas as pd
+from datetime import datetime, timedelta, time
+
+def rapids_features(battery_data, day_segment, provider, filter_data_by_segment, *args, **kwargs):
+
+    # name of the features this function can compute
+    base_features_names = ["countdischarge", "sumdurationdischarge", "countcharge", "sumdurationcharge", "avgconsumptionrate", "maxconsumptionrate"]
+    # the subset of requested features this function can compute
+    requested_features = provider["FEATURES"]
+    features_to_compute = list(set(requested_features) & set(base_features_names))
+
+    battery_features = pd.DataFrame(columns=["local_segment"] + ["battery_rapids_" + x for x in features_to_compute])
+    if not battery_data.empty:
+        battery_data = filter_data_by_segment(battery_data, day_segment)
+        battery_data = kwargs["deduplicate_episodes"](battery_data)
+
+        if not battery_data.empty:
+            # chunk_episodes
+            battery_data = kwargs["chunk_episodes"](battery_data)
+
+        if not battery_data.empty:
+
+            battery_data["episode_id"] = ((battery_data.battery_status != battery_data.battery_status.shift()) | (battery_data.start_timestamp - battery_data.end_timestamp.shift() > 1)).cumsum()
+            grouped = battery_data.groupby(by=["local_segment", "episode_id", "battery_status"])
+            battery_episodes = grouped[["time_diff"]].sum()
+            battery_episodes["battery_diff"] = grouped["battery_level"].first() - grouped["battery_level"].last()
+            battery_episodes["battery_consumption_rate"] = battery_episodes["battery_diff"] / battery_episodes["time_diff"]
+            battery_episodes.reset_index(inplace=True)
+
+            # for discharge episodes
+            battery_discharge_episodes = battery_episodes[(battery_episodes["battery_status"] == 3) | (battery_episodes["battery_status"] == 4)]
+            battery_discharge_features = pd.DataFrame()
+            if "countdischarge" in features_to_compute:
+                battery_discharge_features["battery_rapids_countdischarge"] = battery_discharge_episodes.groupby(["local_segment"])["episode_id"].count()
+            if "sumdurationdischarge" in features_to_compute:
+                battery_discharge_features["battery_rapids_sumdurationdischarge"] = battery_discharge_episodes.groupby(["local_segment"])["time_diff"].sum()
+            if "avgconsumptionrate" in features_to_compute:
+                battery_discharge_features["battery_rapids_avgconsumptionrate"] = battery_discharge_episodes.groupby(["local_segment"])["battery_consumption_rate"].mean()
+            if "maxconsumptionrate" in features_to_compute:
+                battery_discharge_features["battery_rapids_maxconsumptionrate"] = battery_discharge_episodes.groupby(["local_segment"])["battery_consumption_rate"].max()
+
+            # for charge episodes
+            battery_charge_episodes = battery_episodes[(battery_episodes["battery_status"] == 2) | (battery_episodes["battery_status"] == 5)]
+            battery_charge_features = pd.DataFrame()
+            if "countcharge" in features_to_compute:
+                battery_charge_features["battery_rapids_countcharge"] = battery_charge_episodes.groupby(["local_segment"])["episode_id"].count()
+            if "sumdurationcharge" in features_to_compute:
+                battery_charge_features["battery_rapids_sumdurationcharge"] = battery_charge_episodes.groupby(["local_segment"])["time_diff"].sum()
+
+            # combine discharge features and charge features; fill the missing values with ZERO
+            battery_features = pd.concat([battery_discharge_features, battery_charge_features], axis=1, sort=True).fillna(0)
+
+            battery_features.index.rename("local_segment", inplace=True)
+            battery_features = battery_features.reset_index()
+
+    return battery_features

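Note: the episode_id line above is the core of the new episode logic: a fresh id starts whenever battery_status changes or consecutive resampled rows are not contiguous in time. A toy illustration, not part of the commit, with made-up rows (statuses 3/4 are discharge, 2/5 are charge, matching the filters above):

import pandas as pd

df = pd.DataFrame({
    "battery_status":  [3, 3, 3, 2],                # discharge x3, then charge
    "start_timestamp": [0, 60000, 180000, 240000],  # ms; the third row starts after a gap
    "end_timestamp":   [60000, 120000, 240000, 300000],
})
# New episode when the status changes OR rows are non-contiguous (> 1 ms gap)
df["episode_id"] = ((df.battery_status != df.battery_status.shift()) |
                    (df.start_timestamp - df.end_timestamp.shift() > 1)).cumsum()
print(df["episode_id"].tolist())  # [1, 1, 2, 3]: the gap and the status change each start a new episode
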
@@ -44,7 +44,6 @@ get_screen_episodes <- function(screen){
     filter( (screen_status == 3 & lead(screen_status) == 0) | (screen_status == 0 & lag(screen_status) == 3) ) %>%
     summarise(episode = "unlock",
               screen_sequence = toString(screen_status),
-              time_diff = (last(timestamp) - first(timestamp)) / (1000 * 60),
               start_timestamp = first(timestamp),
               end_timestamp = last(timestamp)) %>%
     filter(str_detect(screen_sequence,

@@ -58,7 +57,6 @@ get_screen_episodes <- function(screen){
   if(nrow(screen) < 2){
     episodes <- data.frame(episode = character(),
                            screen_sequence = character(),
-                           time_diff = numeric(),
                            start_timestamp = character(),
                            end_timestamp = character())
   } else {

@@ -47,6 +47,7 @@ def rapids_features(screen_data, day_segment, provider, filter_data_by_segment, *args, **kwargs):
     if not screen_data.empty:

         screen_data = filter_data_by_segment(screen_data, day_segment)
+        screen_data = kwargs["deduplicate_episodes"](screen_data)
         if not screen_data.empty:
             # chunk_episodes
             screen_data = kwargs["chunk_episodes"](screen_data)

@@ -1,5 +1,6 @@
 source("renv/activate.R")
 library("dplyr")
+library("tidyr")

 # Using mostly indexing instead of tidyr because it is faster
 resampled_episodes <- read.csv(snakemake@input[[1]])

@@ -1,44 +0,0 @@
-import pandas as pd
-
-
-def resample_screen_deltas(screen_deltas):
-
-    column_names = ("episode_id", "episode", "screen_sequence", "timestamp", "duration")
-    records_resampled = []
-
-    for _, row in screen_deltas.iterrows():
-        episode_id, episode, screen_sequence = row["episode_id"], row["episode"], row["screen_sequence"]
-        start_timestamp, end_timestamp = row["start_timestamp"], row["end_timestamp"]
-
-        for timestamp in range(start_timestamp, end_timestamp, 1000 * 60):
-            records_resampled.append((episode_id, episode, screen_sequence, timestamp, min(1, (end_timestamp - timestamp) / (1000 * 60))))
-
-    return records_resampled, column_names
-
-
-def resample_battery_deltas(battery_deltas):
-    column_names = ("battery_diff", "timestamp")
-    records_resampled = []
-
-    for _, row in battery_deltas.iterrows():
-        start_timestamp, end_timestamp = row["start_timestamp"], row["end_timestamp"]
-        battery_diff = row["battery_diff"] / row["time_diff"]
-
-        for timestamp in range(start_timestamp, end_timestamp, 1000 * 60):
-            records_resampled.append((battery_diff, timestamp))
-
-    return records_resampled, column_names
-
-
-deltas = pd.read_csv(snakemake.input[0])
-sensor = snakemake.params["sensor"]
-
-if sensor == "battery":
-    records_resampled, column_names = resample_battery_deltas(deltas)
-
-if sensor == "screen":
-    records_resampled, column_names = resample_screen_deltas(deltas)
-
-
-deltas_resampled = pd.DataFrame(data=records_resampled, columns=column_names)
-deltas_resampled.to_csv(snakemake.output[0], index=False)

|
@ -13,45 +13,47 @@ def filter_data_by_segment(data, day_segment):
|
||||||
data[["local_segment","timestamps_segment"]] = data["local_segment"].str.split(pat =";",n=1, expand=True)
|
data[["local_segment","timestamps_segment"]] = data["local_segment"].str.split(pat =";",n=1, expand=True)
|
||||||
return(data)
|
return(data)
|
||||||
|
|
||||||
|
# Each minute could fall into two segments.
|
||||||
|
# Firstly, we generate two rows for each resampled minute via resample_episodes rule:
|
||||||
|
# the first row's timestamp column is the start_timestamp, while the second row's timestamp column is the end_timestamp.
|
||||||
|
# Then, we check if the segments of start_timestamp are the same as the segments of end_timestamp:
|
||||||
|
# if they are the same (only fall into one segment), we will discard the second row;
|
||||||
|
# otherwise (fall into two segments), we will keep both.
|
||||||
|
def deduplicate_episodes(sensor_episodes):
|
||||||
|
# Drop rows where segments of start_timestamp and end_timestamp are the same
|
||||||
|
sensor_episodes = sensor_episodes.drop_duplicates(subset=["start_timestamp", "end_timestamp", "local_segment"], keep="first")
|
||||||
|
|
||||||
|
# Delete useless columns
|
||||||
|
for drop_col in ["utc_date_time", "local_date_time", "local_date", "local_time", "local_hour", "local_minute"]:
|
||||||
|
del sensor_episodes[drop_col]
|
||||||
|
|
||||||
|
return sensor_episodes
|
||||||
|
|
||||||
def chunk_episodes(sensor_episodes):
|
def chunk_episodes(sensor_episodes):
|
||||||
import pytz, copy
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
# avoid warning messages: SettingWithCopyWarning
|
# Unix timestamp for current segment in milliseconds
|
||||||
sensor_episodes = sensor_episodes.copy()
|
sensor_episodes[["segment_start_timestamp", "segment_end_timestamp"]] = sensor_episodes["timestamps_segment"].str.split(",", expand=True)
|
||||||
|
|
||||||
# convert string to datetime with local timezone
|
# Compute chunked timestamp
|
||||||
sensor_episodes["start_datetime"] = pd.to_datetime(sensor_episodes["local_segment"].str[-39:-20], format="%Y-%m-%d#%H:%M:%S")
|
sensor_episodes["chunked_start_timestamp"] = sensor_episodes[["start_timestamp", "segment_start_timestamp"]].max(axis=1)
|
||||||
sensor_episodes["start_datetime"] = pd.concat([data["start_datetime"].dt.tz_localize(tz) for tz, data in sensor_episodes.groupby("local_timezone")])
|
sensor_episodes["chunked_end_timestamp"] = sensor_episodes[["end_timestamp", "segment_end_timestamp"]].min(axis=1)
|
||||||
|
|
||||||
sensor_episodes["end_datetime"] = pd.to_datetime(sensor_episodes["local_segment"].str[-19:], format="%Y-%m-%d#%H:%M:%S")
|
# Compute time_diff: intersection of current row and segment
|
||||||
sensor_episodes["end_datetime"] = pd.concat([data["end_datetime"].dt.tz_localize(tz) for tz, data in sensor_episodes.groupby("local_timezone")])
|
|
||||||
|
|
||||||
# unix timestamp in milliseconds
|
|
||||||
sensor_episodes["start_timestamp"] = sensor_episodes["start_datetime"].apply(lambda dt: dt.timestamp() * 1000)
|
|
||||||
sensor_episodes["end_timestamp"] = sensor_episodes["end_datetime"].apply(lambda dt: dt.timestamp() * 1000)
|
|
||||||
|
|
||||||
# compute chunked timestamp
|
|
||||||
sensor_episodes["chunked_start_timestamp"] = sensor_episodes[["timestamp", "start_timestamp"]].max(axis=1)
|
|
||||||
|
|
||||||
sensor_episodes["timestamp_plus_duration"] = sensor_episodes["timestamp"] + sensor_episodes["duration"] * 1000 * 60
|
|
||||||
sensor_episodes["chunked_end_timestamp"] = sensor_episodes[["timestamp_plus_duration", "end_timestamp"]].min(axis=1)
|
|
||||||
|
|
||||||
# time_diff: intersection of current row and segment
|
|
||||||
sensor_episodes["time_diff"] = (sensor_episodes["chunked_end_timestamp"] - sensor_episodes["chunked_start_timestamp"]) / (1000 * 60)
|
sensor_episodes["time_diff"] = (sensor_episodes["chunked_end_timestamp"] - sensor_episodes["chunked_start_timestamp"]) / (1000 * 60)
|
||||||
|
|
||||||
# compute chunked datetime
|
# Compute chunked datetime
|
||||||
sensor_episodes["chunked_start_datetime"] = pd.to_datetime(sensor_episodes["chunked_start_timestamp"], unit="ms", utc=True)
|
sensor_episodes["chunked_start_datetime"] = pd.to_datetime(sensor_episodes["chunked_start_timestamp"], unit="ms", utc=True)
|
||||||
sensor_episodes["chunked_start_datetime"] = pd.concat([data["chunked_start_datetime"].dt.tz_convert(tz) for tz, data in sensor_episodes.groupby("local_timezone")])
|
sensor_episodes["chunked_start_datetime"] = pd.concat([data["chunked_start_datetime"].dt.tz_convert(tz) for tz, data in sensor_episodes.groupby("local_timezone")])
|
||||||
|
|
||||||
sensor_episodes["chunked_end_datetime"] = pd.to_datetime(sensor_episodes["chunked_end_timestamp"], unit="ms", utc=True)
|
sensor_episodes["chunked_end_datetime"] = pd.to_datetime(sensor_episodes["chunked_end_timestamp"], unit="ms", utc=True)
|
||||||
sensor_episodes["chunked_end_datetime"] = pd.concat([data["chunked_end_datetime"].dt.tz_convert(tz) for tz, data in sensor_episodes.groupby("local_timezone")])
|
sensor_episodes["chunked_end_datetime"] = pd.concat([data["chunked_end_datetime"].dt.tz_convert(tz) for tz, data in sensor_episodes.groupby("local_timezone")])
|
||||||
|
|
||||||
# merge episodes
|
# Merge episodes
|
||||||
sensor_episodes_grouped = sensor_episodes.groupby(["episode_id", "episode", "screen_sequence"])
|
cols_for_groupby = [col for col in sensor_episodes.columns if col not in ["local_timezone", "timestamps_segment", "timestamp", "assigned_segments", "start_datetime", "end_datetime", "start_timestamp", "end_timestamp", "time_diff", "segment_start_timestamp", "segment_end_timestamp", "chunked_start_timestamp", "chunked_end_timestamp", "chunked_start_datetime", "chunked_end_datetime"]]
|
||||||
|
|
||||||
|
sensor_episodes_grouped = sensor_episodes.groupby(by=cols_for_groupby)
|
||||||
merged_sensor_episodes = sensor_episodes_grouped[["time_diff"]].sum()
|
merged_sensor_episodes = sensor_episodes_grouped[["time_diff"]].sum()
|
||||||
merged_sensor_episodes["local_segment"] = sensor_episodes_grouped["local_segment"].first()
|
|
||||||
|
|
||||||
merged_sensor_episodes["start_timestamp"] = sensor_episodes_grouped["chunked_start_timestamp"].first()
|
merged_sensor_episodes["start_timestamp"] = sensor_episodes_grouped["chunked_start_timestamp"].first()
|
||||||
merged_sensor_episodes["end_timestamp"] = sensor_episodes_grouped["chunked_end_timestamp"].last()
|
merged_sensor_episodes["end_timestamp"] = sensor_episodes_grouped["chunked_end_timestamp"].last()
|
||||||
|
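Note: with the new segment format, timestamps_segment carries the segment's start and end epochs directly, so chunk_episodes no longer parses datetimes out of the local_segment string. The clipping reduces to an interval intersection; a worked toy example, not part of the commit, with made-up numbers:

# One episode overlapping the start of its assigned segment (timestamps in ms)
start_timestamp, end_timestamp = 1_000_000, 1_300_000                  # 5-minute episode
segment_start_timestamp, segment_end_timestamp = 1_120_000, 1_600_000

chunked_start = max(start_timestamp, segment_start_timestamp)  # 1_120_000
chunked_end = min(end_timestamp, segment_end_timestamp)        # 1_300_000
time_diff = (chunked_end - chunked_start) / (1000 * 60)        # 3.0 minutes inside the segment
print(chunked_start, chunked_end, time_diff)
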
@@ -80,7 +82,7 @@ def fetch_provider_features(provider, provider_key, config_key, sensor_data_file, day_segments_file):
         for day_segment in day_segments_labels["label"]:
             print("{} Processing {} {} {}".format(rapids_log_tag, config_key, provider_key, day_segment))
-            features = feature_function(sensor_data, day_segment, provider, filter_data_by_segment=filter_data_by_segment, chunk_episodes=chunk_episodes)
+            features = feature_function(sensor_data, day_segment, provider, filter_data_by_segment=filter_data_by_segment, deduplicate_episodes=deduplicate_episodes, chunk_episodes=chunk_episodes)
             sensor_features = sensor_features.merge(features, how="outer")
     else:
         for feature in provider["FEATURES"]:

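Note: fetch_provider_features now threads deduplicate_episodes through to every provider feature function as a keyword argument, so episode-based providers can opt in while others ignore it via **kwargs. A minimal sketch of the receiving side, not part of the commit, condensed from the battery and screen providers above:

def rapids_features(sensor_data, day_segment, provider, filter_data_by_segment, *args, **kwargs):
    sensor_data = filter_data_by_segment(sensor_data, day_segment)  # keep rows in this segment
    sensor_data = kwargs["deduplicate_episodes"](sensor_data)       # drop minutes counted twice
    sensor_data = kwargs["chunk_episodes"](sensor_data)             # clip episodes to segment bounds
    ...  # compute features on the chunked episodes
    return sensor_data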