2022-07-20 15:51:22 +02:00
import pandas as pd
import numpy as np
import math , sys
2022-08-31 12:18:50 +02:00
from sklearn . impute import KNNImputer
2022-07-20 15:51:22 +02:00
2022-08-31 12:18:50 +02:00
def straw_cleaning(sensor_data_files, provider, target=None):
    """Clean the merged sensor feature table for the STRAW analysis.

    The steps are applied in this order:

    1. keep only rows in which the target (ESM) column is available,
    2. optionally impute selected phone event features,
    3. drop rows whose phone data yield is below the configured ratio,
    4. drop columns with too many NaN values and (optionally) zero variance,
    5. restore target columns that were removed by step 4,
    6. optionally drop highly correlated numerical columns,
    7. drop rows with too many NaN values.

    Args:
        sensor_data_files: mapping whose ``"sensor_data"`` entry is a sequence;
            its first element is the path of the CSV file to read.
        provider: provider configuration mapping. The keys read here are
            ``IMPUTE_PHONE_SELECTED_EVENT_FEATURES``, ``DATA_YIELD_FEATURE``,
            ``DATA_YIELD_RATIO_THRESHOLD``, ``COLS_NAN_THRESHOLD``,
            ``COLS_VAR_THRESHOLD``, ``DROP_HIGHLY_CORRELATED_FEATURES`` and
            ``ROWS_NAN_THRESHOLD``.
        target: suffix of the ``phone_esm_straw_<target>`` column used to
            filter rows. When ``None`` the first ``phone_esm_straw*`` column
            of the file is used.

    Returns:
        The cleaned features ``DataFrame``.

    Raises:
        KeyError: if a required data-yield column or the target column is
            missing from the data.
    """
    features = pd.read_csv(sensor_data_files["sensor_data"][0])

    # Snapshot of the target columns, used to restore them after column drops.
    esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')]

    # Filter all rows that do not have the target column available;
    # the target comes from the function parameter or falls back to the first ESM column.
    if target is None:
        if esm_cols.shape[1] == 0:
            raise KeyError("No column starting with 'phone_esm_straw' was found in the sensor data.")
        # esm_cols is a DataFrame, so the first column *name* has to be taken
        # from .columns (esm_cols[0] would look up a column labelled 0).
        target_column = esm_cols.columns[0]
    else:
        target_column = 'phone_esm_straw_' + target
    # Explicit copy so that later assignments never write into a view of the raw table.
    features = features[features[target_column].notna()].copy()

    # TODO: reorder the cleaning steps so it makes sense for the analysis
    # TODO: add conditions that differentiate cleaning steps for standardized and nonstandardized features, for this
    # the snakemake rules will also have to come with an additional parameter (in rules/features.smk)

    # TODO: impute the rows where the participants have at least 2 rows (2 time segments) - error prevention (has to be tested)
    # TODO: because of different imputation logic (e.g., the phone_data_yield parameter for phone features) the imputation has to
    # be planned accordingly. Should the phone features first be imputed with 0 and only then general kNN imputation is executed
    # i.e., on the rows that are missing when E4 and phone features availability is not synchronized. CHECK phone_data_yield feat.
    # A lot of imputation types/levels (1) imputation related to feature's content (2) imputation related to phone / empatica
    # structural specifics (3) general imputation which is needed when types of features desynchronization is present (row is not full)
    # because of the lack of the availability. Secondly, it is highly important that the features data frame is checked for any NaN
    # values that still exist.

    # Impute selected event features
    impute_phone_features = provider["IMPUTE_PHONE_SELECTED_EVENT_FEATURES"]
    if impute_phone_features["COMPUTE"]:
        if 'phone_data_yield_rapids_ratiovalidyieldedminutes' not in features.columns:
            raise KeyError("RAPIDS provider needs to impute the selected event features based on phone_data_yield_rapids_ratiovalidyieldedminutes column, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyieldedminutes' in [FEATURES].")

        # TODO: if the type of the imputation will vary for different groups of features make conditional imputations here
        phone_prefixes = (
            'phone_applications_foreground_rapids_',
            'phone_battery_rapids_',
            'phone_calls_rapids_',
            'phone_keyboard_rapids_',
            'phone_messages_rapids_',
            'phone_screen_rapids_',
            'phone_wifi_',
        )
        phone_cols = [col for col in features if col.startswith(phone_prefixes)]
        # Only rows with enough yielded minutes are imputed.
        mask = features['phone_data_yield_rapids_ratiovalidyieldedminutes'] > impute_phone_features['MIN_DATA_YIELDED_MINUTES_TO_IMPUTE']
        features.loc[mask, phone_cols] = impute(features[mask][phone_cols], method=impute_phone_features["TYPE"].lower())

    # Drop rows with the value of data_yield_column less than data_yield_ratio_threshold
    data_yield_unit = provider["DATA_YIELD_FEATURE"].split("_")[3].lower()
    data_yield_column = "phone_data_yield_rapids_ratiovalidyielded" + data_yield_unit
    if data_yield_column not in features.columns:
        raise KeyError(f"RAPIDS provider needs to impute the selected event features based on {data_yield_column} column, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].")

    if provider["DATA_YIELD_RATIO_THRESHOLD"]:
        features = features[features[data_yield_column] >= provider["DATA_YIELD_RATIO_THRESHOLD"]]

    # Remove cols if threshold of NaN values is passed
    features = features.loc[:, features.isna().sum() < provider["COLS_NAN_THRESHOLD"] * features.shape[0]]

    # Remove cols where variance is 0
    if provider["COLS_VAR_THRESHOLD"]:
        # numeric_only=True: non-numeric columns have no standard deviation and
        # make DataFrame.std() raise on recent pandas versions.
        column_std = features.std(numeric_only=True)
        features = features.drop(columns=column_std[column_std == 0].index)

    # Preserve esm cols if deleted (has to come after drop cols operations).
    # join() aligns on the index, so only the surviving rows receive values.
    missing_esm = [esm for esm in esm_cols if esm not in features]
    if missing_esm:
        features = features.join(esm_cols[missing_esm])

    # Drop highly correlated features
    # TODO: add another threshold variable from the config + decide how NaNs are treated
    # NOTE(review): this step runs after the ESM columns were restored, so a target
    # column can still be dropped here if it correlates with an earlier column - confirm intent.
    drop_corr_features = provider["DROP_HIGHLY_CORRELATED_FEATURES"]
    if drop_corr_features["COMPUTE"]:
        numerical = features[features.select_dtypes(include=np.number).columns.tolist()]

        # Remove columns where NaN count threshold is passed
        valid_features = numerical.loc[:, numerical.isna().sum() < drop_corr_features['MIN_OVERLAP_FOR_CORR_THRESHOLD'] * numerical.shape[0]]

        cor_matrix = valid_features.corr(method='spearman').abs()
        # Keep only the strict upper triangle so each pair is inspected once
        # (np.bool was removed from NumPy; the builtin bool is equivalent).
        upper_tri = cor_matrix.where(np.triu(np.ones(cor_matrix.shape), k=1).astype(bool))
        to_drop = [column for column in upper_tri.columns if (upper_tri[column] > drop_corr_features["CORR_THRESHOLD"]).any()]
        features = features.drop(columns=to_drop)

    # Remove rows if threshold of NaN values is passed
    min_count = math.ceil((1 - provider["ROWS_NAN_THRESHOLD"]) * features.shape[1])  # minimal not nan values in row
    features = features.dropna(axis=0, thresh=min_count)

    return features
def impute(df, method='zero'):
    """Fill the missing values of ``df`` with the selected strategy.

    Only the requested strategy is evaluated, so e.g. ``method='zero'`` never
    fits a k-nearest-neighbours imputer.

    Args:
        df: ``DataFrame`` whose NaN values should be filled.
        method: one of ``'zero'``, ``'mean'``, ``'median'`` or ``'k-nearest'``.

    Returns:
        A new ``DataFrame`` with the same index and columns as ``df``.

    Raises:
        KeyError: if ``method`` is not one of the supported strategies.
    """

    def k_nearest(frame):
        # Nothing to fit on an empty frame; the imputer would raise.
        if frame.empty:
            return frame.copy()
        # Columns without a single observed value cannot be imputed and, with
        # the imputer's default settings, would be removed from its output
        # (breaking the column alignment) - they are left untouched instead.
        observed = list(frame.columns[frame.notna().any()])
        result = frame.copy()  # keeps the original index so callers can assign back with .loc
        if observed:
            imputer = KNNImputer(n_neighbors=3)
            result[observed] = imputer.fit_transform(frame[observed])
        return result

    # Dispatch table of callables: the strategy is looked up first and only
    # then executed.
    strategies = {
        'zero': lambda frame: frame.fillna(0),
        'mean': lambda frame: frame.fillna(frame.mean()),
        'median': lambda frame: frame.fillna(frame.median()),
        'k-nearest': k_nearest,
    }
    return strategies[method](df)