253 lines
12 KiB
Python
253 lines
12 KiB
Python
import os
|
|
import sys
|
|
import warnings
|
|
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import pandas as pd
|
|
|
|
from sklearn.feature_selection import SequentialFeatureSelector
|
|
from sklearn.model_selection import cross_validate, StratifiedKFold
|
|
from sklearn.naive_bayes import GaussianNB
|
|
from sklearn.linear_model import Lasso
|
|
|
|
|
|
""" Feature selection pipeline: a methods that can be used in the wrapper metod alongside other wrapper contents (hyperparameter tuning etc.).
|
|
|
|
(1) Establish methods for each of the steps in feature selection protocol.
|
|
(2) Ensure that above methods are given only a part of data and use appropriate random seeds - to later simulate use case in production.
|
|
(3) Implement a method which gives graphical exploration of (1) (a) and (b) steps of the feature selection.
|
|
(4) Prepare a core method that can be fit into a wrapper (see sklearn wrapper methods) and integrates methods from (1)
|
|
|
|
"""
|
|
|
|
class FeatureSelection:
    """Greedy feature selection driven by cross-validated model scores.

    The instance stores a feature table ``X`` and a target ``y``. Candidate
    feature subsets are scored with a shuffled 5-fold cross-validation, using
    ``GaussianNB`` for classification and ``Lasso`` for regression.

    NOTE(review): ``X`` is used like a pandas ``DataFrame`` (``X.columns``,
    ``X[list_of_columns]``); ``y`` is handed to scikit-learn unchanged.
    """

    _CLASSIFICATION_METRICS = ('accuracy', 'precision', 'recall', 'f1')
    _REGRESSION_METRICS = ('r2',)
    _METHODS = ('remove', 'add')

    def __init__(self, X, y):
        """Store the feature table and the target used by every evaluation.

        Args:
            X: Feature table; columns are the selectable features.
            y: Target values aligned with the rows of ``X``.
        """
        self.X = X
        self.y = y

    def _cross_validated_score(self, columns, ml_type, metric, random_state=None):
        """Score the model restricted to ``columns`` with a 5-fold CV.

        Args:
            columns (list): Columns of ``self.X`` the model is trained on.
            ml_type (str): "classification" or "regression" (already validated).
            metric (str): Name of the scikit-learn scorer to evaluate.
            random_state (int, optional): Seed for the fold shuffling.

        Returns:
            tuple: mean and standard deviation of ``metric`` over the folds.
        """
        X = self.X[columns].copy()

        if ml_type == "classification":
            estimator = GaussianNB()
            cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)
        else:
            # Stratification needs class labels, so a continuous regression
            # target requires a plain K-fold split.
            from sklearn.model_selection import KFold

            estimator = Lasso()
            cv = KFold(n_splits=5, shuffle=True, random_state=random_state)

        # The filter has to be active while the scoring runs.
        # NOTE(review): with n_jobs=-1 the scoring may happen in worker
        # processes; whether this filter reaches them depends on the installed
        # scikit-learn/joblib versions - confirm.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", message="Precision is ill-defined and being set to 0.0 due to no predicted samples. Use `zero_division` parameter to control this behavior.")
            model_cv = cross_validate(
                estimator,
                X=X,
                y=self.y,
                cv=cv,
                n_jobs=-1,
                # A tuple (not a bare string) keeps the result keyed as
                # 'test_<metric>'; only the requested metric is computed.
                scoring=(metric,),
            )

        fold_scores = model_cv["test_" + metric]
        return np.mean(fold_scores), np.std(fold_scores)

    def select_best_feature(self, features, method="remove", ml_type="classification", metric="recall",
                            stored_features=None, removed_features=None, random_state=None):
        """Pick the candidate whose removal/addition yields the best CV score.

        With ``method="remove"`` each candidate is left out in turn and the
        model is scored on the remaining columns of ``X`` (minus
        ``removed_features``). With ``method="add"`` each candidate is scored
        together with ``stored_features``. The candidate with the highest mean
        score wins; on ties the earlier candidate is kept.

        Args:
            features (list): Candidate features to choose from.
            method (str, optional): "remove" or "add". Defaults to "remove".
            ml_type (str, optional): "classification" (GaussianNB) or "regression" (Lasso).
                Defaults to "classification".
            metric (str, optional): 'accuracy', 'precision', 'recall' or 'f1' for
                classification, 'r2' for regression. Defaults to "recall".
            stored_features (list, optional): For "add": features selected in
                earlier rounds. Defaults to None (no stored features).
            removed_features (list, optional): For "remove": features eliminated
                in earlier rounds, excluded from every evaluated model.
                Defaults to None (nothing excluded).
            random_state (int, optional): Seed for the CV shuffling. When set, all
                candidates are compared on identical folds. Defaults to None.

        Raises:
            ValueError: If the metric does not fit the selected ml_type.
            ValueError: If unknown ml_type or method is chosen.
            ValueError: If there are no candidates, or a removal would leave
                no feature to train on.

        Returns:
            tuple: name of the best feature, best feature score, best feature score standard deviation.
        """
        # Validate everything up front so bad arguments fail before any model
        # is fitted.
        if ml_type == "classification":
            if metric not in self._CLASSIFICATION_METRICS:
                raise ValueError("Classification metric not recognized. Please choose 'accuracy', 'precision', 'recall' and/or 'f1'")
        elif ml_type == "regression":
            if metric not in self._REGRESSION_METRICS:
                raise ValueError("Regression metric not recognized. Please choose 'r2'")
        else:
            raise ValueError("ML type not yet implemented!")

        if method not in self._METHODS:
            raise ValueError("Feature selection method not recognized. Please choose 'remove' or 'add'")

        candidates = list(features)
        if not candidates:
            raise ValueError("At least one candidate feature is required.")

        stored = [] if stored_features is None else list(stored_features)
        excluded = set() if removed_features is None else set(removed_features)

        best_feature = None
        best_metric_score = None
        best_metric_score_std = None

        for feat in candidates:
            if method == "remove":
                # Everything still in play except the candidate itself.
                pred_features = [col for col in self.X.columns if col != feat and col not in excluded]
                if not pred_features:
                    raise ValueError("Removing the feature would leave no features to predict on.")
            else:
                # Candidate together with the previously selected features.
                pred_features = [feat] + stored

            score, score_std = self._cross_validated_score(pred_features, ml_type, metric, random_state)

            # Track "nothing recorded yet" via the score, not the feature name:
            # a column label such as 0 or "" is falsy but perfectly valid.
            if best_metric_score is None or score > best_metric_score:
                best_feature = feat
                best_metric_score = score
                best_metric_score_std = score_std

        return best_feature, best_metric_score, best_metric_score_std

    def select_features(self, n_min=20, n_max=50, method="remove", n_not_improve=10,
                        ml_type="classification", metric="recall", random_state=None):
        """Select between ``n_min`` and ``n_max`` features by backward elimination.

        Features are removed one at a time; each round drops the feature whose
        removal gives the best cross-validated score. Once the number of
        remaining features is within [n_min, n_max], the round with the best
        score (latest on ties) is remembered. The search stops when ``n_min``
        features remain or when ``n_not_improve`` rounds in a row failed to
        reach the best score. The elimination table and the chosen tipping
        point are printed.

        Args:
            n_min (int): Minimal amount of features returned. Values below 1 are
                treated as 1 because a model needs at least one feature.
            n_max (int): Maximal amount of features returned. Capped at the
                number of columns minus one (at least one feature is removed).
            method (str, optional): Only "remove" is implemented. Defaults to "remove".
            n_not_improve (int): Number of consecutive non-improving rounds
                after which the search stops.
            ml_type (str, optional): Forwarded to ``select_best_feature``.
                Defaults to "classification".
            metric (str, optional): Forwarded to ``select_best_feature``.
                Defaults to "recall".
            random_state (int, optional): Seed forwarded to the CV shuffling.
                Defaults to None.

        Raises:
            ValueError: If the table has too few columns for ``n_min``, if
                ``n_max < n_min``, if ``method`` is unknown, or if no round
                produced a comparable score.
            NotImplementedError: If ``method`` is "add".

        Returns:
            list: list of selected features
        """
        all_features = self.X.columns.tolist()
        n_features = len(all_features)

        if n_max >= n_features:
            n_max = n_features - 1  # The algorithm removes at least one feature

        n_min = max(n_min, 1)
        if n_min >= n_features:
            raise ValueError("The number of features in the dataframe must be at least as n_min+1 parameter.")

        if n_max < n_min:
            raise ValueError("n_max parameter needs to be greater than or equal to n_min parameter.")

        if method == "add":
            raise NotImplementedError("Forward selection ('add') is not implemented in select_features.")
        if method != "remove":
            raise ValueError("Feature selection method not recognized. Please choose 'remove' or 'add'")

        candidates = list(all_features)
        removed = []
        feature_importance = []
        best_score = float("-inf")  # also valid for metrics that can be negative (r2)
        best_feature_indx = None
        i_worse = 0

        # i is the number of features left once this round's removal is done.
        for i in range(n_features - 1, n_min - 1, -1):
            best_feature, best_metric_score, best_metric_score_std = self.select_best_feature(
                candidates,
                method=method,
                ml_type=ml_type,
                metric=metric,
                removed_features=removed,
                random_state=random_state,
            )

            feature_importance.append((i + 1, best_feature, best_metric_score, best_metric_score_std))

            candidates.remove(best_feature)
            removed.append(best_feature)

            if i <= n_max:
                if best_metric_score >= best_score:
                    best_score = best_metric_score
                    best_feature_indx = i + 1
                    i_worse = 0
                else:
                    i_worse += 1

                if i_worse == n_not_improve:
                    break

        feature_importance_df = pd.DataFrame(feature_importance, columns=['i', 'name', 'metric', 'metric_sd'])

        print(feature_importance_df)
        print("best_feature_indx", best_feature_indx)
        print("best_score", best_score)

        if best_feature_indx is None:
            # Only reachable when no in-range score was comparable (e.g. NaN).
            raise ValueError("No valid score was obtained within the [n_min, n_max] range.")

        # Everything eliminated up to (and including) the tipping point goes.
        to_drop = feature_importance_df["i"] >= best_feature_indx
        features_to_remove = set(feature_importance_df.loc[to_drop, "name"])
        selected_features = [feat for feat in all_features if feat not in features_to_remove]

        return selected_features
|
|
|
|
"""
|
|
# Selekcijski kriterij značilk v rangu max-min
|
|
# Npr. izbira najboljšega score-a v tem rangu. Ali pa dokler se v tem rangu score zvišuje za 0.0X, ko se ne izberi tisti set značilk.
|
|
|
|
# Set značilk se bo izbral od i=1 do i=index_izbrane_značilke
|
|
|
|
# "Tipping point" značilka mora biti v rangu max-min
|
|
selection_area = feature_importance_df[(feature_importance_df["i"] >= n_min+1) & (feature_importance_df["i"] <= n_max)]
|
|
selection_area.set_index(["i", "name"], inplace=True)
|
|
print(selection_area)
|
|
diffrences = selection_area.diff()
|
|
diffrences.dropna(how='any', inplace=True)
|
|
print(diffrences)
|
|
|
|
# Morda tudi komulativna sumacija? Kjer se preprosto index z najvišjo vrednostjo
|
|
cumulative_sumation = diffrences.cumsum()
|
|
print(cumulative_sumation)
|
|
tipping_feature_indx_1 = cumulative_sumation.idxmax()["metric"]
|
|
print(tipping_feature_indx_1)
|
|
|
|
|
|
# Metoda, ki pusti n_not_improve značilkam, da premagajo dosedajno najboljši score
|
|
tipping_feature_indx_2 = None
|
|
best_score = 0
|
|
i_worse = 0
|
|
for indx, row in selection_area.iterrows():
|
|
if row["metric"] > best_score:
|
|
tipping_feature_indx_2 = indx
|
|
best_score = row["metric"]
|
|
i_worse = 0
|
|
else:
|
|
i_worse += 1
|
|
|
|
if i_worse == n_not_improve:
|
|
break
|
|
|
|
print(tipping_feature_indx_2)
|
|
selection_area.reset_index(inplace=True)
|
|
features_to_remove = feature_importance_df[feature_importance_df["i"] >= tipping_feature_indx_2[0]]["name"].values.tolist()
|
|
|
|
selected_features = [feat for feat in self.X.columns.tolist() if feat not in features_to_remove]
|
|
""" |