stress_at_work_analysis/machine_learning/feature_selection.py

221 lines
10 KiB
Python

import os
import sys
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.linear_model import Lasso
from sklearn.model_selection import KFold, StratifiedKFold, cross_validate
from sklearn.naive_bayes import GaussianNB
""" Feature selection pipeline: a methods that can be used in the wrapper metod alongside other wrapper contents (hyperparameter tuning etc.).
(1) Establish methods for each of the steps in feature selection protocol.
(2) Ensure that above methods are given only a part of data and use appropriate random seeds - to later simulate use case in production.
(3) Implement a method which gives graphical exploration of (1) (a) and (b) steps of the feature selection.
(4) Prepare a core method that can be fit into a wrapper (see sklearn wrapper methods) and integrates methods from (1)
"""
class FeatureSelection:
    """Greedy, cross-validated feature selection helpers.

    The selection routines are static methods: they receive the data as a
    DataFrame that holds a 'pid' and a 'target' column next to the feature
    columns and do not read any instance state, so they can be called either
    on the class or on an instance.
    """

    def __init__(self, X_train, X_test, y_train, y_test):  # TODO: what about leave-one-subject-out CV?
        """Placeholder constructor; the arguments are accepted but not stored yet."""
        pass  # TODO....

    @staticmethod
    def select_best_feature(df, features, method="remove", ml_type="classification", metric="recall",
                            stored_features=None, random_state=None):
        """Select the best feature by cross-validating feature sets with or without each candidate.

        With the "remove" method every candidate is left out in turn and the model
        is evaluated on the remaining entries of ``features``; the candidate whose
        removal gives the highest score is returned. With the "add" method every
        candidate is evaluated together with ``stored_features``; the candidate
        whose addition gives the highest score is returned.

        Args:
            df (DataFrame): Input data; must contain the columns 'pid' and 'target'.
            features (list): Candidate features to select the best from.
            method (str, optional): "remove" or "add". Defaults to "remove".
            ml_type (str, optional): "classification" (GaussianNB, stratified 5-fold CV)
                or "regression" (Lasso, plain 5-fold CV). Defaults to "classification".
            metric (str, optional): Metric used to rank the candidates. One of 'accuracy',
                'precision', 'recall', 'f1' for classification, 'r2' for regression.
                Defaults to "recall".
            stored_features (list, optional): For method "add", the features selected in
                previous rounds. Defaults to None (no stored features).
            random_state (int, optional): Seed for the shuffled CV splits. With an integer
                every candidate is scored on identical folds. Defaults to None (unseeded).

        Raises:
            ValueError: If the metric is not recognised for the selected ml_type.
            ValueError: If an unknown ml_type is chosen.
            ValueError: If an unknown method is chosen.

        Returns:
            tuple: name of the best feature, its mean score, and the standard deviation of
            the score over the folds. ``(None, None, None)`` when there is no candidate
            that leaves at least one predictor column to fit on.
        """
        # Validate everything up front so that a bad argument fails before any model is fitted.
        if ml_type == "classification":
            if metric not in ('accuracy', 'precision', 'recall', 'f1'):
                raise ValueError("Classification metric not recognized. Please choose 'accuracy', 'precision', 'recall' and/or 'f1'")
        elif ml_type == "regression":
            if metric not in ('r2',):
                raise ValueError("Regression metric not recognized. Please choose 'r2'")
        else:
            raise ValueError("ML type not yet implemented!")
        if method not in ("remove", "add"):
            raise ValueError("Feature selection method not recognized. Please choose 'remove' or 'add'")

        features = list(features)
        stored_features = [] if stored_features is None else list(stored_features)

        # Pair every candidate with the predictor columns it will be scored on.
        candidates = []
        for feat in features:
            if method == "remove":
                # All remaining features but feat. Built from `features`, not df.columns,
                # so 'pid'/'target' and already eliminated features stay out.
                pred_features = [name for name in features if name != feat]
            else:
                pred_features = [feat] + stored_features  # feat with stored features
            if pred_features:  # an empty predictor set cannot be fitted
                candidates.append((feat, pred_features))
        if not candidates:
            return None, None, None

        # Dropping the bookkeeping columns guarantees they can never be used as predictors.
        X_all, y = df.drop(columns=['target', 'pid']), df['target']

        if ml_type == "classification":
            estimator = GaussianNB()
            splitter = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)
        else:
            estimator = Lasso()
            # Stratification is only defined for class labels, so regression uses plain KFold.
            splitter = KFold(n_splits=5, shuffle=True, random_state=random_state)
        score_key = "test_" + metric

        best_feature = None
        best_metric_score = None
        best_metric_score_std = None
        for feat, pred_features in candidates:
            with warnings.catch_warnings():
                # The warning is raised while scoring, so the filter has to wrap cross_validate.
                # NOTE(review): with n_jobs=-1 the scoring may run in worker processes where
                # this filter might not apply - confirm if the warning still shows up.
                warnings.filterwarnings("ignore", message="Precision is ill-defined and being set to 0.0 due to no predicted samples. Use `zero_division` parameter to control this behavior.")
                model_cv = cross_validate(
                    estimator,
                    X=X_all[pred_features],
                    y=y,
                    cv=splitter,
                    n_jobs=-1,
                    scoring=(metric,),  # a tuple, so the result is keyed as 'test_<metric>'
                )
            score = np.mean(model_cv[score_key])
            score_std = np.std(model_cv[score_key])
            if best_feature is None or score > best_metric_score:
                best_feature = feat
                best_metric_score = score
                best_metric_score_std = score_std

        return best_feature, best_metric_score, best_metric_score_std

    @staticmethod
    def select_features(df, n_min=20, n_max=50, method="remove", n_not_improve=10, random_state=None):
        """Rank the features by backward elimination and locate candidate cut-off points.

        Starting from all features, the feature whose removal gives the best recall
        (classification, see ``select_best_feature``) is eliminated round after round.
        Each eliminated feature gets the rank ``i`` equal to the number of features
        that were still present before its removal, so rank 1 is the last feature
        standing. Three alternative "tipping point" criteria are then evaluated on
        the ranks ``n_min + 1 .. n_max``.

        Args:
            df (DataFrame): Input data; must contain 'pid', 'target' and the feature columns.
            n_min (int, optional): Lower bound of the rank window. Defaults to 20.
            n_max (int, optional): Upper bound of the rank window; clipped to the number
                of features. Defaults to 50.
            method (str, optional): Only "remove" is implemented. Defaults to "remove".
            n_not_improve (int, optional): Number of consecutive non-improving steps the
                patience criterion tolerates before it stops. Defaults to 10.
            random_state (int, optional): Seed forwarded to the CV splits. Defaults to None.

        Raises:
            ValueError: If 'pid' or 'target' is missing, if the method is not implemented,
                if there are fewer than n_min features, or if n_max < n_min.

        Returns:
            dict: ``"feature_importance"`` - DataFrame with the columns 'i', 'name',
            'metric', 'metric_sd' in elimination order (the last remaining feature has
            NaN scores because no model can be fitted without it);
            ``"tipping_points"`` - dict with the keys 'cumsum_max', 'first_decline' and
            'patience', each an ``(i, name)`` index or None. No criterion is preferred
            yet, so all three candidates are returned (previously nothing was returned).
        """
        if any(col not in df.columns for col in ("pid", "target")):
            raise ValueError("The dataframe must contain the columns 'pid' and 'target'.")
        if method != "remove":
            raise ValueError("Feature selection method not yet implemented! Please choose 'remove'")

        n_features = df.shape[1] - 2  # -2 because pid and target are not considered
        n_max = min(n_max, n_features)
        if n_min > n_features:
            raise ValueError("The dataframe must contain at least n_min features (not counting 'pid' and 'target').")
        if n_max < n_min:
            raise ValueError("n_max parameter needs to be greater than or equal to n_min parameter.")

        features = [col for col in df.columns if col not in ("pid", "target")]

        feature_importance = []
        # Stop at one remaining feature: removing it would leave nothing to fit on.
        while len(features) > 1:
            rank = len(features)
            best_feature, best_metric_score, best_metric_score_std = FeatureSelection.select_best_feature(
                df, features, method=method, ml_type="classification", metric="recall",
                random_state=random_state,
            )
            feature_importance.append((rank, best_feature, best_metric_score, best_metric_score_std))
            features.remove(best_feature)
        if features:
            feature_importance.append((1, features[0], np.nan, np.nan))

        feature_importance_df = pd.DataFrame(feature_importance, columns=['i', 'name', 'metric', 'metric_sd'])
        tipping_points = FeatureSelection._find_tipping_points(feature_importance_df, n_min, n_max, n_not_improve)
        return {"feature_importance": feature_importance_df, "tipping_points": tipping_points}

    @staticmethod
    def _find_tipping_points(feature_importance_df, n_min, n_max, n_not_improve):
        """Evaluate the three cut-off criteria on the ranks n_min + 1 .. n_max."""
        # Feature selection criterion within the max-min range, e.g. picking the best score
        # in the range, or continuing while the score keeps rising. The selected feature set
        # runs from i=1 to the i of the chosen feature; the "tipping point" feature has to
        # lie within the max-min range.
        in_range = (feature_importance_df["i"] >= n_min + 1) & (feature_importance_df["i"] <= n_max)
        selection_area = feature_importance_df[in_range].set_index(["i", "name"])

        tipping_points = {"cumsum_max": None, "first_decline": None, "patience": None}
        if len(selection_area) < 2:
            return tipping_points  # no step-to-step difference can be computed

        # Score change of each elimination step relative to the previous step.
        gains = selection_area.diff().dropna(how='any')["metric"]
        if gains.empty:
            return tipping_points

        # Criterion 1: the step with the highest cumulative score change.
        tipping_points["cumsum_max"] = gains.cumsum().idxmax()

        # Criterion 2: very conservative - stops at the first step that does not improve.
        for indx, gain in gains.items():
            if gain > 0:
                tipping_points["first_decline"] = indx
            else:
                break

        # Criterion 3: gives n_not_improve steps the chance to beat the best score so far.
        cum_sum_score = 0.0
        best_cum_sum_score = 0.0
        i_worse = 0
        for indx, gain in gains.items():
            cum_sum_score += gain
            if cum_sum_score > best_cum_sum_score:
                best_cum_sum_score = cum_sum_score
                tipping_points["patience"] = indx
                i_worse = 0
            else:
                i_worse += 1
                if i_worse == n_not_improve:
                    break

        return tipping_points

    @staticmethod
    def make_predictions_with_features(df, groups_substrings, include_group=True, with_cols=None, print_flag=False):
        """Not implemented yet."""
        pass

    @staticmethod
    def vizualize_feature_selection_process():
        """Not implemented yet."""
        pass

    @staticmethod
    def execute_feature_selection_step():
        """Not implemented yet."""
        pass