221 lines
10 KiB
Python
221 lines
10 KiB
Python
import os
import sys
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.linear_model import Lasso
from sklearn.model_selection import KFold, StratifiedKFold, cross_validate
from sklearn.naive_bayes import GaussianNB
|
|
|
|
|
|
""" Feature selection pipeline: methods that can be used in a wrapper method alongside other wrapper contents (hyperparameter tuning etc.).

(1) Establish methods for each of the steps in the feature selection protocol.
(2) Ensure that the above methods are given only a part of the data and use appropriate random seeds - to later simulate the use case in production.
(3) Implement a method which gives graphical exploration of steps (1) (a) and (b) of the feature selection.
(4) Prepare a core method that can be fit into a wrapper (see sklearn wrapper methods) and integrates the methods from (1).

"""
|
|
|
|
class FeatureSelection:
    """Feature-selection steps built around cross-validated model scores (work in progress).

    The selection helpers are static methods: they receive the data explicitly
    and do not read any instance state.
    """

    def __init__(self, X_train, X_test, y_train, y_test):  # TODO: what about leave-one-subject-out CV?
        pass  # TODO....

    @staticmethod
    def select_best_feature(df, features, method="remove", ml_type="classification", metric="recall",
                            stored_features=None, random_state=None):
        """Select the best feature by cross-validating the model with or without each candidate.

        The "remove" method drops one candidate at a time from ``features`` and scores the
        model on the remaining ones; the returned feature is the one whose exclusion yields
        the highest score. The "add" method scores each candidate together with the
        previously established feature set (``stored_features``). The best feature is
        selected according to the metric specified as a parameter.

        Args:
            df (DataFrame): Input data; must contain the 'target' and 'pid' columns.
            features (list): List of features to select the best/worst from.
            method (str, optional): remove or add features. Defaults to "remove".
            ml_type (str, optional): Either classification or regression ml problem; controls
                the ML algorithm and metric. Defaults to "classification".
            metric (str, optional): Selected metric with which the best/worst feature will be
                determined. Defaults to "recall".
            stored_features (list, optional): If method is 'add', the features that had been
                previously added. Defaults to None (no stored features).
            random_state (int, optional): Seed for the shuffled cross-validation splits.
                Defaults to None (unseeded).

        Raises:
            ValueError: If the classification or regression metric is not recognised for the
                selected ml_type.
            ValueError: If unknown ml_type is chosen.
            ValueError: If unknown method is chosen.

        Returns:
            tuple: name of the best feature, best feature score, best feature score standard
            deviation. All three are None when no candidate could be evaluated (empty
            ``features``, or "remove" with a single feature, which would leave nothing to fit).
        """
        # Validate the configuration up front so that it fails even for an empty feature list.
        if ml_type == "classification":
            if metric not in ('accuracy', 'precision', 'recall', 'f1'):
                raise ValueError("Classification metric not recognized. Please choose 'accuracy', 'precision', 'recall' and/or 'f1'")
        elif ml_type == "regression":
            if metric not in ('r2',):
                raise ValueError("Regression metric not recognized. Please choose 'r2'")
        else:
            raise ValueError("ML type not yet implemented!")

        if method not in ("remove", "add"):
            raise ValueError("Feature selection method not recognized. Please choose 'remove' or 'add'")

        # A fresh list per call: a mutable default would be shared between calls.
        stored_features = [] if stored_features is None else list(stored_features)

        best_feature = None
        best_metric_score = None
        best_metric_score_std = None

        # Loop-invariant: the predictors can never include the target or the participant id.
        candidates = df.drop(columns=['target', 'pid'])
        y = df['target']

        for feat in features:
            if method == "remove":
                # All remaining features but feat (not all dataframe columns, which would
                # re-introduce features that were already eliminated).
                pred_features = [col for col in features if col != feat]
            else:
                pred_features = [feat] + stored_features  # Feat with stored features

            if not pred_features:
                continue  # A model cannot be fitted on zero columns.

            score, score_std = FeatureSelection._cross_validated_score(
                candidates[pred_features], y, ml_type, metric, random_state
            )

            if best_feature is None or score > best_metric_score:
                best_feature = feat
                best_metric_score = score
                best_metric_score_std = score_std

        return best_feature, best_metric_score, best_metric_score_std

    @staticmethod
    def _cross_validated_score(X, y, ml_type, metric, random_state=None):
        """Return the mean and standard deviation of ``metric`` over a shuffled 5-fold CV."""
        if ml_type == "classification":
            estimator = GaussianNB()
            cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)
            scoring = ('accuracy', 'precision', 'recall', 'f1')
        else:
            estimator = Lasso()
            # Stratification is only defined for class labels, so plain KFold is used here.
            cv = KFold(n_splits=5, shuffle=True, random_state=random_state)
            # A one-element tuple: with a bare string the result key would be 'test_score'.
            scoring = ('r2',)

        # The filter must be active while the folds are scored.
        # NOTE(review): with n_jobs=-1 the warning may still be emitted by worker processes - confirm.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", message="Precision is ill-defined and being set to 0.0 due to no predicted samples. Use `zero_division` parameter to control this behavior.")
            model_cv = cross_validate(
                estimator,
                X=X,
                y=y,
                cv=cv,
                n_jobs=-1,
                scoring=scoring
            )

        fold_scores = model_cv['test_' + metric]
        return np.mean(fold_scores), np.std(fold_scores)

    @staticmethod
    def select_features(df, n_min=20, n_max=50, method="remove", n_not_improve=10, random_state=None):
        """Rank the features by backward elimination and locate candidate cut-off ("tipping") points.

        Starting from all features, the feature whose removal gives the best cross-validated
        recall is removed repeatedly. Each removal is recorded as
        (i, name, metric, metric_sd), where ``i`` is the number of features that were still
        present before the removal and ``metric`` is the score obtained without the removed
        feature. The last remaining feature is recorded with i=1 and NaN scores, because there
        is nothing left to fit without it.

        Args:
            df (DataFrame): Input data; must contain the 'pid' and 'target' columns, all other
                columns are treated as features.
            n_min (int, optional): Lower bound of the range in which the tipping point is
                searched (rows with i >= n_min + 1). Defaults to 20.
            n_max (int, optional): Upper bound of the range (rows with i <= n_max); capped at
                the number of features. Defaults to 50.
            method (str, optional): Only "remove" is currently supported. Defaults to "remove".
            n_not_improve (int, optional): Number of non-improving steps tolerated by the
                patience-based tipping rule. Defaults to 10.
            random_state (int, optional): Seed passed to the cross-validation splits.
                Defaults to None (unseeded).

        Raises:
            ValueError: If n_min exceeds the number of features, if n_max is smaller than n_min,
                or if an unsupported method is requested.

        Returns:
            tuple: (feature_importance_df, tipping_points). ``feature_importance_df`` has the
            columns 'i', 'name', 'metric' and 'metric_sd'. ``tipping_points`` is a dict with the
            keys 'cumulative_max', 'first_non_improvement' and 'patience'; each value is an
            (i, name) index label or None when the rule found nothing.
        """
        n_features = df.shape[1] - 2  # -2 because pid and target are not considered
        n_max = min(n_max, n_features)

        if n_min > n_features:
            raise ValueError("The number of features in the dataframe must be at least n_min.")

        if n_max < n_min:
            raise ValueError("n_max parameter needs to be greater than or equal to n_min parameter.")

        if method != "remove":
            raise ValueError("Only the 'remove' method is currently supported by select_features.")

        features = df.columns.tolist()
        features.remove("pid")
        features.remove("target")

        feature_importance = []
        # Stop while two features remain: removing the very last one leaves nothing to score.
        for n_remaining in range(n_features, 1, -1):
            best_feature, best_metric_score, best_metric_score_std = FeatureSelection.select_best_feature(
                df, features, method=method, ml_type="classification", metric="recall",
                random_state=random_state
            )
            feature_importance.append((n_remaining, best_feature, best_metric_score, best_metric_score_std))
            features.remove(best_feature)

        if features:
            feature_importance.append((1, features[0], np.nan, np.nan))

        feature_importance_df = pd.DataFrame(feature_importance, columns=['i', 'name', 'metric', 'metric_sd'])
        tipping_points = FeatureSelection._find_tipping_points(feature_importance_df, n_min, n_max, n_not_improve)

        return feature_importance_df, tipping_points

    @staticmethod
    def _find_tipping_points(feature_importance_df, n_min, n_max, n_not_improve):
        """Apply the three tipping-point rules to the rows with n_min + 1 <= i <= n_max."""
        # Selection criterion for features in the max-min range, e.g. picking the best score
        # in that range, or continuing while the score keeps rising in that range.
        # The feature set is then taken from i=1 up to i=index of the selected feature.
        # The "tipping point" feature must lie within the max-min range.
        tipping_points = {"cumulative_max": None, "first_non_improvement": None, "patience": None}
        if feature_importance_df.empty:
            return tipping_points

        in_range = (feature_importance_df["i"] >= n_min + 1) & (feature_importance_df["i"] <= n_max)
        # Non-inplace set_index: the filtered frame is a slice of the original.
        selection_area = feature_importance_df[in_range].set_index(["i", "name"])
        if selection_area.empty:
            return tipping_points

        differences = selection_area.diff().dropna(how='any')

        # Rule 1: cumulative sum of the score changes; take the index with the highest value.
        if not differences.empty:
            tipping_points["cumulative_max"] = differences["metric"].cumsum().idxmax()

        # Rule 2: very conservative - stops searching at the first step that does not improve the score.
        for label, change in differences["metric"].items():
            if change > 0:
                tipping_points["first_non_improvement"] = label
            else:
                break

        # Rule 3: gives n_not_improve features the chance to beat the best score so far.
        # NOTE(review): implemented as a patience rule on the raw scores, following the
        # description of the rule - confirm this is the intended criterion.
        # TODO: it might make sense to merge the diff, cumsum and score columns ...
        best_score = None
        n_worse = 0
        for label, score in selection_area["metric"].items():
            improved = not pd.isna(score) and (best_score is None or score > best_score)
            if improved:
                best_score = score
                tipping_points["patience"] = label
                n_worse = 0
            else:
                n_worse += 1
                if n_worse >= n_not_improve:
                    break

        return tipping_points

    @staticmethod
    def make_predictions_with_features(df, groups_substrings, include_group=True, with_cols=None, print_flag=False):
        """Not implemented yet."""
        pass

    @staticmethod
    def vizualize_feature_selection_process():
        """Not implemented yet."""
        pass

    @staticmethod
    def execute_feature_selection_step():
        """Not implemented yet."""
        pass