# stress_at_work_analysis/machine_learning/helper.py
#
# 731 lines
# 20 KiB
# Python

from pathlib import Path
import numpy as np
import pandas as pd
from sklearn import (
ensemble,
gaussian_process,
kernel_ridge,
linear_model,
naive_bayes,
svm,
)
from sklearn.dummy import DummyClassifier, DummyRegressor
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import (
BaseCrossValidator,
LeaveOneGroupOut,
StratifiedKFold,
cross_validate,
)
from xgboost import XGBClassifier, XGBRegressor
def safe_outer_merge_on_index(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    """Outer-join two frames on their indices, tolerating an empty side.

    Parameters
    ----------
    left, right : pd.DataFrame
        Frames to combine on their indices.

    Returns
    -------
    pd.DataFrame
        ``right`` itself when ``left`` is empty, ``left`` itself when
        ``right`` is empty, otherwise the outer merge of both.

    Raises
    ------
    pandas.errors.MergeError
        If the index keys are not unique on both sides
        (``validate="one_to_one"``).
    """
    # An empty side contributes nothing, so hand back the other frame as is.
    if left.empty:
        return right
    if right.empty:
        return left

    merge_options = {
        "how": "outer",
        "left_index": True,
        "right_index": True,
        "validate": "one_to_one",
    }
    return pd.merge(left, right, **merge_options)
def to_csv_with_settings(
    df: pd.DataFrame, folder: Path, filename_prefix: str, data_type: str
) -> None:
    """Export ``df`` as a CSV file using the project's fixed CSV settings.

    The destination is built by ``construct_full_path`` from ``folder``,
    ``filename_prefix`` and ``data_type``. The file is comma separated,
    UTF-8 encoded, includes the header and the index, and writes missing
    values as ``NA``. A confirmation line is printed after writing.
    """
    destination = construct_full_path(folder, filename_prefix, data_type)
    csv_options = {
        "sep": ",",
        "na_rep": "NA",
        "header": True,
        "index": True,
        "encoding": "utf-8",
    }
    df.to_csv(destination, **csv_options)
    print(f"Exported the dataframe to {destination}")
def read_csv_with_settings(
    folder: Path, filename_prefix: str, data_type: str, grouping_variable: list
) -> pd.DataFrame:
    """Load a CSV file using the project's fixed CSV settings.

    Parameters
    ----------
    folder, filename_prefix, data_type
        Passed to ``construct_full_path`` to locate the file.
    grouping_variable : list
        Column names that, after ``"participant_id"``, form the index.

    Returns
    -------
    pd.DataFrame
        The file contents indexed by ``["participant_id"] + grouping_variable``;
        ``NA`` cells are read as missing and date parsing of the index is
        requested via ``parse_dates=True``.
    """
    source = construct_full_path(folder, filename_prefix, data_type)
    index_columns = ["participant_id"] + grouping_variable
    # NOTE(review): `infer_datetime_format` is deprecated as of pandas 2.0 -
    # confirm the pinned pandas version before upgrading.
    read_options = {
        "sep": ",",
        "header": 0,
        "na_values": "NA",
        "encoding": "utf-8",
        "index_col": index_columns,
        "parse_dates": True,
        "infer_datetime_format": True,
        "cache_dates": True,
    }
    return pd.read_csv(source, **read_options)
def construct_full_path(folder: Path, filename_prefix: str, data_type: str) -> Path:
    """Return ``folder / "<filename_prefix>_<data_type>.csv"``."""
    stem = "_".join((filename_prefix, data_type))
    return folder / (stem + ".csv")
def insert_row(df, row):
    """Return a new frame consisting of ``df`` followed by ``row``.

    ``row`` is a sequence of values matched positionally to ``df.columns``.
    ``df`` itself is not modified, and the result is re-indexed from 0.
    """
    single_row_frame = pd.DataFrame([row], columns=df.columns)
    stacked = pd.concat([df, single_row_frame], ignore_index=True)
    return stacked
def impute_encode_categorical_features(model_input: pd.DataFrame) -> pd.DataFrame:
    """Mode-impute and one-hot encode the categorical feature columns.

    The categorical columns are ``gender``, ``startlanguage``,
    ``limesurvey_demand_control_ratio_quartile`` and every column whose name
    contains ``mostcommonactivity`` or ``homelabel``. Missing values in these
    columns are replaced by the column's first mode, the columns are one-hot
    encoded with ``pd.get_dummies``, and the result is appended to the right
    of the remaining (untouched) columns.

    Parameters
    ----------
    model_input : pd.DataFrame
        Feature table; it is not modified.

    Returns
    -------
    pd.DataFrame
        Non-categorical columns followed by the dummy columns. For a frame
        without rows the categorical columns are returned un-encoded.

    Raises
    ------
    KeyError
        If one of the three fixed categorical columns is absent.
    """
    categorical_feature_col_names = [
        "gender",
        "startlanguage",
        "limesurvey_demand_control_ratio_quartile",
    ]
    categorical_feature_col_names += [
        col
        for col in model_input.columns
        if "mostcommonactivity" in col or "homelabel" in col
    ]

    categorical_features = model_input[categorical_feature_col_names].copy()

    # `mode()` has no rows when the frame is empty or when every categorical
    # value is missing; indexing its first row unconditionally would raise
    # IndexError, so only impute when a mode actually exists.
    modes = categorical_features.mode()
    if not modes.empty:
        categorical_features = categorical_features.fillna(modes.iloc[0])

    # One-hot encoding.
    categorical_features = categorical_features.apply(
        lambda col: col.astype("category")
    )
    if not categorical_features.empty:
        categorical_features = pd.get_dummies(categorical_features)

    numerical_features = model_input.drop(categorical_feature_col_names, axis=1)
    return pd.concat([numerical_features, categorical_features], axis=1)
def prepare_sklearn_data_format(
    model_input: pd.DataFrame, cv_method: str = "logo"
) -> tuple:
    """Split the model input into features, target and CV groups.

    Parameters
    ----------
    model_input : pd.DataFrame
        Must contain the four ``local_segment*`` columns (moved into the
        index), a ``pid`` column and a ``target`` column. The frame is not
        modified.
    cv_method : str, default "logo"
        With ``"half_logo"`` each participant's rows are split, in row order,
        into a first and a second half, and the groups become
        ``"<pid>_1"`` / ``"<pid>_2"`` (this requires string ``pid`` values).
        Any other value groups by ``pid``.

    Returns
    -------
    tuple
        ``(data_x, data_y, data_groups)``: the feature columns, the
        ``target`` column and the group label of every row.
    """
    index_columns = [
        "local_segment",
        "local_segment_label",
        "local_segment_start_datetime",
        "local_segment_end_datetime",
    ]
    # Re-index a copy rather than in place: mutating the caller's frame made
    # a second call on the same frame fail and left helper columns behind.
    data = model_input.set_index(index_columns)

    if cv_method == "half_logo":
        rows_by_pid = data.groupby("pid")
        position_in_pid = rows_by_pid.cumcount()
        rows_per_pid = rows_by_pid["pid"].transform("count")
        # position / count lies in [0, 1); adding 1 and rounding gives 1 for
        # the first half of a participant's rows and 2 for the second half.
        half_number = (position_in_pid / rows_per_pid + 1).round().astype(int)
        data_groups = data["pid"] + "_" + half_number.astype(str)
        data_groups.name = "pid_half"
    else:
        data_groups = data["pid"]

    # The helper series above are never stored in `data`, so none of them
    # (notably the per-participant row count) can leak into the features.
    data_x = data.drop(["target", "pid"], axis=1)
    data_y = data["target"]
    return data_x, data_y, data_groups
def prepare_cross_validator(
    data_x: pd.DataFrame,
    data_y: pd.DataFrame,
    data_groups: pd.DataFrame,
    cv_method: str = "logo",
) -> BaseCrossValidator:
    """Build the cross-validation splitter selected by ``cv_method``.

    ``"logo"`` and ``"half_logo"`` yield a ``LeaveOneGroupOut`` splitter;
    any other value yields a shuffled 5-fold ``StratifiedKFold`` created
    without a ``random_state``.
    """
    if cv_method not in ("logo", "half_logo"):
        return StratifiedKFold(n_splits=5, shuffle=True)

    splitter = LeaveOneGroupOut()
    # The returned split count is discarded.
    # NOTE(review): presumably kept as an early sanity check of the inputs -
    # confirm whether this call is still needed.
    splitter.get_n_splits(
        data_x,
        data_y,
        groups=data_groups,
    )
    return splitter
def aggregate_and_transpose(df: pd.DataFrame, statistics=None) -> pd.DataFrame:
    """Aggregate every column of ``df`` and return one row per column.

    Parameters
    ----------
    df : pd.DataFrame
        Frame whose columns are aggregated.
    statistics : list, optional
        Aggregations passed to ``DataFrame.agg``; defaults to
        ``["max", "mean"]``.

    Returns
    -------
    pd.DataFrame
        A ``test_metric`` column holding the original column names, followed
        by one column per statistic.
    """
    chosen_statistics = ["max", "mean"] if statistics is None else statistics
    per_statistic = df.agg(chosen_statistics)
    per_column = per_statistic.transpose().reset_index()
    return per_column.rename(columns={"index": "test_metric"})
def run_all_regression_models(
    data_x: pd.DataFrame,
    data_y: pd.DataFrame,
    data_groups: pd.DataFrame,
    cross_validator: BaseCrossValidator,
) -> pd.DataFrame:
    """Cross-validate a fixed set of regressors and summarise their scores.

    Every model is evaluated with ``cross_validate`` on R^2, negative mean
    absolute error and negative root mean squared error. Per model and
    metric, the maximum and the NaN-ignoring median over the folds are kept.
    The model's label is printed once its cross-validation has finished; for
    the dummy and the linear regression the median R^2 is printed as well.

    Parameters
    ----------
    data_x, data_y, data_groups
        Features, target and group labels forwarded to ``cross_validate``.
    cross_validator : BaseCrossValidator
        Splitter used for every model.

    Returns
    -------
    pd.DataFrame
        One row per (model, metric); the frame is initialised with the
        columns ``method``, ``test_metric``, ``max`` and ``nanmedian``.
    """
    metrics = ["r2", "neg_mean_absolute_error", "neg_root_mean_squared_error"]
    test_metrics = ["test_" + metric for metric in metrics]

    # (printed label, value of the "method" column, estimator, print R^2?)
    model_specs = [
        ("Dummy model:", "dummy", DummyRegressor(strategy="mean"), True),
        ("Linear regression:", "linear_reg", linear_model.LinearRegression(), True),
        ("Ridge regression", "ridge_reg", linear_model.Ridge(alpha=0.5), False),
        ("Lasso regression", "lasso_reg", linear_model.Lasso(alpha=0.1), False),
        ("Bayesian Ridge", "bayesian_ridge", linear_model.BayesianRidge(), False),
        (
            "RANSAC (outlier robust regression)",
            "RANSAC",
            linear_model.RANSACRegressor(),
            False,
        ),
        ("Support vector regression", "SVR", svm.SVR(), False),
        ("Kernel Ridge regression", "kernel_ridge", kernel_ridge.KernelRidge(), False),
        (
            "Gaussian Process Regression",
            "gaussian_proc",
            gaussian_process.GaussianProcessRegressor(),
            False,
        ),
        (
            "Random Forest Regression",
            "random_forest",
            ensemble.RandomForestRegressor(max_features=0.3, n_jobs=-1),
            False,
        ),
        ("XGBoost Regressor", "XGBoost", XGBRegressor(), False),
        ("ADA Boost Regressor", "ADA_boost", ensemble.AdaBoostRegressor(), False),
    ]

    # Start from an empty, correctly-labelled frame so the column order of
    # the result is method, test_metric, max, nanmedian.
    score_frames = [
        pd.DataFrame(columns=["method", "test_metric", "max", "nanmedian"])
    ]
    for label, method_name, estimator, report_r2 in model_specs:
        cv_scores = cross_validate(
            estimator,
            X=data_x,
            y=data_y,
            groups=data_groups,
            cv=cross_validator,
            n_jobs=-1,
            scoring=metrics,
        )
        print(label)
        if report_r2:
            print("R^2: ", np.nanmedian(cv_scores["test_r2"]))

        summary = aggregate_and_transpose(
            pd.DataFrame(cv_scores)[test_metrics],
            statistics=["max", np.nanmedian],
        )
        summary["method"] = method_name
        score_frames.append(summary)

    # Concatenate once instead of growing the frame model by model.
    return pd.concat(score_frames)
def confusion_matrix_scorer(clf, X, y):
    """Score a fitted classifier by the cells of its confusion matrix.

    Intended as a ``scoring`` callable for ``cross_validate``.

    Parameters
    ----------
    clf
        Fitted classifier exposing ``predict`` (and normally ``classes_``).
    X
        Samples to predict.
    y
        True labels for ``X``.

    Returns
    -------
    dict
        Keys ``tn``, ``fp``, ``fn`` and ``tp``, taken from the upper-left
        2x2 corner of the confusion matrix; with more than two classes only
        the first two labels are therefore reported.

    Raises
    ------
    IndexError
        If the confusion matrix is smaller than 2x2, e.g. when the
        classifier was fitted on a single class.
    """
    y_pred = clf.predict(X)
    # Fix the label set to the classes seen during fitting. Otherwise the
    # matrix is sized by the labels present in this fold, and a test fold
    # holding a single class (common with leave-one-group-out) yields a 1x1
    # matrix that the indexing below cannot read.
    known_labels = getattr(clf, "classes_", None)
    cm = confusion_matrix(y, y_pred, labels=known_labels)
    return {"tn": cm[0, 0], "fp": cm[0, 1], "fn": cm[1, 0], "tp": cm[1, 1]}
def aggregate_confusion_matrix(scores_dict: dict) -> pd.DataFrame:
    """Sum confusion-matrix scores over the folds.

    ``scores_dict`` is converted to a frame and summed per column with
    ``aggregate_and_transpose``; the ``fit_time`` and ``score_time`` rows are
    then dropped, leaving a ``test_metric`` and a ``sum`` column.
    """
    per_fold = pd.DataFrame(scores_dict)
    totals = aggregate_and_transpose(per_fold, statistics=["sum"])
    is_timing_row = totals["test_metric"].isin(["fit_time", "score_time"])
    return totals[~is_timing_row]
def run_all_classification_models(
    data_x: pd.DataFrame,
    data_y: pd.DataFrame,
    data_groups: pd.DataFrame,
    cross_validator: BaseCrossValidator,
) -> pd.DataFrame:
    """Cross-validate a fixed set of classifiers and summarise their scores.

    For a two-valued target the metrics are accuracy, average precision,
    recall and F1; otherwise accuracy and the micro-averaged precision,
    recall and F1. Each model is cross-validated once for these metrics and
    once more with ``confusion_matrix_scorer``; the model's label is printed
    after both runs.

    Parameters
    ----------
    data_x, data_y, data_groups
        Features, target and group labels forwarded to ``cross_validate``.
    cross_validator : BaseCrossValidator
        Splitter used for every model.

    Returns
    -------
    pd.DataFrame
        Columns ``method``, ``test_metric``, ``max`` and ``mean``. Metric
        rows hold the maximum and mean over the folds. Confusion-matrix rows
        (``test_tn``, ``test_fp``, ``test_fn``, ``test_tp``) hold the *sum*
        over the folds in the ``mean`` column and no ``max``.

    Raises
    ------
    ValueError
        If ``data_y`` contains exactly one unique value.
    """
    data_y_value_counts = data_y.value_counts()
    if len(data_y_value_counts) == 1:
        raise ValueError("There is only one unique value in data_y.")
    if len(data_y_value_counts) == 2:
        metrics = ["accuracy", "average_precision", "recall", "f1"]
    else:
        metrics = ["accuracy", "precision_micro", "recall_micro", "f1_micro"]
    test_metrics = ["test_" + metric for metric in metrics]

    # (printed label, value of the "method" column, estimator,
    #  extra cross_validate keyword arguments)
    # Only the dummy baseline re-raises scoring/fitting errors.
    model_specs = [
        (
            "Dummy",
            "dummy_classifier",
            DummyClassifier(strategy="most_frequent"),
            {"error_score": "raise"},
        ),
        (
            "Logistic regression",
            "logistic_regression",
            linear_model.LogisticRegression(),
            {},
        ),
        ("Support Vector Machine", "SVC", svm.SVC(), {}),
        (
            "Gaussian Naive Bayes",
            "gaussian_naive_bayes",
            naive_bayes.GaussianNB(),
            {},
        ),
        (
            "Stochastic Gradient Descent",
            "stochastic_gradient_descent_classifier",
            linear_model.SGDClassifier(),
            {},
        ),
        (
            "Random Forest",
            "random_forest_classifier",
            ensemble.RandomForestClassifier(),
            {},
        ),
        ("XGBoost", "XGBoost_classifier", XGBClassifier(), {}),
    ]

    score_frames = [pd.DataFrame(columns=["method", "test_metric", "max", "mean"])]
    for label, method_name, estimator, extra_cv_kwargs in model_specs:
        cv_kwargs = {
            "X": data_x,
            "y": data_y,
            "groups": data_groups,
            "cv": cross_validator,
            "n_jobs": -1,
            **extra_cv_kwargs,
        }
        # NOTE(review): the model is cross-validated twice; with a shuffling
        # splitter or a stochastic estimator the confusion matrix may come
        # from different fits than the metrics - confirm this is acceptable.
        metric_scores = cross_validate(estimator, scoring=metrics, **cv_kwargs)
        confusion_scores = cross_validate(
            estimator, scoring=confusion_matrix_scorer, **cv_kwargs
        )
        print(label)

        metric_summary = aggregate_and_transpose(
            pd.DataFrame(metric_scores)[test_metrics], statistics=["max", "mean"]
        )
        # Note: the column is misleadingly renamed to get concise output.
        confusion_summary = aggregate_confusion_matrix(confusion_scores).rename(
            columns={"sum": "mean"}
        )
        summary = pd.concat([metric_summary, confusion_summary])
        summary["method"] = method_name
        score_frames.append(summary)

    # Concatenate once instead of growing the frame model by model.
    return pd.concat(score_frames)