import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif, f_regression
from sklearn.model_selection import cross_validate, StratifiedKFold, GroupKFold
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import Lasso

"""
Feature selection pipeline: methods that can be used in a wrapper method
alongside other wrapper contents (hyperparameter tuning etc.).

(1) Establish methods for each of the steps in the feature selection protocol.
(2) Ensure that the above methods are given only a part of the data and use
    appropriate random seeds - to later simulate the use case in production.
(3) Implement a method which gives a graphical exploration of the (1) (a) and
    (b) steps of the feature selection.
(4) Prepare a core method that can be fit into a wrapper (see sklearn wrapper
    methods) and integrates the methods from (1).
"""


class FeatureSelection:

    def __init__(self, X, y, groups):
        self.X = X
        self.y = y
        self.groups = groups
    def select_best_feature(self, features, method="remove", ml_category="classification",
                            ml_subcategory="bin", metric="recall", stored_features=[]):
        """Select the best feature by testing predictions on the feature set
        with or without the current feature.

        The "remove" method removes a particular feature and predicts on the
        test set without it. The "add" method adds a particular feature to the
        previously established feature set (stored_features). The best feature
        is selected based on the metric specified as a parameter.

        Args:
            features (list): List of features to select the best/worst from.
            method (str, optional): Remove or add features. Defaults to "remove".
            ml_category (str, optional): Either a classification or regression ML
                problem; controls the ML algorithm and metric. Defaults to "classification".
            ml_subcategory (str, optional): For classification, 'bin' for binary and
                'multi' for multiclass classification. For regression an empty
                string '' is sufficient. Defaults to "bin".
            metric (str, optional): Metric with which the best/worst feature will
                be determined. Defaults to "recall".
            stored_features (list, optional): If method is 'add', the features that
                have been previously added. Defaults to [].

        Raises:
            ValueError: If the classification or regression metric is not
                recognized for the selected ML type.
            ValueError: If an unknown ML type is chosen.

        Returns:
            tuple: Name of the best feature, best feature score, best feature
                score standard deviation.
        """
        best_feature = None
        best_metric_score = None
        best_metric_score_std = None

        # Validate the ML type against the specified metric.
        if ml_category == "classification":
            if ml_subcategory == "bin" and metric not in ['accuracy', 'precision', 'recall', 'f1']:
                raise ValueError("Classification metric not recognized. Please choose 'accuracy', 'precision', 'recall' and/or 'f1'")
            elif ml_subcategory == "multi":
                ml_subcategory_error = False
                if metric == "accuracy":
                    pass
                elif "_" in metric:
                    metric_s, metric_t = metric.split("_", 1)
                    if metric_s not in ['precision', 'recall', 'f1'] or metric_t not in ['micro', 'macro', 'weighted']:
                        ml_subcategory_error = True
                else:
                    ml_subcategory_error = True
                if ml_subcategory_error:
                    raise ValueError("""Classification metric for multi-class classification must be specified precisely.
                                     Available metrics are: 'accuracy', 'precision', 'recall' and 'f1'.
                                     Only accuracy must be specified as 'accuracy'.
                                     For others please add the appropriate suffix: '_macro', '_micro', or '_weighted', e.g., 'f1_macro'""")
        elif ml_category == "regression" and metric not in ['r2']:
            raise ValueError("Regression metric not recognized. Please choose 'r2'")

        for feat in features:
            if method == "remove":
                pred_features = [col for col in features if col != feat]  # All remaining features but feat
            elif method == "add":
                pred_features = [feat] + stored_features  # Feat together with the stored features

            X = self.X[pred_features].copy()

            if self.groups is not None:
                cv = GroupKFold(n_splits=5)
            else:
                # Fixed seed so repeated calls are comparable; see step (2) in the module docstring.
                cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

            # On scoring for multiclass classification, see:
            # http://iamirmasoud.com/2022/06/19/understanding-micro-macro-and-weighted-averages-for-scikit-learn-metrics-in-multi-class-classification-with-example/
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", message="Precision is ill-defined and being set to 0.0 due to no predicted samples. Use `zero_division` parameter to control this behavior.")
                if ml_category == "classification":
                    nb = GaussianNB()
                    model_cv = cross_validate(nb, X=X, y=self.y, cv=cv, groups=self.groups,
                                              n_jobs=-1, scoring=metric)
                elif ml_category == "regression":
                    lass = Lasso()
                    model_cv = cross_validate(lass, X=X, y=self.y, cv=cv, groups=self.groups,
                                              n_jobs=-1, scoring='r2')
                else:
                    raise ValueError("ML type not yet implemented!")

            # Compare the metric scores across candidate features.
            metric_score = np.nanmean(model_cv["test_score"])
            metric_score_std = np.nanstd(model_cv["test_score"])
            if best_feature is None or metric_score > best_metric_score:
                best_feature = feat
                best_metric_score = metric_score
                best_metric_score_std = metric_score_std

        return best_feature, best_metric_score, best_metric_score_std
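    # Step (3) of the module docstring promises a graphical exploration method,
    # but none exists yet; below is a minimal sketch of one (an assumption, not
    # part of the original pipeline). It expects a DataFrame with the columns
    # ['i', 'name', 'metric', 'metric_sd'], as built locally in select_features,
    # so the caller has to capture and pass that DataFrame themselves.
    def plot_feature_importance(self, feature_importance_df):
        """Plot the evaluation metric against the number of remaining features.

        Sketch for step (3); assumes feature_importance_df has the columns
        ['i', 'name', 'metric', 'metric_sd'] produced in select_features.
        """
        fig, ax = plt.subplots(figsize=(8, 4))
        # Error bars show one standard deviation of the cross-validated metric.
        ax.errorbar(feature_importance_df["i"], feature_importance_df["metric"],
                    yerr=feature_importance_df["metric_sd"], fmt="o-", capsize=3)
        ax.set_xlabel("Number of features before removal")
        ax.set_ylabel("Cross-validated metric")
        ax.invert_xaxis()  # Read left to right as features are removed
        fig.tight_layout()
        plt.show()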
    def select_features(self, n_min=20, n_max=50, k=100, method="remove",
                        ml_type="classification_bin", metric="recall", n_tolerance=10):
        """Select a set of features and return them as a list. The number of
        returned features lies in the interval [n_min, n_max].

        The method consists of two steps:
        (1) The sklearn SelectKBest method selects the k best features,
            depending on the ml_type parameter.
        (2) A sequential feature removal procedure is executed on the features
            remaining from (1). The procedure sequentially removes the feature
            that contributes the least to the chosen evaluation metric. If the
            score improves, the next feature is removed; otherwise a tolerance
            criterion (n_tolerance) controls how many further removals are
            inspected for an improvement over the currently best score.

        Args:
            n_min (int, optional): Minimal number of features returned.
            n_max (int, optional): Maximal number of features returned.
            k (int, optional): The k in the SelectKBest method. If None, the
                SelectKBest step is skipped.
            method (str, optional): "remove" or "add" features. Defaults to "remove".
            ml_type (str, optional): Type of ML problem. Currently implemented options:
                'classification_bin', 'classification_multi', and 'regression_'.
            metric (str, optional): Evaluation metric passed on to
                select_best_feature. Defaults to "recall".
            n_tolerance (int, optional): If the best score is not improved within this
                many further removals, the feature count at the current best score is
                treated as the tipping point and the procedure stops.

        Returns:
            list: List of selected features.
        """
        if k is not None and k <= n_max:
            raise ValueError("The k parameter needs to be greater than the n_max parameter.")

        # Select the k best features, depending on the type of ML task.
        ml_category, ml_subcategory = ml_type.split("_")

        if k is not None:
            if ml_category == "classification":
                if ml_subcategory == "bin":
                    selector = SelectKBest(mutual_info_classif, k=k)
                elif ml_subcategory == "multi":
                    selector = SelectKBest(f_classif, k=k)
                else:
                    raise ValueError("Unknown ML type: cannot recognize the ML classification subtype.")
            elif ml_category == "regression":
                selector = SelectKBest(f_regression, k=k)
            else:
                raise ValueError("Unknown ML type: must be either classification or regression.")

            selector.fit(self.X, self.y)
            cols_idxs = selector.get_support(indices=True)
            self.X = self.X.iloc[:, cols_idxs]
            print("All columns (after the SelectKBest method):")
            print(self.X.columns)

        # Sequential feature addition / removal
        n_features = self.X.shape[1]
        if n_max >= n_features:
            n_max = n_features - 1  # The algorithm removes at least one feature
        if n_min >= n_features:
            raise ValueError("The dataframe must contain at least n_min + 1 features, since at least one feature is removed.")
        if n_max < n_min:
            raise ValueError("The n_max parameter needs to be greater than or equal to the n_min parameter.")

        features = self.X.columns.tolist()
        feature_importance = []

        if method == "remove":
            best_score = -np.inf  # r2 can be negative, so do not start at 0
            best_feature_indx = None
            i_worse = 0
            for i in reversed(range(n_features)):
                if i + 1 == n_min:
                    break
                best_feature, best_metric_score, best_metric_score_std = \
                    self.select_best_feature(features, method=method, ml_category=ml_category,
                                             ml_subcategory=ml_subcategory, metric=metric)
                feature_importance.append((i + 1, best_feature, best_metric_score, best_metric_score_std))
                features.remove(best_feature)
                print("Features left:", i)
                if i <= n_max:
                    if best_metric_score >= best_score:
                        best_score = best_metric_score
                        best_feature_indx = i + 1
                        i_worse = 0
                    else:
                        i_worse += 1
                    if i_worse == n_tolerance:
                        break

            feature_importance_df = pd.DataFrame(feature_importance, columns=['i', 'name', 'metric', 'metric_sd'])
            print(feature_importance_df)
            print("best_feature_indx", best_feature_indx)
            print("best_score", best_score)
            features_to_remove = feature_importance_df[feature_importance_df["i"] >= best_feature_indx]["name"].values.tolist()
            selected_features = [feat for feat in self.X.columns.tolist() if feat not in features_to_remove]
            return selected_features
        else:
            raise ValueError("Method type not recognized: only the 'remove' method is currently implemented.")
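
# Minimal usage sketch, not part of the original module: it exercises the pipeline
# end to end on synthetic data. The dataset shape, parameter values, and random
# seed below are illustrative assumptions only.
if __name__ == "__main__":
    from sklearn.datasets import make_classification

    # Synthetic binary-classification data: 120 features, 15 of them informative.
    X, y = make_classification(n_samples=300, n_features=120, n_informative=15,
                               random_state=0)
    X = pd.DataFrame(X, columns=[f"feat_{i}" for i in range(X.shape[1])])

    # No grouping structure here, so StratifiedKFold is used internally.
    fs = FeatureSelection(X, pd.Series(y), groups=None)
    selected = fs.select_features(n_min=5, n_max=15, k=30,
                                  ml_type="classification_bin", metric="recall",
                                  n_tolerance=3)
    print("Selected features:", selected)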