# stray VCS timestamp (extraction artifact, not code): 2021-04-09 18:05:25 +02:00
import warnings
import numpy as np
import pandas as pd
from doryab_clustering import haversine , create_clustering_hyperparameters , cluster
def mark_as_stationary(location_data, threshold_static):
    """Flag every location row as stationary (1) or moving (0).

    Three columns are added:
      - distance: meters from each row to the next one (haversine);
      - speed: km/h, i.e. distance / duration_in_seconds (inf from a zero
        duration is treated as missing);
      - is_stationary: 1 when speed < threshold_static (km/h), else 0.
    Rows whose duration_in_seconds is NaN (row gaps) are dropped before
    returning.
    """
    next_longitude = location_data["double_longitude"].shift(-1)
    next_latitude = location_data["double_latitude"].shift(-1)
    # Distance in meters to the following sample
    location_data = location_data.assign(
        distance=haversine(location_data["double_longitude"], location_data["double_latitude"], next_longitude, next_latitude)
    )
    # Speed in km/h; a zero duration yields inf, which we convert to NaN
    speed_kmh = (location_data["distance"] / location_data["duration_in_seconds"]).replace(np.inf, np.nan) * 3.6
    location_data.loc[:, "speed"] = speed_kmh
    # NaN speeds compare False and are therefore marked as moving (0)
    location_data.loc[:, "is_stationary"] = np.where(location_data["speed"] < threshold_static, 1, 0)
    location_data.dropna(subset=["duration_in_seconds"], inplace=True)
    return location_data
def _infer_home_doryab(location_data, location_data_filtered):
    """DORYAB_STRATEGY: assume one fixed home for the whole study.

    The most common nocturnal cluster (label 1 — assumes cluster() orders
    labels by frequency; confirm in doryab_clustering) is the home cluster,
    and its mean coordinate is the home location.
    """
    home_location = location_data_filtered[location_data_filtered["cluster_label"] == 1][["double_latitude", "double_longitude"]].mean()
    location_data["distance_from_home"] = haversine(
        location_data["double_longitude"],
        location_data["double_latitude"],
        [home_location["double_longitude"]] * location_data.shape[0],
        [home_location["double_latitude"]] * location_data.shape[0],
    )
    # A single home means a single home label.
    location_data["home_label"] = 1
    return location_data


def _infer_home_sun_li_vega(location_data, location_data_filtered, days_threshold):
    """SUN_LI_VEGA_STRATEGY: the participant may change home during the study.

    Each night is assigned a candidate home cluster by these rules:
      group 1 — records within [03:30:00, 04:30:00]: the most common cluster
                in that window;
      group 2 — records within [midnight, 03:30:00): the last valid cluster
                in that window;
      group 3 — records within (04:30:00, 06:00:00]: the first valid cluster
                in that window;
      otherwise the home location is missing for that night.
    A candidate becomes the home cluster only when it repeats on at least
    `days_threshold` consecutive days; shorter runs inherit the last valid
    day's cluster (or, if there is none before, the next valid day's).
    """
    # Drop noise/unclustered rows; .copy() avoids pandas' chained-assignment
    # (SettingWithCopy) problem when adding the "group" column below.
    location_data_filtered = location_data_filtered[~location_data_filtered["cluster_label"].isin([-1, np.nan])].copy()
    # Split each night's records into the 3 groups described above.
    location_data_filtered["group"] = location_data_filtered["local_time"].apply(
        lambda x: 1 if "03:30:00" <= x <= "04:30:00" else (2 if x < "03:30:00" else 3)
    )
    # Per day, keep only the highest-priority (smallest-numbered) group present.
    selected_groups = location_data_filtered[
        location_data_filtered["group"] == location_data_filtered.groupby("local_date")["group"].transform("min")
    ][["group", "local_date", "cluster_label"]]
    # Group 1: [03:30:00, 04:30:00] — most common cluster of the window.
    group_1 = selected_groups[selected_groups["group"] == 1]
    home_clusters_group_1 = group_1.groupby(["local_date"]).agg(lambda x: pd.Series.mode(x)[0])
    # Group 2: [midnight, 03:30:00) — last valid cluster of the window.
    group_2 = selected_groups[selected_groups["group"] == 2]
    home_clusters_group_2 = group_2.groupby(["local_date"]).last()
    # Group 3: (04:30:00, 06:00:00] — first valid cluster of the window.
    group_3 = selected_groups[selected_groups["group"] == 3]
    home_clusters_group_3 = group_3.groupby(["local_date"]).first()
    home_clusters = pd.concat([home_clusters_group_1, home_clusters_group_2, home_clusters_group_3]).sort_index()
    # Length of each run of consecutive days sharing the same candidate cluster.
    home_clusters["number_of_days"] = home_clusters.groupby(
        (home_clusters["cluster_label"] != home_clusters["cluster_label"].shift(1)).cumsum()
    )["cluster_label"].transform("count")
    # Runs shorter than the threshold are not trusted: blank them, then fill
    # from (1) the last valid day and (2) the next valid day.
    home_clusters.loc[home_clusters["number_of_days"] < days_threshold, "cluster_label"] = np.nan
    location_data = location_data.merge(home_clusters[["cluster_label"]], left_on="local_date", right_index=True, how="left")
    location_data["cluster_label"] = location_data["cluster_label"].fillna(method="ffill").fillna(method="bfill")
    # Each home cluster's coordinate is the mean of its nocturnal records.
    center_per_cluster = (
        location_data_filtered.groupby(["cluster_label"])[["double_latitude", "double_longitude"]]
        .mean()
        .rename(columns={"double_latitude": "home_latitude", "double_longitude": "home_longitude"})
    )
    location_data = location_data.merge(center_per_cluster, left_on="cluster_label", right_index=True, how="left")
    location_data["distance_from_home"] = haversine(
        location_data["double_longitude"],
        location_data["double_latitude"],
        location_data["home_longitude"],
        location_data["home_latitude"],
    )
    # Relabel home clusters as 1..K in order of first appearance.
    reorder_mapping = {old_label: idx + 1 for idx, old_label in enumerate(location_data["cluster_label"].unique())}
    location_data["home_label"] = location_data["cluster_label"].map(reorder_mapping)
    location_data.drop(["cluster_label", "home_longitude", "home_latitude"], axis=1, inplace=True)
    return location_data


def infer_home_location(location_data, clustering_algorithm, hyperparameters, strategy, days_threshold):
    """Add "distance_from_home" and "home_label" columns to location_data.

    Home locations are inferred from records logged between midnight and 6am;
    a home coordinate is the mean coordinate of its home cluster. The two
    supported strategies are implemented by the private helpers above; any
    other strategy value returns location_data unchanged.
    """
    if strategy in ("DORYAB_STRATEGY", "SUN_LI_VEGA_STRATEGY"):
        location_data_filtered = location_data[location_data["local_hour"] < 6]
        if location_data_filtered.empty:
            warnings.warn("We could not infer a home location because there are no location records logged during midnight to 6am.")
            # Keep the expected output schema even when nothing can be inferred.
            return pd.DataFrame(columns=location_data_filtered.columns.tolist() + ["distance_from_home", "home_label"])
        location_data_filtered = cluster(location_data_filtered, clustering_algorithm, **hyperparameters)
        if strategy == "DORYAB_STRATEGY":
            location_data = _infer_home_doryab(location_data, location_data_filtered)
        else:
            location_data = _infer_home_sun_li_vega(location_data, location_data_filtered, days_threshold)
    return location_data
# --- Snakemake entry point: read raw locations, add Doryab columns, write CSV ---
location_data = pd.read_csv(snakemake.input["sensor_input"])

provider = snakemake.params["provider"]
accuracy_limit = provider["ACCURACY_LIMIT"]
maximum_row_gap = provider["MAXIMUM_ROW_GAP"]
dbscan_eps = provider["DBSCAN_EPS"]
dbscan_minsamples = provider["DBSCAN_MINSAMPLES"]
threshold_static = provider["THRESHOLD_STATIC"]
clustering_algorithm = provider["CLUSTERING_ALGORITHM"]
cluster_on = provider["CLUSTER_ON"]
strategy = provider["INFER_HOME_LOCATION_STRATEGY"]
days_threshold = provider["MINIMUM_DAYS_TO_DETECT_HOME_CHANGES"]

# Discard inaccurate samples; warn (but continue) if nothing survives the filter.
rows_before_accuracy_filter = len(location_data)
location_data = location_data[location_data["accuracy"] < accuracy_limit]
if rows_before_accuracy_filter > 0 and len(location_data) == 0:
    warnings.warn("Cannot compute Doryab location features because there are no rows with an accuracy value lower than ACCURACY_LIMIT: {}".format(accuracy_limit))

# Rows must be time-ordered before computing per-row durations.
# (is_monotonic_increasing replaces Series.is_monotonic, removed in pandas 2.0.)
if not location_data.timestamp.is_monotonic_increasing:
    location_data.sort_values(by=["timestamp"], inplace=True)

# Duration of a row = gap to the next row, in seconds (timestamps are in ms).
# Gaps >= MAXIMUM_ROW_GAP are treated as missing data rather than one long stay.
location_data["duration_in_seconds"] = -1 * location_data.timestamp.diff(-1) / 1000
location_data.loc[location_data["duration_in_seconds"] >= maximum_row_gap, "duration_in_seconds"] = np.nan

location_data = mark_as_stationary(location_data, threshold_static)

hyperparameters = create_clustering_hyperparameters(clustering_algorithm, dbscan_eps, dbscan_minsamples)
location_data_with_doryab_columns = infer_home_location(location_data, clustering_algorithm, hyperparameters, strategy, days_threshold)

# Optionally cluster the full participant dataset (home inference clusters
# only nocturnal records).
if cluster_on == "PARTICIPANT_DATASET":
    location_data_with_doryab_columns = cluster(location_data_with_doryab_columns, clustering_algorithm, **hyperparameters)

location_data_with_doryab_columns.to_csv(snakemake.output[0], index=False)