# stress_at_work_analysis/machine_learning/helper.py

from pathlib import Path
import numpy as np
import pandas as pd
from sklearn import (
    ensemble,
    gaussian_process,
    kernel_ridge,
    linear_model,
    naive_bayes,
    svm,
)
from sklearn.dummy import DummyClassifier, DummyRegressor
from sklearn.model_selection import LeaveOneGroupOut, cross_validate
from xgboost import XGBClassifier, XGBRegressor


def safe_outer_merge_on_index(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    """Outer-merge two dataframes on their indices, tolerating an empty side."""
    if left.empty:
        return right
    elif right.empty:
        return left
    else:
        return pd.merge(
            left,
            right,
            how="outer",
            left_index=True,
            right_index=True,
            validate="one_to_one",
        )


def to_csv_with_settings(
    df: pd.DataFrame, folder: Path, filename_prefix: str, data_type: str
) -> None:
    """Export a dataframe to CSV using the project-wide export settings."""
    full_path = construct_full_path(folder, filename_prefix, data_type)
    df.to_csv(
        path_or_buf=full_path,
        sep=",",
        na_rep="NA",
        header=True,
        index=True,
        encoding="utf-8",
    )
    print("Exported the dataframe to " + str(full_path))


def read_csv_with_settings(
    folder: Path, filename_prefix: str, data_type: str, grouping_variable: list
) -> pd.DataFrame:
    """Read a CSV exported with to_csv_with_settings back into a dataframe."""
    full_path = construct_full_path(folder, filename_prefix, data_type)
    return pd.read_csv(
        filepath_or_buffer=full_path,
        sep=",",
        header=0,
        na_values="NA",
        encoding="utf-8",
        index_col=(["participant_id"] + grouping_variable),
        parse_dates=True,
        # Note: infer_datetime_format is deprecated (and a no-op) in pandas >= 2.0;
        # it is kept here for compatibility with older pandas versions.
        infer_datetime_format=True,
        cache_dates=True,
    )


def construct_full_path(folder: Path, filename_prefix: str, data_type: str) -> Path:
    """Build the export path: <folder>/<filename_prefix>_<data_type>.csv"""
    export_filename = filename_prefix + "_" + data_type + ".csv"
    full_path = folder / export_filename
    return full_path
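

# An illustrative round trip (a sketch: the folder, column names, and the
# "date_lj" grouping variable are hypothetical stand-ins): export a small frame
# with to_csv_with_settings and read it back with read_csv_with_settings.
def _demo_csv_round_trip(folder: Path) -> pd.DataFrame:
    df = pd.DataFrame(
        {
            "participant_id": ["p01", "p02"],
            "date_lj": ["2020-01-01", "2020-01-02"],
            "value": [1.0, 2.0],
        }
    ).set_index(["participant_id", "date_lj"])
    to_csv_with_settings(df, folder, filename_prefix="demo", data_type="features")
    return read_csv_with_settings(folder, "demo", "features", grouping_variable=["date_lj"])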


def insert_row(df, row):
    """Append a single row (a sequence matching df.columns) to a dataframe."""
    return pd.concat([df, pd.DataFrame([row], columns=df.columns)], ignore_index=True)


def prepare_regression_model_input(model_input, cv_method="logo"):
    """Split a model-input dataframe into features, target, and CV groups.

    With cv_method="logo", groups are participants (leave-one-group-out per
    participant); with any other value, each participant's rows are split into
    halves and each half forms its own group.
    """
    index_columns = [
        "local_segment",
        "local_segment_label",
        "local_segment_start_datetime",
        "local_segment_end_datetime",
    ]
    model_input.set_index(index_columns, inplace=True)

    if cv_method == "logo":
        data_x, data_y, data_groups = (
            model_input.drop(["target", "pid"], axis=1),
            model_input["target"],
            model_input["pid"],
        )
    else:
        # Assign each of a participant's rows to the first or second half of
        # their data, producing group labels such as "p01_1" and "p01_2".
        model_input["pid_index"] = model_input.groupby("pid").cumcount()
        model_input["pid_count"] = model_input.groupby("pid")["pid"].transform("count")
        model_input["pid_index"] = (
            model_input["pid_index"] / model_input["pid_count"] + 1
        ).round()
        model_input["pid_half"] = (
            model_input["pid"] + "_" + model_input["pid_index"].astype(int).astype(str)
        )
        # Also drop the helper columns so that they do not leak into the features.
        data_x, data_y, data_groups = (
            model_input.drop(
                ["target", "pid", "pid_index", "pid_count", "pid_half"], axis=1
            ),
            model_input["target"],
            model_input["pid_half"],
        )

    categorical_feature_colnames = [
        "gender",
        "startlanguage",
        "limesurvey_demand_control_ratio_quartile",
    ]
    additional_categorical_features = [
        col
        for col in data_x.columns
        if "mostcommonactivity" in col or "homelabel" in col
    ]
    categorical_feature_colnames += additional_categorical_features

    categorical_features = data_x[categorical_feature_colnames].copy()
    # Fill missing categorical values with the per-column mode.
    mode_categorical_features = categorical_features.mode().iloc[0]
    categorical_features = categorical_features.fillna(mode_categorical_features)
    # One-hot encode the categorical features.
    categorical_features = categorical_features.apply(
        lambda col: col.astype("category")
    )
    if not categorical_features.empty:
        categorical_features = pd.get_dummies(categorical_features)

    numerical_features = data_x.drop(categorical_feature_colnames, axis=1)
    train_x = pd.concat([numerical_features, categorical_features], axis=1)

    return train_x, data_y, data_groups
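

# A tiny illustrative sketch (hypothetical values) of the non-"logo" branch in
# prepare_regression_model_input: any cv_method other than "logo" assigns each
# participant's rows to a first- or second-half group, e.g. "p01_1" and "p01_2".
def _demo_half_split() -> pd.Series:
    toy = pd.DataFrame(
        {
            "local_segment": ["s1", "s2", "s3", "s4"],
            "local_segment_label": ["lbl"] * 4,
            "local_segment_start_datetime": pd.date_range("2020-01-01", periods=4),
            "local_segment_end_datetime": pd.date_range("2020-01-02", periods=4),
            "pid": ["p01"] * 4,
            "target": [1.0, 2.0, 3.0, 4.0],
            "gender": ["F"] * 4,
            "startlanguage": ["en"] * 4,
            "limesurvey_demand_control_ratio_quartile": [1, 1, 2, 2],
            "phone_screen_sum": [0.1, 0.2, 0.3, 0.4],  # hypothetical numeric feature
        }
    )
    _, _, groups = prepare_regression_model_input(toy, cv_method="half_logo")
    return groups  # "p01_1", "p01_1", "p01_2", "p01_2"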


def run_all_regression_models(input_csv):
    """Run a suite of regressors with leave-one-group-out CV and collect scores.

    Despite its name, `input_csv` is the model-input dataframe expected by
    prepare_regression_model_input. Returns a dataframe with the maximum and
    median of each test metric per method.
    """
    # Prepare data
    data_x, data_y, data_groups = prepare_regression_model_input(input_csv)

    # Prepare cross validation
    logo = LeaveOneGroupOut()
    logo.get_n_splits(
        data_x,
        data_y,
        groups=data_groups,
    )
    metrics = ["r2", "neg_mean_absolute_error", "neg_root_mean_squared_error"]
    test_metrics = ["test_" + metric for metric in metrics]

    scores = pd.DataFrame(columns=["method", "max", "nanmedian"])

    # Validate models
    dummy_regr = DummyRegressor(strategy="mean")
    dummy_regr_scores = cross_validate(
        dummy_regr,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=logo,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Dummy model:")
    print("R^2: ", np.nanmedian(dummy_regr_scores["test_r2"]))
    scores_df = pd.DataFrame(dummy_regr_scores)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "dummy"
    scores = pd.concat([scores, scores_df])

    lin_reg_rapids = linear_model.LinearRegression()
    lin_reg_scores = cross_validate(
        lin_reg_rapids,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=logo,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Linear regression:")
    print("R^2: ", np.nanmedian(lin_reg_scores["test_r2"]))
    scores_df = pd.DataFrame(lin_reg_scores)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "linear_reg"
    scores = pd.concat([scores, scores_df])

    ridge_reg = linear_model.Ridge(alpha=0.5)
    ridge_reg_scores = cross_validate(
        ridge_reg,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=logo,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Ridge regression")
    scores_df = pd.DataFrame(ridge_reg_scores)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "ridge_reg"
    scores = pd.concat([scores, scores_df])

    lasso_reg = linear_model.Lasso(alpha=0.1)
    lasso_reg_score = cross_validate(
        lasso_reg,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=logo,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Lasso regression")
    scores_df = pd.DataFrame(lasso_reg_score)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "lasso_reg"
    scores = pd.concat([scores, scores_df])

    bayesian_ridge_reg = linear_model.BayesianRidge()
    bayesian_ridge_reg_score = cross_validate(
        bayesian_ridge_reg,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=logo,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Bayesian Ridge")
    scores_df = pd.DataFrame(bayesian_ridge_reg_score)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "bayesian_ridge"
    scores = pd.concat([scores, scores_df])

    ransac_reg = linear_model.RANSACRegressor()
    ransac_reg_score = cross_validate(
        ransac_reg,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=logo,
        n_jobs=-1,
        scoring=metrics,
    )
    print("RANSAC (outlier robust regression)")
    scores_df = pd.DataFrame(ransac_reg_score)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "RANSAC"
    scores = pd.concat([scores, scores_df])

    svr = svm.SVR()
    svr_score = cross_validate(
        svr, X=data_x, y=data_y, groups=data_groups, cv=logo, n_jobs=-1, scoring=metrics
    )
    print("Support vector regression")
    scores_df = pd.DataFrame(svr_score)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "SVR"
    scores = pd.concat([scores, scores_df])

    kridge = kernel_ridge.KernelRidge()
    kridge_score = cross_validate(
        kridge,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=logo,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Kernel Ridge regression")
    scores_df = pd.DataFrame(kridge_score)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "kernel_ridge"
    scores = pd.concat([scores, scores_df])

    gpr = gaussian_process.GaussianProcessRegressor()
    gpr_score = cross_validate(
        gpr, X=data_x, y=data_y, groups=data_groups, cv=logo, n_jobs=-1, scoring=metrics
    )
    print("Gaussian Process Regression")
    scores_df = pd.DataFrame(gpr_score)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "gaussian_proc"
    scores = pd.concat([scores, scores_df])

    rfr = ensemble.RandomForestRegressor(max_features=0.3, n_jobs=-1)
    rfr_score = cross_validate(
        rfr, X=data_x, y=data_y, groups=data_groups, cv=logo, n_jobs=-1, scoring=metrics
    )
    print("Random Forest Regression")
    scores_df = pd.DataFrame(rfr_score)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "random_forest"
    scores = pd.concat([scores, scores_df])

    xgb = XGBRegressor()
    xgb_score = cross_validate(
        xgb, X=data_x, y=data_y, groups=data_groups, cv=logo, n_jobs=-1, scoring=metrics
    )
    print("XGBoost Regressor")
    scores_df = pd.DataFrame(xgb_score)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "XGBoost"
    scores = pd.concat([scores, scores_df])

    ada = ensemble.AdaBoostRegressor()
    ada_score = cross_validate(
        ada, X=data_x, y=data_y, groups=data_groups, cv=logo, n_jobs=-1, scoring=metrics
    )
    print("ADA Boost Regressor")
    scores_df = pd.DataFrame(ada_score)[test_metrics]
    scores_df = scores_df.agg(["max", np.nanmedian]).transpose()
    scores_df["method"] = "ADA_boost"
    scores = pd.concat([scores, scores_df])

    return scores
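

# A sketch of one way to summarise the frame returned by run_all_regression_models:
# its row labels are the test-metric names and its "nanmedian" column comes from
# the aggregation above, so selecting the "test_r2" rows and sorting ranks the
# methods by their median R^2 across folds.
def _rank_methods_by_median_r2(scores: pd.DataFrame) -> pd.DataFrame:
    r2_rows = scores.loc["test_r2", ["method", "nanmedian"]]
    return r2_rows.sort_values("nanmedian", ascending=False)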


def run_all_classification_models(data_x, data_y, data_groups, cv_method):
    """Run a suite of classifiers with the given CV method and collect scores.

    Returns a dataframe with the maximum and mean of each test metric per method.
    """
    metrics = ["accuracy", "average_precision", "recall", "f1"]
    test_metrics = ["test_" + metric for metric in metrics]

    scores = pd.DataFrame(columns=["method", "max", "mean"])

    dummy_class = DummyClassifier(strategy="most_frequent")
    dummy_score = cross_validate(
        dummy_class,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cv_method,
        n_jobs=-1,
        error_score="raise",
        scoring=metrics,
    )
    print("Dummy")
    scores_df = pd.DataFrame(dummy_score)[test_metrics]
    scores_df = scores_df.agg(["max", "mean"]).transpose()
    scores_df["method"] = "Dummy"
    scores = pd.concat([scores, scores_df])

    logistic_regression = linear_model.LogisticRegression()
    log_reg_scores = cross_validate(
        logistic_regression,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cv_method,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Logistic regression")
    scores_df = pd.DataFrame(log_reg_scores)[test_metrics]
    scores_df = scores_df.agg(["max", "mean"]).transpose()
    scores_df["method"] = "logistic_reg"
    scores = pd.concat([scores, scores_df])

    svc = svm.SVC()
    svc_scores = cross_validate(
        svc,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cv_method,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Support Vector Machine")
    scores_df = pd.DataFrame(svc_scores)[test_metrics]
    scores_df = scores_df.agg(["max", "mean"]).transpose()
    scores_df["method"] = "svc"
    scores = pd.concat([scores, scores_df])

    gaussian_nb = naive_bayes.GaussianNB()
    gaussian_nb_scores = cross_validate(
        gaussian_nb,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cv_method,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Gaussian Naive Bayes")
    scores_df = pd.DataFrame(gaussian_nb_scores)[test_metrics]
    scores_df = scores_df.agg(["max", "mean"]).transpose()
    scores_df["method"] = "gaussian_naive_bayes"
    scores = pd.concat([scores, scores_df])

    sgdc = linear_model.SGDClassifier()
    sgdc_scores = cross_validate(
        sgdc,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cv_method,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Stochastic Gradient Descent")
    scores_df = pd.DataFrame(sgdc_scores)[test_metrics]
    scores_df = scores_df.agg(["max", "mean"]).transpose()
    scores_df["method"] = "stochastic_gradient_descent"
    scores = pd.concat([scores, scores_df])

    rfc = ensemble.RandomForestClassifier()
    rfc_scores = cross_validate(
        rfc,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cv_method,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Random Forest")
    scores_df = pd.DataFrame(rfc_scores)[test_metrics]
    scores_df = scores_df.agg(["max", "mean"]).transpose()
    scores_df["method"] = "random_forest"
    scores = pd.concat([scores, scores_df])

    xgb_classifier = XGBClassifier()
    xgb_scores = cross_validate(
        xgb_classifier,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cv_method,
        n_jobs=-1,
        scoring=metrics,
    )
    print("XGBoost")
    scores_df = pd.DataFrame(xgb_scores)[test_metrics]
    scores_df = scores_df.agg(["max", "mean"]).transpose()
    scores_df["method"] = "xgboost"
    scores = pd.concat([scores, scores_df])

    return scores
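

# A minimal usage sketch: the CSV path is hypothetical, the frame is assumed to
# contain the index columns, "pid", and "target" expected by
# prepare_regression_model_input, and the median split used to binarise the
# target for the classification run is purely illustrative.
if __name__ == "__main__":
    model_input = pd.read_csv("data/model_input.csv")  # hypothetical path

    # Regression: the function prepares the features itself and evaluates with LOGO CV.
    regression_scores = run_all_regression_models(model_input.copy())
    print(regression_scores)

    # Classification: features and groups are prepared up front. The regression
    # preparation helper is reused and the target binarised only to illustrate
    # the expected inputs.
    data_x, data_y, data_groups = prepare_regression_model_input(model_input.copy())
    binary_y = (data_y > data_y.median()).astype(int)
    classification_scores = run_all_classification_models(
        data_x, binary_y, data_groups, cv_method=LeaveOneGroupOut()
    )
    print(classification_scores)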