2020-08-28 23:40:23 +02:00
# Tag placed at the start of the progress message printed for every
# sensor / provider / time segment processed by fetch_provider_features().
rapids_log_tag = " RAPIDS: "
2020-08-28 19:53:00 +02:00
2020-12-03 00:41:03 +01:00
def filter_data_by_segment(data, time_segment):
    """Keep only the rows of ``data`` that were assigned to ``time_segment``.

    Every value of the ``assigned_segments`` column is searched for an entry
    shaped like
    ``[<label>#<start datetime>,<end datetime>;<start timestamp>,<end timestamp>]``
    whose label is ``time_segment``. Datetimes look like
    ``YYYY-MM-DD HH:MM:SS`` (``-`` or ``/`` as date separator) and timestamps
    are 13-digit numbers.

    Args:
        data: DataFrame with an ``assigned_segments`` string column. NOTE: the
            object passed in is modified: rows whose ``assigned_segments`` is
            missing are dropped in place and a ``local_segment`` column is
            added to it (in the empty case ``timestamps_segment`` as well).
        time_segment: Segment label. It is interpolated verbatim into a
            regular expression, so regex metacharacters in it are not escaped.

    Returns:
        A DataFrame without ``assigned_segments`` and with two extra columns:
        ``local_segment`` (``<label>#<start datetime>,<end datetime>``) and
        ``timestamps_segment`` (``<start timestamp>,<end timestamp>``). Rows
        that do not belong to ``time_segment`` are removed. When the result is
        not empty and has both ``start_timestamp`` and ``end_timestamp``
        columns it is additionally passed through ``chunk_episodes``.
    """
    data.dropna(subset=["assigned_segments"], inplace=True)
    if len(data) == 0:  # nothing was assigned to any segment
        data["local_segment"] = data["timestamps_segment"] = None
        return data

    # Raw strings: "\-", "\/" and "\[" are regex escapes, not Python escapes.
    datetime_regex = r"[0-9]{4}[\-|\/][0-9]{2}[\-|\/][0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}"
    timestamps_regex = r"[0-9]{13}"
    segment_regex = r"\[({}#{},{};{},{})\]".format(
        time_segment, datetime_regex, datetime_regex, timestamps_regex, timestamps_regex
    )

    # The single capture group holds everything between the square brackets;
    # rows without an entry for this segment get a missing value.
    data["local_segment"] = data["assigned_segments"].str.extract(segment_regex, expand=False)

    # Work on an explicit copy from here on so that adding columns below never
    # writes into (or warns about) a view of the caller's frame.
    data = data.drop(columns=["assigned_segments"]).dropna(subset=["local_segment"]).copy()

    if len(data) == 0:  # no rows belong to time_segment after dropping missing values
        data["timestamps_segment"] = None
    else:
        # "<label>#<start>,<end>;<start ts>,<end ts>" -> the two halves around ";"
        segment_parts = data["local_segment"].str.split(pat=";", n=1, expand=True)
        data["local_segment"] = segment_parts[0]
        data["timestamps_segment"] = segment_parts[1]

    # Episode data (rows with a start and an end) must be clipped to the segment
    if (not data.empty) and ("start_timestamp" in data.columns) and ("end_timestamp" in data.columns):
        data = chunk_episodes(data)

    return data
2020-08-28 19:53:00 +02:00
2020-09-29 23:13:34 +02:00
# Each minute could fall into two segments.
# Firstly, we generate two rows for each resampled minute via resample_episodes rule:
# the first row's timestamp column is the start_timestamp, while the second row's timestamp column is the end_timestamp.
# Then, we check if the segments of start_timestamp are the same as the segments of end_timestamp:
# if they are the same (only fall into one segment), we will discard the second row;
# otherwise (fall into two segments), we will keep both.
2020-09-30 00:05:25 +02:00
def chunk_episodes(sensor_episodes):
    """Clip episodes to their time segment and merge them per segment instance.

    Args:
        sensor_episodes: DataFrame with at least the columns
            ``start_timestamp`` and ``end_timestamp`` (milliseconds),
            ``local_segment``, ``timestamps_segment``
            (``"<segment start ms>,<segment end ms>"``) and ``local_timezone``.
            The date-part columns ``utc_date_time``, ``local_date_time``,
            ``local_date``, ``local_time``, ``local_hour`` and
            ``local_minute`` are discarded when present. The input frame is
            not modified.

    Returns:
        One row per combination of the remaining (non-time) columns, with:
        ``duration`` - summed overlap between each episode and its segment, in
        minutes; ``start_timestamp`` - clipped start of the first row of the
        group; ``end_timestamp`` - clipped end of the last row of the group;
        ``local_start_date_time`` / ``local_end_date_time`` - those two
        instants as timezone-naive wall-clock times in the row's
        ``local_timezone``, truncated to whole seconds.
    """
    import pandas as pd

    # The rows generated for the start and for the end of an episode are
    # identical in these three columns when both fall into the same segment.
    sensor_episodes = sensor_episodes.drop_duplicates(
        subset=["start_timestamp", "end_timestamp", "local_segment"], keep="first"
    )

    # Date-part columns would otherwise become grouping keys below.
    # errors="ignore" accepts inputs that lack some of them; the copy keeps the
    # column assignments below from touching (or warning about) the input.
    sensor_episodes = sensor_episodes.drop(
        columns=["utc_date_time", "local_date_time", "local_date", "local_time", "local_hour", "local_minute"],
        errors="ignore",
    ).copy()

    # Unix timestamps (ms) of the current segment. int64 is requested
    # explicitly: plain int can be 32 bits wide, too small for 13-digit values.
    segment_bounds = sensor_episodes["timestamps_segment"].str.split(",", expand=True).astype("int64")
    sensor_episodes["segment_start_timestamp"] = segment_bounds[0]
    sensor_episodes["segment_end_timestamp"] = segment_bounds[1]

    # Intersection of each episode with its segment
    sensor_episodes["chunked_start_timestamp"] = sensor_episodes[
        ["start_timestamp", "segment_start_timestamp"]
    ].max(axis=1)
    sensor_episodes["chunked_end_timestamp"] = sensor_episodes[
        ["end_timestamp", "segment_end_timestamp"]
    ].min(axis=1)

    # Length of that intersection, milliseconds -> minutes
    sensor_episodes["duration"] = (
        sensor_episodes["chunked_end_timestamp"] - sensor_episodes["chunked_start_timestamp"]
    ) / (1000 * 60)

    # Merge episodes: every column that is not time bookkeeping is a group key
    time_columns = {
        "timestamps_segment", "timestamp", "assigned_segments",
        "start_datetime", "end_datetime", "start_timestamp", "end_timestamp",
        "duration", "segment_start_timestamp", "segment_end_timestamp",
        "chunked_start_timestamp", "chunked_end_timestamp",
    }
    cols_for_groupby = [col for col in sensor_episodes.columns if col not in time_columns]
    sensor_episodes_grouped = sensor_episodes.groupby(by=cols_for_groupby)
    merged_sensor_episodes = sensor_episodes_grouped[["duration"]].sum()
    merged_sensor_episodes["start_timestamp"] = sensor_episodes_grouped["chunked_start_timestamp"].first()
    merged_sensor_episodes["end_timestamp"] = sensor_episodes_grouped["chunked_end_timestamp"].last()
    merged_sensor_episodes = merged_sensor_episodes.reset_index()

    def local_naive_seconds(timestamp_column):
        """Return ``timestamp_column`` (ms) as naive local datetimes, whole seconds."""
        utc_times = pd.to_datetime(merged_sensor_episodes[timestamp_column], unit="ms", utc=True)
        # The timezone is removed per group, *before* concatenating: joining
        # tz-aware Series of different zones produces an object-dtype Series on
        # which the .dt accessor is not available.
        per_timezone = [
            times.dt.tz_convert(tz).dt.tz_localize(None)
            for tz, times in utc_times.groupby(merged_sensor_episodes["local_timezone"])
        ]
        if not per_timezone:  # nothing left after merging; concat([]) would raise
            return pd.Series(pd.NaT, index=merged_sensor_episodes.index, dtype="datetime64[ns]")
        return pd.concat(per_timezone).dt.floor("s")

    # Compute datetime (assignment aligns on the index, restoring row order)
    merged_sensor_episodes["local_start_date_time"] = local_naive_seconds("start_timestamp")
    merged_sensor_episodes["local_end_date_time"] = local_naive_seconds("end_timestamp")

    return merged_sensor_episodes
2020-12-03 00:41:03 +01:00
def fetch_provider_features(provider, provider_key, sensor_key, sensor_data_files, time_segments_file):
    """Compute the features of one provider for every time segment label.

    Args:
        provider: Provider configuration mapping. Must contain ``FEATURES``
            and ``COMPUTE``; when ``COMPUTE`` equals ``True`` it must also
            contain ``SRC_FOLDER``.
        provider_key: Provider name, used in messages and as part of the
            prefix of the feature columns.
        sensor_key: Sensor name, used in messages, in the feature column
            prefix and as the first component of the imported module path.
        sensor_data_files: Passed through unchanged to the provider function.
        time_segments_file: Path or buffer of a CSV file with a header row and
            a ``label`` column holding the time segment labels.

    Returns:
        DataFrame whose first four columns are ``local_segment``,
        ``local_segment_label``, ``local_segment_start_datetime`` and
        ``local_segment_end_datetime`` (the last three parsed from
        ``local_segment``, expected as ``<label>#<start>,<end>``), followed by
        the feature columns. When the provider is computed, feature columns
        are renamed to ``<sensor_key>_<provider_key>_<column>``; otherwise the
        frame is empty and has one column per entry of ``FEATURES``.

    Raises:
        ValueError: If ``provider`` has no ``FEATURES`` entry, or if the
            provider function returns a frame without a ``local_segment``
            column.
    """
    import pandas as pd
    from importlib import import_module

    sensor_features = pd.DataFrame(columns=["local_segment"])
    time_segments_labels = pd.read_csv(time_segments_file, header=0)
    if "FEATURES" not in provider:
        raise ValueError(
            "Provider config[{}][PROVIDERS][{}] is missing a FEATURES attribute in config.yaml".format(
                sensor_key.upper(), provider_key.upper()
            )
        )

    # Equality (not truthiness) is kept on purpose so that truthy non-boolean
    # configuration values keep selecting the "do not compute" branch.
    if provider["COMPUTE"] == True:  # noqa: E712
        # The provider lives in <sensor_key>.<SRC_FOLDER>.main and exposes a
        # function called <SRC_FOLDER>_features.
        code_path = sensor_key + "." + provider["SRC_FOLDER"] + ".main"
        feature_module = import_module(code_path)
        feature_function = getattr(feature_module, provider["SRC_FOLDER"] + "_features")
        column_prefix = sensor_key + "_" + provider_key + "_"

        # Frames are collected and concatenated once after the loop. The empty
        # frame goes first so that "local_segment" stays the first column.
        collected = [sensor_features]
        for time_segment in time_segments_labels["label"]:
            print("{} Processing {} {} {}".format(rapids_log_tag, sensor_key, provider_key, time_segment))
            features = feature_function(
                sensor_data_files,
                time_segment,
                provider,
                filter_data_by_segment=filter_data_by_segment,
                chunk_episodes=chunk_episodes,
            )
            if "local_segment" not in features.columns:
                raise ValueError("The dataframe returned by the " + sensor_key + " provider '" + provider_key + "' is missing the 'local_segment' column added by the 'filter_data_by_segment()' function. Check the provider script is using such function and is not removing 'local_segment' by accident (" + code_path + ")\n The 'local_segment' column is used to index a provider's features (each row corresponds to a different time segment instance (e.g. 2020-01-01, 2020-01-02, 2020-01-03, etc.)")
            # Namespace every feature column; segment columns keep their name.
            features.columns = [
                col if col.startswith("local_segment") else column_prefix + col
                for col in features.columns
            ]
            collected.append(features)
        sensor_features = pd.concat(collected, axis=0, sort=False)
    else:
        # Provider disabled: empty frame that still has the expected columns
        for feature in provider["FEATURES"]:
            sensor_features[feature] = None

    # "<label>#<start>,<end>" -> three columns placed right after
    # "local_segment". Rows that do not match get missing values.
    segment_parts = sensor_features["local_segment"].str.extract(r"(.*)#(.*),(.*)", expand=True)
    segment_column_names = [
        "local_segment_label",
        "local_segment_start_datetime",
        "local_segment_end_datetime",
    ]
    for offset, column_name in enumerate(segment_column_names):
        sensor_features.insert(1 + offset, column_name, segment_parts[offset])

    return sensor_features