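"""Doryab location features: clustering-based mobility features extracted from phone
location episodes (location variance, distance and speed, radius of gyration, time at
clusters, entropy, and time at home)."""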
import numpy as np
import pandas as pd

from phone_locations.doryab.doryab_clustering import haversine, create_clustering_hyperparameters, cluster


def apply_cluster_strategy(location_data, time_segment, clustering_algorithm, dbscan_eps, dbscan_minsamples, cluster_on, filter_data_by_segment):
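    """Filter the location data by time segment and attach cluster labels according to
    the CLUSTER_ON strategy: PARTICIPANT_DATASET (clusters were precomputed over the
    participant's whole dataset), TIME_SEGMENT (cluster each time segment), or
    TIME_SEGMENT_INSTANCE (cluster each segment instance separately)."""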
    hyperparameters = create_clustering_hyperparameters(clustering_algorithm, dbscan_eps, dbscan_minsamples)

    if cluster_on == "PARTICIPANT_DATASET":
        # clusters are created in add_doryab_extra_columns.py script
        location_data = filter_data_by_segment(location_data, time_segment)
    elif cluster_on == "TIME_SEGMENT":
        location_data = filter_data_by_segment(location_data, time_segment)
        location_data = cluster(location_data, clustering_algorithm, **hyperparameters)
    else:  # TIME_SEGMENT_INSTANCE
        location_data = filter_data_by_segment(location_data, time_segment)
        location_data_clusters = pd.DataFrame()
        for segment_instance, instance_data in location_data.groupby(["local_segment"]):
            location_data_per_group = cluster(instance_data, clustering_algorithm, **hyperparameters)
            location_data_clusters = pd.concat([location_data_per_group, location_data_clusters])
        location_data = location_data_clusters

    return location_data


def variance_and_logvariance_features(location_data, location_features):
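    """Add the duration-weighted variance of latitude and longitude (locationvariance)
    and its base-10 log (loglocationvariance) to location_features; a -inf log from a
    zero variance is replaced with NaN."""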
    location_data_grouped = location_data.groupby("local_segment")

    location_data["latitude_X_duration"] = location_data["double_latitude"] * location_data["duration"]
    location_data["longitude_X_duration"] = location_data["double_longitude"] * location_data["duration"]

    location_data["latitude_wavg"] = location_data_grouped["latitude_X_duration"].transform("sum") / location_data_grouped["duration"].transform("sum")
    location_data["longitude_wavg"] = location_data_grouped["longitude_X_duration"].transform("sum") / location_data_grouped["duration"].transform("sum")

    # durations are in minutes; scaling by 60 weights each second of data as one
    # observation, with an (n - 1) correction in the denominator below
    location_data["latitude_for_wvar"] = (location_data["double_latitude"] - location_data["latitude_wavg"]) ** 2 * location_data["duration"] * 60
    location_data["longitude_for_wvar"] = (location_data["double_longitude"] - location_data["longitude_wavg"]) ** 2 * location_data["duration"] * 60

    location_features["locationvariance"] = ((location_data_grouped["latitude_for_wvar"].sum() + location_data_grouped["longitude_for_wvar"].sum()) / (location_data_grouped["duration"].sum() * 60 - 1)).fillna(0)
    location_features["loglocationvariance"] = np.log10(location_features["locationvariance"]).replace(-np.inf, np.nan)

    return location_features


def distance_and_speed_features(moving_data):
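    """Compute per-segment totaldistance, duration-weighted average speed (avgspeed),
    and duration-weighted speed variance (varspeed) from moving episodes."""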
    distance_and_speed = moving_data[["local_segment", "distance"]].groupby(["local_segment"]).sum().rename(columns={"distance": "totaldistance"})

    moving_data_grouped = moving_data.groupby(["local_segment"])

    moving_data["speed_X_duration"] = moving_data["speed"] * moving_data["duration"]
    distance_and_speed["avgspeed"] = moving_data_grouped["speed_X_duration"].sum() / moving_data_grouped["duration"].sum()

    moving_data["speed_wavg"] = moving_data_grouped["speed_X_duration"].transform("sum") / moving_data_grouped["duration"].transform("sum")
    moving_data["speed_for_wvar"] = (moving_data["speed"] - moving_data["speed_wavg"]) ** 2 * moving_data["duration"] * 60
    distance_and_speed["varspeed"] = moving_data_grouped["speed_for_wvar"].sum() / (moving_data_grouped["duration"].sum() * 60 - 1)

    return distance_and_speed


def radius_of_gyration(location_data):
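    """Compute the time-weighted radius of gyration per segment: the square root of the
    stay-time-weighted mean squared haversine distance between each visited cluster and
    the centroid of the clusters visited during the segment instance."""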
    if location_data.empty:
        return np.nan

    # define a lambda function to compute the weighted mean for each cluster
    weighted_mean = lambda x: np.average(x, weights=location_data.loc[x.index, "duration"])

    # center is the centroid of the places visited during a segment instance, not the home location
    clusters = location_data.groupby(["local_segment", "cluster_label"]).agg(
        double_latitude=("double_latitude", weighted_mean),
        double_longitude=("double_longitude", weighted_mean),
        time_in_a_cluster=("duration", "sum")
    ).reset_index()

    # redefine the lambda function to compute the weighted mean across clusters
    weighted_mean = lambda x: np.average(x, weights=clusters.loc[x.index, "time_in_a_cluster"])

    clusters[["centroid_double_latitude", "centroid_double_longitude"]] = clusters.groupby(["local_segment"], sort=False)[["double_latitude", "double_longitude"]].transform(weighted_mean)
    clusters["distance_squared"] = haversine(clusters["double_longitude"], clusters["double_latitude"], clusters["centroid_double_longitude"], clusters["centroid_double_latitude"]) ** 2

    clusters["distance_squared_X_time_in_a_cluster"] = clusters["distance_squared"] * clusters["time_in_a_cluster"]
    # replacing a zero total stay time with inf avoids division by zero and yields a radius of 0
    rog = np.sqrt(clusters.groupby(["local_segment"])["distance_squared_X_time_in_a_cluster"].sum() / clusters.groupby(["local_segment"])["time_in_a_cluster"].sum().replace(0, np.inf))

    return rog


def cluster_stay(x, stay_at_clusters, cluster_n):
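    """Return the stay duration at the cluster labeled cluster_n within one segment, or
    None if that cluster is absent; labels are assumed to rank clusters by significance
    (1 = most visited), as assigned by the clustering step."""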
    topn_cluster_label = x[stay_at_clusters.loc[x.index]["cluster_label"] == cluster_n]
    time_at_topn = topn_cluster_label.iloc[0] if len(topn_cluster_label) == 1 else None
    return time_at_topn


def stay_at_topn_clusters(location_data):
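    """Compute per-segment time spent at the top 3 clusters plus the max, min, mean,
    and std of stay durations across clusters; missing values are filled with 0."""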
    stay_at_clusters = location_data[["local_segment", "cluster_label", "duration"]].groupby(["local_segment", "cluster_label"], sort=True).sum().reset_index()

    stay_at_clusters_features = stay_at_clusters.groupby(["local_segment"]).agg(
        timeattop1location=("duration", lambda x: cluster_stay(x, stay_at_clusters, 1)),
        timeattop2location=("duration", lambda x: cluster_stay(x, stay_at_clusters, 2)),
        timeattop3location=("duration", lambda x: cluster_stay(x, stay_at_clusters, 3)),
        maxlengthstayatclusters=("duration", "max"),
        minlengthstayatclusters=("duration", "min"),
        avglengthstayatclusters=("duration", "mean"),
        stdlengthstayatclusters=("duration", "std")
    ).fillna(0)

    return stay_at_clusters_features


def location_entropy(location_data):
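    """Compute the Shannon entropy of the time spent across clusters per segment
    (locationentropy) and the same value divided by the number of visited clusters
    (normalizedlocationentropy)."""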
    location_data = location_data.groupby(["local_segment", "cluster_label"])[["duration"]].sum().reset_index().rename(columns={"duration": "cluster_duration"})
    location_data["all_clusters_duration"] = location_data.groupby(["local_segment"])["cluster_duration"].transform("sum")
    # 0 * log(0) evaluates to NaN and is skipped by the groupby sum below
    location_data["plogp"] = (location_data["cluster_duration"] / location_data["all_clusters_duration"]).apply(lambda x: x * np.log(x))

    entropy = -1 * location_data.groupby(["local_segment"])[["plogp"]].sum().rename(columns={"plogp": "locationentropy"})

    entropy["num_clusters"] = location_data.groupby(["local_segment"])["cluster_label"].nunique()
    entropy["normalizedlocationentropy"] = entropy["locationentropy"] / entropy["num_clusters"]

    return entropy


def doryab_features(sensor_data_files, time_segment, provider, filter_data_by_segment, *args, **kwargs):
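    """Feature provider entry point: read the location episodes, apply the configured
    clustering strategy, and compute the requested Doryab location features, returning
    one row per time segment instance."""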
    location_data = pd.read_csv(sensor_data_files["sensor_data"])
    requested_features = provider["FEATURES"]
    dbscan_eps = provider["DBSCAN_EPS"]
    dbscan_minsamples = provider["DBSCAN_MINSAMPLES"]
    cluster_on = provider["CLUSTER_ON"]
    clustering_algorithm = provider["CLUSTERING_ALGORITHM"]
    radius_from_home = provider["RADIUS_FOR_HOME"]

    if provider["MINUTES_DATA_USED"]:
        requested_features.append("minutesdataused")

    # names of the features this function can compute
    base_features_names = ["locationvariance", "loglocationvariance", "totaldistance", "avgspeed", "varspeed", "numberofsignificantplaces", "numberlocationtransitions", "radiusgyration", "timeattop1location", "timeattop2location", "timeattop3location", "movingtostaticratio", "outlierstimepercent", "maxlengthstayatclusters", "minlengthstayatclusters", "avglengthstayatclusters", "stdlengthstayatclusters", "locationentropy", "normalizedlocationentropy", "minutesdataused", "timeathome", "homelabel"]
    # the subset of requested features this function can compute
    features_to_compute = list(set(requested_features) & set(base_features_names))

    location_data = apply_cluster_strategy(location_data, time_segment, clustering_algorithm, dbscan_eps, dbscan_minsamples, cluster_on, filter_data_by_segment)

    if location_data.empty:
        return pd.DataFrame(columns=["local_segment"] + features_to_compute)

    location_features = pd.DataFrame()

    # update distance after chunk_episodes() function
    location_data["distance"] = location_data["speed"] * (location_data["duration"] / 60) * 1000  # in meters

    location_features["minutesdataused"] = location_data[["local_segment", "duration"]].groupby(["local_segment"])["duration"].sum()

    # variance features
    location_features = variance_and_logvariance_features(location_data, location_features)

    # distance and speed features
    moving_data = location_data[location_data["is_stationary"] == 0].copy()
    location_features = location_features.merge(distance_and_speed_features(moving_data), how="outer", left_index=True, right_index=True)
    location_features[["totaldistance", "avgspeed", "varspeed"]] = location_features[["totaldistance", "avgspeed", "varspeed"]].fillna(0)

    # stationary features
    stationary_data = location_data[location_data["is_stationary"] == 1].copy()
    stationary_data_without_outliers = stationary_data[stationary_data["cluster_label"] != -1]

    location_features["numberofsignificantplaces"] = stationary_data_without_outliers.groupby(["local_segment"])["cluster_label"].nunique()
    # number of location transitions: ignores transitions from moving to static and vice versa, but counts transitions from outliers to major location clusters
    location_features["numberlocationtransitions"] = stationary_data[["local_segment", "cluster_label"]].groupby(["local_segment"])["cluster_label"].apply(lambda x: np.sum(x != x.shift()) - 1)
    location_features["radiusgyration"] = radius_of_gyration(stationary_data_without_outliers)

    # stay at top-n clusters features
    location_features = location_features.merge(stay_at_topn_clusters(stationary_data_without_outliers), how="outer", left_index=True, right_index=True)

    # moving to static ratio
    static_time = stationary_data.groupby(["local_segment"])["duration"].sum()
    total_time = location_data.groupby(["local_segment"])["duration"].sum()
    location_features["movingtostaticratio"] = static_time / total_time

    # outliers time percent
    outliers_time = stationary_data[stationary_data["cluster_label"] == -1].groupby(["local_segment"])["duration"].sum()
    location_features["outlierstimepercent"] = (outliers_time / static_time).fillna(0)

    # entropy features
    location_features = location_features.merge(location_entropy(stationary_data_without_outliers), how="outer", left_index=True, right_index=True)

    # time at home
    stationary_data["time_at_home"] = stationary_data.apply(lambda row: row["duration"] if row["distance_from_home"] <= radius_from_home else 0, axis=1)
    location_features["timeathome"] = stationary_data[["local_segment", "time_at_home"]].groupby(["local_segment"])["time_at_home"].sum()

    # home label
    location_features["homelabel"] = stationary_data[["local_segment", "home_label"]].groupby(["local_segment"]).agg(lambda x: pd.Series.mode(x)[0])

    location_features = location_features[features_to_compute].reset_index()

    return location_features