245 lines
12 KiB
Python
245 lines
12 KiB
Python
import os
import sys
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif, f_regression
from sklearn.linear_model import Lasso
from sklearn.model_selection import cross_validate, StratifiedKFold, GroupKFold, KFold
from sklearn.naive_bayes import GaussianNB
|
|
|
|
|
|
"""Feature selection pipeline: methods that can be used in a wrapper method alongside other wrapper contents (hyperparameter tuning etc.).

(1) Establish methods for each of the steps in the feature selection protocol.
(2) Ensure that the above methods are given only a part of the data and use appropriate random seeds - to later simulate the use case in production.
(3) Implement a method which gives a graphical exploration of steps (1) (a) and (b) of the feature selection.
(4) Prepare a core method that can be fit into a wrapper (see sklearn wrapper methods) and integrates the methods from (1).
"""
|
|
|
|
class FeatureSelection:
|
|
|
|
def __init__(self, X, y, groups):
|
|
self.X = X
|
|
self.y = y
|
|
self.groups = groups
|
|
|
|
|
|
def select_best_feature(self, features, method="remove", ml_category="classification", ml_subcategory="bin", metric="recall", stored_features=[]):
|
|
"""The method selects the best feature by testing the prediction on the feature set with or without the current feature.
|
|
The "remove" method removes a particular feature and predicts on the test set without it. The "add" method adds a particular
|
|
feature to the previously established feature set (stored_features). The best feature is selected dependent on the metric
|
|
specified as a parameter.
|
|
|
|
Args:
|
|
df (DataFrame): Input data on which the predictions will be made.
|
|
features (list): List of features to select the best/worst from
|
|
method (str, optional): remove or add features. Defaults to "remove".
|
|
ml_category (str, optional): Either classification or regression ml problem controls the ML algorithm and metric.
|
|
Defaults to "classification".
|
|
ml_subcategory (str, optional): In case of classification '_bin' for binary classification
|
|
and 'multi' for multiclass classification. For regression an empty string '' is sufficient.
|
|
Defaults to "bin".
|
|
metric (str, optional): Selected metric with which the best/worst feature will be determined. Defaults to "recall".
|
|
stored_features (list, optional): In case if method is 'add', stored features refer to the features that had been previously added. Defaults to [].
|
|
|
|
Raises:
|
|
ValueError: Raises if classification or regression metrics are not recognised if a specific ml_type is selected.
|
|
ValueError: If unknown ml_type is chosen.
|
|
|
|
Returns:
|
|
tuple: name of the best feature, best feature score, best feature score standard deviation.
|
|
"""
|
|
|
|
best_feature = None
|
|
|
|
# Validacije tipov ML in specificiranimi metrikami
|
|
if ml_category == "classification":
|
|
if ml_subcategory == "bin" and metric not in ['accuracy', 'precision', 'recall', 'f1']:
|
|
raise ValueError("Classification metric not recognized. Please choose 'accuracy', 'precision', 'recall' and/or 'f1'")
|
|
elif ml_subcategory == "multi":
|
|
ml_subcategory_error = False
|
|
if metric != "accuracy" and "_" in metric:
|
|
metric_s, metric_t = metric.split("_")
|
|
if metric_s not in ['accuracy', 'precision', 'recall', 'f1'] or metric_t not in ['micro', 'macro', 'weighted']:
|
|
ml_subcategory_error = True
|
|
else:
|
|
ml_subcategory_error = True
|
|
|
|
if ml_subcategory_error:
|
|
raise ValueError(""""Classification metric for multi-class classification must be specified precisely.
|
|
Available metric are: 'accuracy', 'precision', 'recall' and 'f1'.
|
|
Only accuracy must be specified as 'accuracy'.
|
|
For others please add appropriate suffixes: '_macro', '_micro', or '_weighted', e.g., 'f1_macro'""")
|
|
elif ml_category == "regression" and metric not in ['r2']:
|
|
raise ValueError("Regression metric not recognized. Please choose 'r2'")
|
|
|
|
for feat in features:
|
|
if method == "remove":
|
|
pred_features = [col for col in self.X.columns if feat != col] # All but feat
|
|
elif method == "add":
|
|
pred_features = [feat] + stored_features # Feat with stored features
|
|
|
|
X = self.X[pred_features].copy()
|
|
|
|
if self.groups is not None:
|
|
cv = GroupKFold(n_splits=5)
|
|
else:
|
|
cv = StratifiedKFold(n_splits=5, shuffle=True)
|
|
|
|
# See link about scoring for multiclassfication
|
|
# http://iamirmasoud.com/2022/06/19/understanding-micro-macro-and-weighted-averages-for-scikit-learn-metrics-in-multi-class-classification-with-example/
|
|
if ml_category == "classification":
|
|
nb = GaussianNB()
|
|
model_cv = cross_validate(
|
|
nb,
|
|
X=X,
|
|
y=self.y,
|
|
cv=cv,
|
|
groups=self.groups,
|
|
n_jobs=-1,
|
|
scoring=(metric)
|
|
)
|
|
|
|
|
|
elif ml_category == "regression":
|
|
lass = Lasso()
|
|
model_cv = cross_validate(
|
|
lass,
|
|
X=X,
|
|
y=y,
|
|
cv=cv,
|
|
groups=self.groups,
|
|
n_jobs=-1,
|
|
scoring=('r2')
|
|
)
|
|
|
|
else:
|
|
raise ValueError("ML type not yet implemented!")
|
|
|
|
# Section of metrics' scores comparison.
|
|
with warnings.catch_warnings():
|
|
warnings.filterwarnings("ignore", message="Precision is ill-defined and being set to 0.0 due to no predicted samples. Use `zero_division` parameter to control this behavior.")
|
|
|
|
metric_score = np.nanmean(model_cv["test_score"])
|
|
metric_score_std = np.nanstd(model_cv["test_score"])
|
|
|
|
if not best_feature or (metric_score > best_metric_score):
|
|
best_feature = feat
|
|
best_metric_score = metric_score
|
|
best_metric_score_std = metric_score_std
|
|
|
|
return best_feature, best_metric_score, best_metric_score_std
|
|
|
|
|
|
def select_features(self, n_min=20, n_max=50, k=100, method="remove", ml_type="classification_bin", metric="recall", n_tolerance=10):
|
|
"""This method selects a set of features and returns them as a list. It returns number of features
|
|
determined in the interval of [n_min, n_max].
|
|
|
|
The method consists of two steps:
|
|
(1) The method uses sklearn kBest method which selects k best features dependent on the ml_type parameter.
|
|
(2) The sequential features removal procedure is executed. Using the remaing features from (1).
|
|
The best score is detected using a removal procedure. The procedure sequentially removes the features
|
|
that attribute the least to the choosen evaluation metric. If in this sequence the score ML score is
|
|
improved the next feature is remove otherwise there is a tolerance criteria (n_tolerance)
|
|
with which the next n removed features are inspected whether currently best score is improved.
|
|
|
|
Args:
|
|
n_min (int, optional): Minimal amount of features returned.
|
|
n_max (int, optional): Maximal amount of features returned.
|
|
k (int, optional): Determines the k in the k-best features method.
|
|
If None, SelectKBest feature selection does not execute.
|
|
ml_type(str, optional): Type of ML problem. Currently implemented options:
|
|
'classification_bin', 'classification_multi', and 'regression_'
|
|
method (str, optional): "remove" or "add" features. Defaults to "remove".
|
|
n_tolerance (int, optional): If the best score is not improved in n that is specified by this parameter
|
|
the method returns index of feature with current best score as a tipping point feature.
|
|
|
|
Returns:
|
|
list: list of selected features
|
|
"""
|
|
|
|
if k is not None and k <= n_max:
|
|
raise ValueError("The k parameter needs to be greater than the n_max parameter.")
|
|
|
|
# Select k-best feature dependent on the type of ML task
|
|
ml_category, ml_subcategory = ml_type.split("_")
|
|
|
|
if k is not None:
|
|
if ml_category == "classification":
|
|
if ml_subcategory== "bin":
|
|
selector = SelectKBest(mutual_info_classif, k=k)
|
|
elif ml_subcategory== "multi":
|
|
selector = SelectKBest(f_classif, k=k)
|
|
else:
|
|
raise ValueError("Unknown ML type: cannot recognize ML classification subtype.")
|
|
elif ml_category == "regression":
|
|
selector = SelectKBest(f_regression, k=k)
|
|
else:
|
|
raise ValueError("Unknown ML type: cannot recognize ML type. Must be either classification or regression.")
|
|
|
|
selector.fit(self.X, self.y)
|
|
cols_idxs = selector.get_support(indices=True)
|
|
self.X = self.X.iloc[:,cols_idxs]
|
|
|
|
print("All columns (after SelectKBest method):")
|
|
print(self.X.columns)
|
|
|
|
# Sequential feature addition / removal
|
|
n_features = self.X.shape[1]
|
|
if n_max >= n_features:
|
|
n_max = n_features-1 # The algorithm removes at least one feature
|
|
|
|
if n_min > n_features:
|
|
raise ValueError("The number of remaining features in the dataframe must be at least as n_min+1 parameter.")
|
|
|
|
if n_max < n_min:
|
|
raise ValueError("n_max parameter needs to be greater than or equal to n_min parameter.")
|
|
|
|
features = self.X.columns.tolist()
|
|
feature_importance = []
|
|
if method == "remove":
|
|
best_score = 0
|
|
best_feature_indx = None
|
|
i_worse = 0
|
|
for i in reversed(range(n_features)):
|
|
|
|
if i+1 == n_min:
|
|
break
|
|
|
|
best_feature, best_metric_score, best_metric_score_std = \
|
|
self.select_best_feature(features, method=method, ml_category=ml_category, ml_subcategory=ml_subcategory, metric=metric)
|
|
|
|
feature_importance.append((i+1, best_feature, best_metric_score, best_metric_score_std))
|
|
|
|
features.remove(best_feature)
|
|
print("Features left:", i)
|
|
|
|
if i <= n_max:
|
|
if best_metric_score >= best_score:
|
|
best_score = best_metric_score
|
|
best_feature_indx = i+1
|
|
i_worse = 0
|
|
else:
|
|
i_worse += 1
|
|
|
|
if i_worse == n_tolerance:
|
|
break
|
|
|
|
feature_importance_df = pd.DataFrame(feature_importance, columns=['i', 'name', 'metric', 'metric_sd'])
|
|
|
|
print(feature_importance_df)
|
|
print("best_feature_indx", best_feature_indx)
|
|
print("best_score", best_score)
|
|
|
|
features_to_remove = feature_importance_df[feature_importance_df["i"] >= best_feature_indx]["name"].values.tolist()
|
|
selected_features = [feat for feat in self.X.columns.tolist() if feat not in features_to_remove]
|
|
|
|
return selected_features
|
|
|
|
else:
|
|
raise ValueError("Method type not recognized: only the 'remove' method is currently implemented.") |