import os
import sys
import warnings

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import Lasso
from sklearn.model_selection import cross_validate, StratifiedKFold, KFold
""" Feature selection pipeline: a methods that can be used in the wrapper metod alongside other wrapper contents (hyperparameter tuning etc.).
2023-04-14 17:20:22 +02:00
( 1 ) Establish methods for each of the steps in feature selection protocol .
2023-02-15 12:27:39 +01:00
( 2 ) Ensure that above methods are given only a part of data and use appropriate random seeds - to later simulate use case in production .
( 3 ) Implement a method which gives graphical exploration of ( 1 ) ( a ) and ( b ) steps of the feature selection .
( 4 ) Prepare a core method that can be fit into a wrapper ( see sklearn wrapper methods ) and integrates methods from ( 1 )
2023-02-20 11:51:34 +01:00
"""


class FeatureSelection:

    def __init__(self, X_train, X_test, y_train, y_test):  # TODO: what about leave-one-subject-out CV?
        pass  # TODO....
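
    # A sketch for the leave-one-subject-out TODO above, assuming that 'pid'
    # identifies the subject (as it does in the methods below):
    #
    #   from sklearn.model_selection import LeaveOneGroupOut
    #   logo = LeaveOneGroupOut()
    #   cv_iter = logo.split(X, y, groups=df["pid"])  # one fold per subject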

    def select_best_feature(self, df, features, method="remove", ml_type="classification", metric="recall", stored_features=None):
        """Select the best feature by testing the prediction on the feature set with or without the current feature.

        The "remove" method removes a particular feature and predicts on the test set without it. The "add" method
        adds a particular feature to the previously established feature set (stored_features). The best feature is
        selected depending on the metric specified as a parameter.

        Args:
            df (DataFrame): Input data on which the predictions will be made.
            features (list): List of features to select the best/worst from.
            method (str, optional): Remove or add features. Defaults to "remove".
            ml_type (str, optional): Either a classification or a regression ML problem; controls the ML algorithm and metric. Defaults to "classification".
            metric (str, optional): Metric with which the best/worst feature will be determined. Defaults to "recall".
            stored_features (list, optional): If method is "add", the features that have previously been added. Defaults to None (empty list).

        Raises:
            ValueError: If the classification or regression metric is not recognised for the selected ml_type.
            ValueError: If an unknown ml_type is chosen.

        Returns:
            tuple: name of the best feature, best feature score, best feature score standard deviation.
        """
        if stored_features is None:  # Avoid a mutable default argument
            stored_features = []
        best_feature = None
        if ml_type == "classification" and metric not in ['accuracy', 'precision', 'recall', 'f1']:
            raise ValueError("Classification metric not recognized. Please choose 'accuracy', 'precision', 'recall' or 'f1'.")
        elif ml_type == "regression" and metric not in ['r2']:
            raise ValueError("Regression metric not recognized. Please choose 'r2'.")
        for feat in features:
            if method == "remove":
                pred_features = [col for col in features if col != feat]  # All the candidate features but feat
            elif method == "add":
                pred_features = [feat] + stored_features  # Feat together with the stored features
            X, y = df.drop(columns=['target', 'pid'])[pred_features], df['target']
            if ml_type == "classification":
                nb = GaussianNB()
                # The filter must be active while cross_validate runs, hence the with-block around the call
                with warnings.catch_warnings():
                    warnings.filterwarnings("ignore", message="Precision is ill-defined and being set to 0.0 due to no predicted samples. Use `zero_division` parameter to control this behavior.")
                    model_cv = cross_validate(
                        nb,
                        X=X,
                        y=y,
                        cv=StratifiedKFold(n_splits=5, shuffle=True),
                        n_jobs=-1,
                        scoring=('accuracy', 'precision', 'recall', 'f1')
                    )
                # Mean and standard deviation of the chosen metric across the CV folds;
                # keep the feature with the best mean score seen so far
                score_mean = np.mean(model_cv[f'test_{metric}'])
                score_std = np.std(model_cv[f'test_{metric}'])
                if best_feature is None or score_mean > best_metric_score:
                    best_feature = feat
                    best_metric_score = score_mean
                    best_metric_score_std = score_std
            elif ml_type == "regression":
                lass = Lasso()
                model_cv = cross_validate(
                    lass,
                    X=X,
                    y=y,
                    # KFold instead of StratifiedKFold: stratification is undefined for continuous targets
                    cv=KFold(n_splits=5, shuffle=True),
                    n_jobs=-1,
                    scoring=('r2',)  # A tuple, so that the result is keyed 'test_r2' rather than 'test_score'
                )
                r2 = np.mean(model_cv['test_r2'])
                r2_std = np.std(model_cv['test_r2'])
                if best_feature is None or r2 > best_metric_score:
                    best_feature = feat
                    best_metric_score = r2
                    best_metric_score_std = r2_std
            else:
                raise ValueError("ML type not yet implemented!")

        return best_feature, best_metric_score, best_metric_score_std
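
    # Usage sketch (hypothetical feature names; assumes df carries 'pid' and
    # 'target' columns next to the feature columns):
    #
    #   fs = FeatureSelection(X_train, X_test, y_train, y_test)
    #   best, score, score_sd = fs.select_best_feature(
    #       df, ["hr_mean", "eda_peaks", "temp_slope"], method="add",
    #       ml_type="classification", metric="recall", stored_features=[])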

    def select_features(self, df, n_min=20, n_max=50, method="remove", n_not_improve=10):
        n_features = df.shape[1] - 2  # -2 because pid and target are not considered
        if n_max > n_features:
            n_max = n_features
        if n_min > n_features:
            raise ValueError("The dataframe must contain at least n_min features (pid and target columns excluded).")
        if n_max < n_min:
            raise ValueError("The n_max parameter needs to be greater than or equal to the n_min parameter.")

        features = df.columns.tolist()
        features.remove("pid")
        features.remove("target")
        feature_importance = []
        if method == "remove":
            for i in reversed(range(n_features)):
                best_feature, best_metric_score, best_metric_score_std = \
                    self.select_best_feature(df, features, method=method, ml_type="classification", metric="recall")
                feature_importance.append((i + 1, best_feature, best_metric_score, best_metric_score_std))
                features.remove(best_feature)

        feature_importance_df = pd.DataFrame(feature_importance, columns=['i', 'name', 'metric', 'metric_sd'])
        # Feature selection criterion within the n_min-n_max range:
        # e.g., pick the best score in this range, or keep selecting while the score
        # still improves by 0.0X and stop once it does not.
        # The feature set is then chosen from i=1 up to i=index_of_the_chosen_feature.
        # The "tipping point" feature must lie within the n_min-n_max range.
        selection_area = feature_importance_df[(feature_importance_df["i"] >= n_min + 1) & (feature_importance_df["i"] <= n_max)].copy()
        selection_area.set_index(["i", "name"], inplace=True)
        differences = selection_area.diff()
        differences.dropna(how='any', inplace=True)

        # Perhaps also a cumulative sum, where we simply take the index with the highest value?
        cumulative_sum = differences.cumsum()
        tipping_feature_indx_1 = cumulative_sum.idxmax()["metric"]
        # A very conservative method which stops searching for a better alternative
        # at the first score that does not improve on the previous one
        tipping_feature_indx_2 = None
        for indx, row in differences.iterrows():
            if row["metric"] > 0:
                tipping_feature_indx_2 = indx
            else:
                break
        # A method which lets n_not_improve further features try to beat the best score so far
        tipping_feature_indx_3 = None
        best_score_so_far = 0
        i_worse = 0
        # TODO: it might make sense to merge the diff, cumsum and score columns ...
        for indx, row in selection_area.iterrows():
            if row["metric"] > best_score_so_far:
                tipping_feature_indx_3 = indx
                best_score_so_far = row["metric"]
                i_worse = 0
            else:
                i_worse += 1
                if i_worse == n_not_improve:
                    break

        # TODO: decide which of the three tipping-point criteria to keep; all three are returned for now
        return tipping_feature_indx_1, tipping_feature_indx_2, tipping_feature_indx_3
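
    # Worked illustration of the three criteria on hypothetical scores inside the
    # n_min-n_max range: recall = 0.70, 0.72, 0.71, 0.74, 0.73
    #   diffs                   =  +0.02, -0.01, +0.03, -0.01
    #   (1) the cumsum of the diffs peaks at 0.74    -> tipping_feature_indx_1
    #   (2) the first drop ends the search at 0.72   -> tipping_feature_indx_2
    #   (3) with n_not_improve >= 2 the drop after 0.72 is forgiven,
    #       so the search reaches 0.74               -> tipping_feature_indx_3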

    def make_predictions_with_features(self, df, groups_substrings, include_group=True, with_cols=None, print_flag=False):
        pass

    def visualize_feature_selection_process(self):
        pass

    def execute_feature_selection_step(self):
        pass
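

# A minimal smoke test, a sketch only: the synthetic data below stands in for the
# real input (the 'pid' and 'target' column names come from the methods above;
# the feature values are random, so the scores are meaningless).
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    demo_df = pd.DataFrame(rng.normal(size=(100, 4)), columns=["f1", "f2", "f3", "f4"])
    demo_df["pid"] = np.arange(100) % 10  # Ten hypothetical subjects
    demo_df["target"] = rng.integers(0, 2, size=100)

    fs = FeatureSelection(None, None, None, None)  # The constructor is still a stub
    best, score, score_sd = fs.select_best_feature(
        demo_df, ["f1", "f2", "f3", "f4"], method="remove",
        ml_type="classification", metric="recall"
    )
    print(f"best feature: {best}, recall: {score:.3f} +/- {score_sd:.3f}")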