Implement feature selection methods (WIP).

ml_pipeline
Primoz 2023-04-14 17:20:22 +02:00
parent f672709ea6
commit 10ca47583c
1 changed file with 186 additions and 10 deletions

@@ -7,13 +7,12 @@ import pandas as pd
import warnings
import numpy as np
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import Lasso
from sklearn.model_selection import cross_validate, StratifiedKFold, KFold
""" Feature selection pipeline: a methods that can be used in the wrapper metod alongside other wrapper contents (hyperparameter tuning etc.). """ Feature selection pipeline: a methods that can be used in the wrapper metod alongside other wrapper contents (hyperparameter tuning etc.).
(1) Establish methods for each of the steps in the feature selection protocol:
    (a) feature selection inside specific sensors (sklearn method): returns the most important features from all sensors
    (b) feature selection between "tuned" sensors: returns filtered sensors, containing the most important features returned by (a)
(2) Ensure that the above methods are given only a part of the data and use appropriate random seeds, to later simulate the use case in production.
(3) Implement a method which gives a graphical exploration of the (1) (a) and (b) steps of the feature selection.
(4) Prepare a core method that can be fit into a wrapper (see sklearn wrapper methods) and integrates the methods from (1); a usage sketch follows below.
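
A minimal usage sketch (hypothetical; the data split and column names are assumptions, and train_df
is assumed to contain the feature columns plus 'pid' and 'target'):

    fs = FeatureSelection(X_train, X_test, y_train, y_test)
    best_feat, score, score_sd = fs.select_best_feature(train_df, candidate_features, method="add", metric="recall")
    fs.select_features(train_df, n_min=20, n_max=50, method="remove")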
@@ -23,19 +22,196 @@ from sklearn.naive_bayes import GaussianNB
class FeatureSelection:

    def __init__(self, X_train, X_test, y_train, y_test):  # TODO: what about leave-one-subject-out CV?
        pass  # TODO....
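        # A possible answer to the TODO above (a sketch, not part of this commit): leave-one-subject-out
        # CV can be expressed with sklearn's LeaveOneGroupOut, grouping folds by the participant id column:
        #   from sklearn.model_selection import LeaveOneGroupOut
        #   cross_validate(GaussianNB(), X=X, y=y, groups=df["pid"], cv=LeaveOneGroupOut(), scoring="recall")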
    def select_best_feature(self, df, features, method="remove", ml_type="classification", metric="recall", stored_features=None):
        """The method selects the best feature by testing the prediction on the feature set with or without the current feature.
        The "remove" method removes a particular feature and predicts on the test set without it. The "add" method adds a particular
        feature to the previously established feature set (stored_features). The best feature is selected depending on the metric
        specified as a parameter.

        Args:
            df (DataFrame): Input data on which the predictions will be made.
            features (list): List of features to select the best/worst from.
            method (str, optional): Remove or add features. Defaults to "remove".
            ml_type (str, optional): Either a classification or a regression ML problem; controls the ML algorithm and metric. Defaults to "classification".
            metric (str, optional): Metric with which the best/worst feature will be determined. Defaults to "recall".
            stored_features (list, optional): If method is 'add', stored_features refers to the features that have previously been added. Defaults to None (treated as an empty list).

        Raises:
            ValueError: If the metric is not recognised for the selected ml_type.
            ValueError: If an unknown ml_type is chosen.

        Returns:
            tuple: name of the best feature, best feature score, best feature score standard deviation.
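
        Example:
            A hypothetical call (feature names are made up for illustration): starting from an
            empty set, pick the feature whose addition maximises the cross-validated recall::

                best_feat, score, score_sd = fs.select_best_feature(
                    train_df, ["acc_mean", "screen_count"], method="add", metric="recall"
                )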
"""
        if stored_features is None:  # Avoid a mutable default argument
            stored_features = []
        best_feature, best_metric_score, best_metric_score_std = None, None, None

        if ml_type == "classification" and metric not in ['accuracy', 'precision', 'recall', 'f1']:
            raise ValueError("Classification metric not recognized. Please choose one of 'accuracy', 'precision', 'recall', 'f1'.")
        elif ml_type == "regression" and metric not in ['r2']:
            raise ValueError("Regression metric not recognized. Please choose 'r2'.")
        for feat in features:
            if method == "remove":
                pred_features = [col for col in features if col != feat]  # All candidate features but feat
            elif method == "add":
                pred_features = [feat] + stored_features  # Feat together with the stored features

            X, y = df[pred_features], df['target']
if ml_type == "classification":
nb = GaussianNB()
model_cv = cross_validate(
nb,
X=X,
y=y,
cv=StratifiedKFold(n_splits=5, shuffle=True),
n_jobs=-1,
scoring=('accuracy', 'precision', 'recall', 'f1')
)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="Precision is ill-defined and being set to 0.0 due to no predicted samples. Use `zero_division` parameter to control this behavior.")
if metric == "accuracy":
acc = np.mean(model_cv['test_accuracy'])
acc_std = np.std(model_cv['test_accuracy'])
if not best_feature or (acc > best_metric_score):
best_feature = feat
best_metric_score = acc
best_metric_score_std = acc_std
elif metric == "precision":
prec = np.mean(model_cv['test_precision'])
prec_std = np.std(model_cv['test_precision'])
if not best_feature or (prec > best_metric_score):
best_feature = feat
best_metric_score = prec
best_metric_score_std = prec_std
elif metric == "recall":
rec = np.mean(model_cv['test_recall'])
rec_std = np.std(model_cv['test_recall'])
if not best_feature or (rec > best_metric_score):
best_feature = feat
best_metric_score = rec
best_metric_score_std = rec_std
else:
f1 = np.mean(model_cv['test_f1'])
f1_std = np.std(model_cv['test_f1'])
if not best_feature or (f1 > best_metric_score):
best_feature = feat
best_metric_score = f1
best_metric_score_std = f1_std
elif ml_type == "regression":
lass = Lasso()
model_cv = cross_validate(
lass,
X=X,
y=y,
cv=StratifiedKFold(n_splits=5, shuffle=True),
n_jobs=-1,
scoring=('r2')
)
if metric == "r2":
r2 = np.mean(model_cv['test_r2'])
r2_std = np.std(model_cv['test_r2'])
if not best_feature or (r2 > best_metric_score):
best_feature = feat
best_metric_score = r2
best_metric_score_std = r2_std
else:
raise ValueError("ML type not yet implemented!")
return best_feature, best_metric_score, best_metric_score_std
    def select_features(self, df, n_min=20, n_max=50, method="remove", n_not_improve=10):
        n_features = df.shape[1] - 2  # -2 because 'pid' and 'target' columns are not features
        if n_max > n_features:
            n_max = n_features
        if n_min > n_features:
            raise ValueError("The dataframe must contain at least n_min features (excluding 'pid' and 'target').")
        if n_max < n_min:
            raise ValueError("The n_max parameter needs to be greater than or equal to the n_min parameter.")

        features = df.columns.tolist()
        features.remove("pid")
        features.remove("target")
        feature_importance = []
if method == "remove":
for i in reversed(range(n_features)):
best_feature, best_metric_score, best_metric_score_std = \
self.select_best_feature(df, features, method=method, ml_type="classification", metric="recall")
feature_importance.append(tuple(i+1, best_feature, best_metric_score, best_metric_score_std))
features.remove(best_feature)
feature_importance_df = pd.DataFrame(feature_importance, columns=['i', 'name', 'metric', 'metric_sd'])
        # Selection criterion for the features in the n_min-n_max range:
        # e.g., pick the best score in this range, or keep extending the set while the score improves by 0.0X and then take that feature set.
        # The feature set will be chosen from i=1 up to i=index_of_the_chosen_feature.
        # The "tipping point" feature must lie within the n_min-n_max range. Three candidate criteria are sketched below.
        selection_area = feature_importance_df[(feature_importance_df["i"] >= n_min + 1) & (feature_importance_df["i"] <= n_max)]
        selection_area = selection_area.set_index(["i", "name"])  # Avoid mutating a slice of the original frame
        differences = selection_area.diff()
        differences.dropna(how='any', inplace=True)
        # Maybe also a cumulative sum, where the index with the highest value is simply taken?
        cumulative_sum = differences.cumsum()
        tipping_feature_indx_1 = cumulative_sum.idxmax()["metric"]
        # A very conservative method which stops searching for a better alternative at the first non-improvement.
        tipping_feature_indx_2 = None
        for indx, row in differences.iterrows():
            if row["metric"] > 0:
                tipping_feature_indx_2 = indx
            else:
                break
        # A method which allows n_not_improve features to try to beat the best score so far.
        tipping_feature_indx_3 = None
        best_score = 0
        i_worse = 0
        # TODO: it might make sense to merge the diff, cumsum and score columns ...
        for indx, row in selection_area.iterrows():
            if row["metric"] > best_score:  # The current feature set improves on the best score so far
                tipping_feature_indx_3 = indx
                best_score = row["metric"]
                i_worse = 0
            else:
                i_worse += 1
            if i_worse == n_not_improve:
                break
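        # A possible continuation (an assumption; this commit is WIP and does not yet return a selection):
        # keep the features ranked from i=1 up to the chosen tipping index, e.g. for tipping_feature_indx_1:
        #   n_select = tipping_feature_indx_1[0]  # First element of the (i, name) index tuple
        #   return feature_importance_df[feature_importance_df["i"] <= n_select]["name"].tolist()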
    def make_predictions_with_features(self, df, groups_substrings, include_group=True, with_cols=[], print_flag=False):
        pass
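        # A possible direction for this stub (an assumption, not in this commit): select feature columns
        # whose names contain any of groups_substrings, kept or dropped according to include_group, e.g.:
        #   cols = [c for c in df.columns if any(s in c for s in groups_substrings) == include_group]
        #   return self.select_features(df[cols + with_cols + ["pid", "target"]])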
    def vizualize_feature_selection_process(self):