stress_at_work_analysis/exploration/ml_pipeline_classification.py

463 lines
18 KiB
Python

# ---
# jupyter:
# jupytext:
# formats: ipynb,py:percent
# text_representation:
# extension: .py
# format_name: percent
# format_version: '1.3'
# jupytext_version: 1.13.0
# kernelspec:
# display_name: straw2analysis
# language: python
# name: straw2analysis
# ---
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# %matplotlib inline
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn import linear_model, svm, naive_bayes, neighbors, tree, ensemble
from sklearn.model_selection import LeaveOneGroupOut, cross_validate, StratifiedKFold
from sklearn.dummy import DummyClassifier
from sklearn.impute import SimpleImputer
from lightgbm import LGBMClassifier
import xgboost as xg
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"
# Put the parent of the current working directory on the import path
# (once) so that packages located next to this notebook's folder resolve.
nb_dir = os.path.split(os.getcwd())[0]
if nb_dir not in sys.path:
    sys.path.append(nb_dir)
import machine_learning.helper
# %% [markdown]
# # RAPIDS models
# %% [markdown]
# ## Set script's parameters
#
# %% jupyter={"source_hidden": false, "outputs_hidden": false} nteract={"transient": {"deleting": false}}
# Cross-validation method (could be regarded as a hyperparameter).
# Options: logo, half_logo, 5kfold
cv_method_str = 'logo'
# Number of largest/smallest accuracies (of a particular CV) to print per model.
n_sl = 3
# (bool) If True, train and test on a balanced dataset (obtained by undersampling).
undersampling = False
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Load the prepared model input table.
model_input = pd.read_csv(
    "../data/stressfulness_event_with_target_0_ver2/input_appraisal_stressfulness_event_mean.csv"
)
# model_input = model_input[model_input.columns.drop(list(model_input.filter(regex='empatica_temperature')))]
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# The segment identifier columns become the row index, so they are not
# part of the feature columns afterwards.
index_columns = [
    "local_segment",
    "local_segment_label",
    "local_segment_start_datetime",
    "local_segment_end_datetime",
]
model_input = model_input.set_index(index_columns)
model_input['target'].value_counts()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# bins = [-10, 0, 10] # bins for z-scored targets
bins = [-1, 0, 4] # bins for stressfulness (0-4) target
# Discretise the target into right-closed intervals: (-1, 0] is 'low' and
# (0, 4] is 'high'. `edges` receives the bin edges that were used.
model_input['target'], edges = pd.cut(
    model_input['target'],
    bins=bins,
    labels=['low', 'high'],  # ['low', 'medium', 'high']
    right=True,
    retbins=True,
)
model_input['target'].value_counts(), edges
# model_input = model_input[model_input['target'] != "medium"]
# Encode the labels as integers: 'low' -> 0, anything else -> 1.
# NOTE(review): a target that falls outside the bins (or is missing) is not
# 'low' and therefore ends up encoded as 1 - confirm this is intended.
model_input['target'] = [
    0 if label == "low" else 1 for label in model_input['target'].astype(str)
]
model_input['target'].value_counts()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# UnderSampling
# Balance the two classes by randomly downsampling the larger one.
if undersampling:
    no_stress = model_input[model_input['target'] == 0]
    stress = model_input[model_input['target'] == 1]
    # Sampling more rows than a class contains raises ValueError, so the
    # common size is the size of the smaller class.
    n_per_class = min(len(stress), len(no_stress))
    if len(stress) > n_per_class:
        stress = stress.sample(n=n_per_class)
    no_stress = no_stress.sample(n=n_per_class)
    model_input = pd.concat([stress, no_stress], axis=0)
# model_input_new = pd.DataFrame(columns=model_input.columns)
# for pid in model_input["pid"].unique():
# stress = model_input[(model_input["pid"] == pid) & (model_input['target'] == 1)]
# no_stress = model_input[(model_input["pid"] == pid) & (model_input['target'] == 0)]
# if (len(stress) == 0):
# continue
# if (len(no_stress) == 0):
# continue
# model_input_new = pd.concat([model_input_new, stress], axis=0)
# no_stress = no_stress.sample(n=min(len(stress), len(no_stress)))
# # In case there are more stress samples than no_stress, take all instances of no_stress.
# model_input_new = pd.concat([model_input_new, no_stress], axis=0)
# model_input = model_input_new
# model_input_new = pd.concat([model_input_new, no_stress], axis=0)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Split the table into features (data_x), labels (data_y) and the grouping
# used by the group-aware cross-validation (data_groups).
if cv_method_str == 'half_logo':
    # Position of every row within its participant, and the participant's row count.
    model_input['pid_index'] = model_input.groupby('pid').cumcount()
    model_input['pid_count'] = model_input.groupby('pid')['pid'].transform('count')
    # Relative position in [0, 1) shifted by 1 and rounded: 1 or 2, i.e. which
    # half of the participant's rows the row belongs to.
    model_input["pid_index"] = (model_input['pid_index'] / model_input['pid_count'] + 1).round()
    model_input["pid_half"] = model_input["pid"] + "_" + model_input["pid_index"].astype(int).astype(str)
    # Every helper column created above is removed from the features;
    # 'pid_count' in particular would otherwise act as a per-participant
    # fingerprint inside data_x.
    data_x = model_input.drop(
        ["target", "pid", "pid_index", "pid_count", "pid_half"], axis=1
    )
    data_y = model_input["target"]
    data_groups = model_input["pid_half"]
else:
    data_x = model_input.drop(["target", "pid"], axis=1)
    data_y = model_input["target"]
    data_groups = model_input["pid"]
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Columns treated as categorical: two fixed ones plus every column whose
# name contains "mostcommonactivity" or "homelabel".
categorical_feature_colnames = ["gender", "startlanguage"]
additional_categorical_features = [
    col
    for col in data_x.columns
    if "mostcommonactivity" in col or "homelabel" in col
]
categorical_feature_colnames += additional_categorical_features
categorical_features = data_x[categorical_feature_colnames].copy()
# fillna with mode (first row of DataFrame.mode(), one value per column)
mode_categorical_features = categorical_features.mode().iloc[0]
categorical_features = categorical_features.fillna(mode_categorical_features)
# one-hot encoding
categorical_features = categorical_features.apply(lambda col: col.astype("category"))
if not categorical_features.empty:
    categorical_features = pd.get_dummies(categorical_features)
# All remaining columns are used unchanged as numerical features.
numerical_features = data_x.drop(categorical_feature_colnames, axis=1)
train_x = pd.concat([numerical_features, categorical_features], axis=1)
train_x.dtypes
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# 'logo' and 'half_logo' hold out one group per split; every other setting
# uses a shuffled, stratified 5-fold split.
if cv_method_str == 'logo' or cv_method_str == 'half_logo':
    cv_method = LeaveOneGroupOut()
else:
    cv_method = StratifiedKFold(n_splits=5, shuffle=True)
# Number of splits the chosen method produces for this data.
cv_method.get_n_splits(
    train_x,
    data_y,
    groups=data_groups,
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Missing values (NaN) are replaced by the column median.
# NOTE(review): this script calls imputer.fit_transform on the full feature
# table before cross-validation, so the medians also see the held-out rows.
imputer = SimpleImputer(missing_values=np.nan, strategy='median')
# %% [markdown]
# ### Baseline: Dummy Classifier (most frequent)
# %% jupyter={"source_hidden": false, "outputs_hidden": false} nteract={"transient": {"deleting": false}}
# Baseline that always predicts the most frequent class.
dummy_class = DummyClassifier(strategy="most_frequent")
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Cross-validate on the median-imputed features; a failing fit raises.
dummy_classifier = cross_validate(
    estimator=dummy_class,
    X=imputer.fit_transform(train_x),
    y=data_y,
    groups=data_groups,
    scoring=('accuracy', 'precision', 'recall', 'f1'),
    cv=cv_method,
    n_jobs=-1,
    error_score='raise',
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Summary of the per-split test scores.
print("Acc (median)", np.nanmedian(dummy_classifier['test_accuracy']))
print("Acc (mean)", dummy_classifier['test_accuracy'].mean())
print("Precision", dummy_classifier['test_precision'].mean())
print("Recall", dummy_classifier['test_recall'].mean())
print("F1", dummy_classifier['test_f1'].mean())
# Partitioning the negated scores selects the n_sl largest accuracies,
# which are then listed from highest to lowest.
print(
    f"Largest {n_sl} ACC:",
    np.flip(np.sort(-np.partition(-dummy_classifier['test_accuracy'], n_sl)[:n_sl])),
)
print(
    f"Smallest {n_sl} ACC:",
    np.sort(np.partition(dummy_classifier['test_accuracy'], n_sl)[:n_sl]),
)
# %% [markdown] nteract={"transient": {"deleting": false}}
# ### All models
# %% jupyter={"source_hidden": false, "outputs_hidden": false} nteract={"transient": {"deleting": false}}
# Run the project helper on the median-imputed features with the same labels,
# groups and CV splitter as the individual model cells.
# NOTE(review): presumably returns a DataFrame with one row per metric and a
# "method" column - verify against machine_learning.helper.
final_scores = machine_learning.helper.run_all_classification_models(imputer.fit_transform(train_x), data_y, data_groups, cv_method)
# %% jupyter={"source_hidden": false, "outputs_hidden": false} nteract={"transient": {"deleting": false}}
# %%
# Name the existing index "metric" and index the table by (method, metric).
final_scores.index.name = "metric"
final_scores = final_scores.set_index(["method", final_scores.index])
# One CSV per cross-validation method.
final_scores.to_csv(f"../presentation/event_stressful_detection_{cv_method_str}.csv")
# %% [markdown]
# ### Logistic Regression
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Logistic regression with scikit-learn's default settings.
logistic_regression = linear_model.LogisticRegression()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Cross-validate on the median-imputed features.
log_reg_scores = cross_validate(
    logistic_regression,
    X=imputer.fit_transform(train_x),
    y=data_y,
    groups=data_groups,
    cv=cv_method,
    n_jobs=-1,
    # Same setting as the other cross_validate calls in this script: a failed
    # fit stops the run instead of silently producing NaN scores, which would
    # turn the means printed below into NaN.
    error_score='raise',
    scoring=('accuracy', 'precision', 'recall', 'f1')
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Summary of the per-split test scores.
print("Acc (median)", np.nanmedian(log_reg_scores['test_accuracy']))
print("Acc (mean)", np.mean(log_reg_scores['test_accuracy']))
print("Precision", np.mean(log_reg_scores['test_precision']))
print("Recall", np.mean(log_reg_scores['test_recall']))
print("F1", np.mean(log_reg_scores['test_f1']))
# The n_sl best and n_sl worst per-split accuracies.
print(f"Largest {n_sl} ACC:", np.sort(-np.partition(-log_reg_scores['test_accuracy'], n_sl)[:n_sl])[::-1])
print(f"Smallest {n_sl} ACC:", np.sort(np.partition(log_reg_scores['test_accuracy'], n_sl)[:n_sl]))
# %% [markdown]
# ### Support Vector Machine
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Support vector classifier with scikit-learn's default settings.
svc = svm.SVC()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Cross-validate on the median-imputed features.
svc_scores = cross_validate(
    svc,
    X=imputer.fit_transform(train_x),
    y=data_y,
    groups=data_groups,
    cv=cv_method,
    n_jobs=-1,
    # Same setting as the other cross_validate calls in this script: a failed
    # fit stops the run instead of silently producing NaN scores, which would
    # turn the means printed below into NaN.
    error_score='raise',
    scoring=('accuracy', 'precision', 'recall', 'f1')
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Summary of the per-split test scores.
print("Acc (median)", np.nanmedian(svc_scores['test_accuracy']))
print("Acc (mean)", np.mean(svc_scores['test_accuracy']))
print("Precision", np.mean(svc_scores['test_precision']))
print("Recall", np.mean(svc_scores['test_recall']))
print("F1", np.mean(svc_scores['test_f1']))
# The n_sl best and n_sl worst per-split accuracies.
print(f"Largest {n_sl} ACC:", np.sort(-np.partition(-svc_scores['test_accuracy'], n_sl)[:n_sl])[::-1])
print(f"Smallest {n_sl} ACC:", np.sort(np.partition(svc_scores['test_accuracy'], n_sl)[:n_sl]))
# %% [markdown]
# ### Gaussian Naive Bayes
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Gaussian naive Bayes with scikit-learn's default settings.
gaussian_nb = naive_bayes.GaussianNB()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Cross-validate on the median-imputed features; a failing fit raises.
gaussian_nb_scores = cross_validate(
    estimator=gaussian_nb,
    X=imputer.fit_transform(train_x),
    y=data_y,
    groups=data_groups,
    scoring=('accuracy', 'precision', 'recall', 'f1'),
    cv=cv_method,
    n_jobs=-1,
    error_score='raise',
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Summary of the per-split test scores.
print("Acc (median)", np.nanmedian(gaussian_nb_scores['test_accuracy']))
print("Acc (mean)", gaussian_nb_scores['test_accuracy'].mean())
print("Precision", gaussian_nb_scores['test_precision'].mean())
print("Recall", gaussian_nb_scores['test_recall'].mean())
print("F1", gaussian_nb_scores['test_f1'].mean())
# Partitioning the negated scores selects the n_sl largest accuracies,
# which are then listed from highest to lowest.
print(
    f"Largest {n_sl} ACC:",
    np.flip(np.sort(-np.partition(-gaussian_nb_scores['test_accuracy'], n_sl)[:n_sl])),
)
print(
    f"Smallest {n_sl} ACC:",
    np.sort(np.partition(gaussian_nb_scores['test_accuracy'], n_sl)[:n_sl]),
)
# %% [markdown]
# ### Stochastic Gradient Descent Classifier
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Linear classifier trained with stochastic gradient descent (default settings).
sgdc = linear_model.SGDClassifier()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Cross-validate on the median-imputed features; a failing fit raises.
sgdc_scores = cross_validate(
    estimator=sgdc,
    X=imputer.fit_transform(train_x),
    y=data_y,
    groups=data_groups,
    scoring=('accuracy', 'precision', 'recall', 'f1'),
    cv=cv_method,
    n_jobs=-1,
    error_score='raise',
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Summary of the per-split test scores.
print("Acc (median)", np.nanmedian(sgdc_scores['test_accuracy']))
print("Acc (mean)", sgdc_scores['test_accuracy'].mean())
print("Precision", sgdc_scores['test_precision'].mean())
print("Recall", sgdc_scores['test_recall'].mean())
print("F1", sgdc_scores['test_f1'].mean())
# Partitioning the negated scores selects the n_sl largest accuracies,
# which are then listed from highest to lowest.
print(
    f"Largest {n_sl} ACC:",
    np.flip(np.sort(-np.partition(-sgdc_scores['test_accuracy'], n_sl)[:n_sl])),
)
print(
    f"Smallest {n_sl} ACC:",
    np.sort(np.partition(sgdc_scores['test_accuracy'], n_sl)[:n_sl]),
)
# %% [markdown]
# ### K-nearest neighbors
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# K-nearest-neighbours classifier with scikit-learn's default settings.
knn = neighbors.KNeighborsClassifier()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Cross-validate on the median-imputed features; a failing fit raises.
knn_scores = cross_validate(
    estimator=knn,
    X=imputer.fit_transform(train_x),
    y=data_y,
    groups=data_groups,
    scoring=('accuracy', 'precision', 'recall', 'f1'),
    cv=cv_method,
    n_jobs=-1,
    error_score='raise',
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Summary of the per-split test scores.
print("Acc (median)", np.nanmedian(knn_scores['test_accuracy']))
print("Acc (mean)", knn_scores['test_accuracy'].mean())
print("Precision", knn_scores['test_precision'].mean())
print("Recall", knn_scores['test_recall'].mean())
print("F1", knn_scores['test_f1'].mean())
# Partitioning the negated scores selects the n_sl largest accuracies,
# which are then listed from highest to lowest.
print(
    f"Largest {n_sl} ACC:",
    np.flip(np.sort(-np.partition(-knn_scores['test_accuracy'], n_sl)[:n_sl])),
)
print(
    f"Smallest {n_sl} ACC:",
    np.sort(np.partition(knn_scores['test_accuracy'], n_sl)[:n_sl]),
)
# %% [markdown]
# ### Decision Tree
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Decision tree classifier with scikit-learn's default settings.
dtree = tree.DecisionTreeClassifier()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Cross-validate on the median-imputed features; a failing fit raises.
dtree_scores = cross_validate(
    estimator=dtree,
    X=imputer.fit_transform(train_x),
    y=data_y,
    groups=data_groups,
    scoring=('accuracy', 'precision', 'recall', 'f1'),
    cv=cv_method,
    n_jobs=-1,
    error_score='raise',
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Summary of the per-split test scores.
print("Acc (median)", np.nanmedian(dtree_scores['test_accuracy']))
print("Acc (mean)", dtree_scores['test_accuracy'].mean())
print("Precision", dtree_scores['test_precision'].mean())
print("Recall", dtree_scores['test_recall'].mean())
print("F1", dtree_scores['test_f1'].mean())
# Partitioning the negated scores selects the n_sl largest accuracies,
# which are then listed from highest to lowest.
print(
    f"Largest {n_sl} ACC:",
    np.flip(np.sort(-np.partition(-dtree_scores['test_accuracy'], n_sl)[:n_sl])),
)
print(
    f"Smallest {n_sl} ACC:",
    np.sort(np.partition(dtree_scores['test_accuracy'], n_sl)[:n_sl]),
)
# %% [markdown]
# ### Random Forest Classifier
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Random forest classifier with scikit-learn's default settings.
rfc = ensemble.RandomForestClassifier()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Cross-validate on the median-imputed features; a failing fit raises.
# The estimator fitted in each split is returned as well so that it can be
# inspected afterwards.
rfc_scores = cross_validate(
    estimator=rfc,
    X=imputer.fit_transform(train_x),
    y=data_y,
    groups=data_groups,
    scoring=('accuracy', 'precision', 'recall', 'f1'),
    cv=cv_method,
    n_jobs=-1,
    return_estimator=True,
    error_score='raise',
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Summary of the per-split test scores.
print("Acc (median)", np.nanmedian(rfc_scores['test_accuracy']))
print("Acc (mean)", rfc_scores['test_accuracy'].mean())
print("Precision", rfc_scores['test_precision'].mean())
print("Recall", rfc_scores['test_recall'].mean())
print("F1", rfc_scores['test_f1'].mean())
# Partitioning the negated scores selects the n_sl largest accuracies,
# which are then listed from highest to lowest.
print(
    f"Largest {n_sl} ACC:",
    np.flip(np.sort(-np.partition(-rfc_scores['test_accuracy'], n_sl)[:n_sl])),
)
print(
    f"Smallest {n_sl} ACC:",
    np.sort(np.partition(rfc_scores['test_accuracy'], n_sl)[:n_sl]),
)
# %% [markdown]
# ### Feature importance (RFC)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Average the feature importances over the estimators fitted in the CV splits.
# One row per estimator and one column per feature, so the column-wise mean
# gives every estimator the same weight. (Repeatedly averaging a running mean
# with the next estimator would weight the later splits far more heavily.)
rfc_es_fimp = pd.DataFrame(
    [estimator.feature_importances_ for estimator in rfc_scores['estimator']],
    columns=list(train_x.columns),
).mean(axis=0).to_frame(name='importance')
# print("\nFeatures sorted by their score for estimator {}:".format(idx))
# print(feature_importances.sort_values('importance', ascending=False).head(10))
pd.set_option('display.max_rows', 100)
# The 30 most important features as a table and a bar chart, followed by a
# bar chart of the 30 least important ones.
print(rfc_es_fimp.sort_values('importance', ascending=False).head(30))
rfc_es_fimp.sort_values('importance', ascending=False).head(30).plot.bar()
rfc_es_fimp.sort_values('importance', ascending=False).tail(30).plot.bar()
# Value distribution of one specific feature.
train_x['empatica_temperature_cr_stdDev_X_SO_mean'].value_counts()
# %% [markdown]
# ### Gradient Boosting Classifier
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Gradient boosting classifier with scikit-learn's default settings.
gbc = ensemble.GradientBoostingClassifier()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Cross-validate on the median-imputed features; a failing fit raises.
gbc_scores = cross_validate(
    estimator=gbc,
    X=imputer.fit_transform(train_x),
    y=data_y,
    groups=data_groups,
    scoring=('accuracy', 'precision', 'recall', 'f1'),
    cv=cv_method,
    n_jobs=-1,
    error_score='raise',
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Summary of the per-split test scores.
print("Acc (median)", np.nanmedian(gbc_scores['test_accuracy']))
print("Acc (mean)", gbc_scores['test_accuracy'].mean())
print("Precision", gbc_scores['test_precision'].mean())
print("Recall", gbc_scores['test_recall'].mean())
print("F1", gbc_scores['test_f1'].mean())
# Partitioning the negated scores selects the n_sl largest accuracies,
# which are then listed from highest to lowest.
print(
    f"Largest {n_sl} ACC:",
    np.flip(np.sort(-np.partition(-gbc_scores['test_accuracy'], n_sl)[:n_sl])),
)
print(
    f"Smallest {n_sl} ACC:",
    np.sort(np.partition(gbc_scores['test_accuracy'], n_sl)[:n_sl]),
)
# %% [markdown]
# ### LGBM Classifier
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# LightGBM classifier with the library's default settings.
lgbm = LGBMClassifier()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Cross-validate on the median-imputed features; a failing fit raises.
lgbm_scores = cross_validate(
    estimator=lgbm,
    X=imputer.fit_transform(train_x),
    y=data_y,
    groups=data_groups,
    scoring=('accuracy', 'precision', 'recall', 'f1'),
    cv=cv_method,
    n_jobs=-1,
    error_score='raise',
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Summary of the per-split test scores.
print("Acc (median)", np.nanmedian(lgbm_scores['test_accuracy']))
print("Acc (mean)", lgbm_scores['test_accuracy'].mean())
print("Precision", lgbm_scores['test_precision'].mean())
print("Recall", lgbm_scores['test_recall'].mean())
print("F1", lgbm_scores['test_f1'].mean())
# Partitioning the negated scores selects the n_sl largest accuracies,
# which are then listed from highest to lowest.
print(
    f"Largest {n_sl} ACC:",
    np.flip(np.sort(-np.partition(-lgbm_scores['test_accuracy'], n_sl)[:n_sl])),
)
print(
    f"Smallest {n_sl} ACC:",
    np.sort(np.partition(lgbm_scores['test_accuracy'], n_sl)[:n_sl]),
)
# %% [markdown]
# ### XGBoost Classifier
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# XGBoost classifier (scikit-learn wrapper) with the library's default settings.
xgb_classifier = xg.sklearn.XGBClassifier()
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Cross-validate on the median-imputed features; a failing fit raises.
xgb_classifier_scores = cross_validate(
    estimator=xgb_classifier,
    X=imputer.fit_transform(train_x),
    y=data_y,
    groups=data_groups,
    scoring=('accuracy', 'precision', 'recall', 'f1'),
    cv=cv_method,
    n_jobs=-1,
    error_score='raise',
)
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# Summary of the per-split test scores.
print("Acc (median)", np.nanmedian(xgb_classifier_scores['test_accuracy']))
print("Acc (mean)", xgb_classifier_scores['test_accuracy'].mean())
print("Precision", xgb_classifier_scores['test_precision'].mean())
print("Recall", xgb_classifier_scores['test_recall'].mean())
print("F1", xgb_classifier_scores['test_f1'].mean())
# Partitioning the negated scores selects the n_sl largest accuracies,
# which are then listed from highest to lowest.
print(
    f"Largest {n_sl} ACC:",
    np.flip(np.sort(-np.partition(-xgb_classifier_scores['test_accuracy'], n_sl)[:n_sl])),
)
print(
    f"Smallest {n_sl} ACC:",
    np.sort(np.partition(xgb_classifier_scores['test_accuracy'], n_sl)[:n_sl]),
)