from pathlib import Path

import numpy as np
import pandas as pd
from sklearn import (
    ensemble,
    gaussian_process,
    kernel_ridge,
    linear_model,
    naive_bayes,
    svm,
)
from sklearn.dummy import DummyClassifier, DummyRegressor
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import (
    BaseCrossValidator,
    LeaveOneGroupOut,
    StratifiedKFold,
    cross_validate,
)
from xgboost import XGBClassifier, XGBRegressor


def safe_outer_merge_on_index(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    if left.empty:
        return right
    elif right.empty:
        return left
    else:
        return pd.merge(
            left,
            right,
            how="outer",
            left_index=True,
            right_index=True,
            validate="one_to_one",
        )


def to_csv_with_settings(
    df: pd.DataFrame, folder: Path, filename_prefix: str, data_type: str
) -> None:
    full_path = construct_full_path(folder, filename_prefix, data_type)
    df.to_csv(
        path_or_buf=full_path,
        sep=",",
        na_rep="NA",
        header=True,
        index=True,
        encoding="utf-8",
    )
    print("Exported the dataframe to " + str(full_path))


def read_csv_with_settings(
    folder: Path, filename_prefix: str, data_type: str, grouping_variable: list
) -> pd.DataFrame:
    full_path = construct_full_path(folder, filename_prefix, data_type)
    return pd.read_csv(
        filepath_or_buffer=full_path,
        sep=",",
        header=0,
        na_values="NA",
        encoding="utf-8",
        index_col=(["participant_id"] + grouping_variable),
        parse_dates=True,
        infer_datetime_format=True,
        cache_dates=True,
    )


def construct_full_path(folder: Path, filename_prefix: str, data_type: str) -> Path:
    export_filename = filename_prefix + "_" + data_type + ".csv"
    full_path = folder / export_filename
    return full_path


def insert_row(df, row):
    return pd.concat([df, pd.DataFrame([row], columns=df.columns)], ignore_index=True)


def impute_encode_categorical_features(model_input: pd.DataFrame) -> pd.DataFrame:
    categorical_feature_col_names = [
        "gender",
        "startlanguage",
        "limesurvey_demand_control_ratio_quartile",
    ]
    additional_categorical_features = [
        col
        for col in model_input.columns
        if "mostcommonactivity" in col or "homelabel" in col
    ]
    categorical_feature_col_names += additional_categorical_features

    categorical_features = model_input[categorical_feature_col_names].copy()
    mode_categorical_features = categorical_features.mode().iloc[0]
    # fillna with mode
    categorical_features = categorical_features.fillna(mode_categorical_features)
    # one-hot encoding
    categorical_features = categorical_features.apply(
        lambda col: col.astype("category")
    )
    if not categorical_features.empty:
        categorical_features = pd.get_dummies(categorical_features)

    numerical_features = model_input.drop(categorical_feature_col_names, axis=1)
    model_input = pd.concat([numerical_features, categorical_features], axis=1)
    return model_input


def prepare_sklearn_data_format(
    model_input: pd.DataFrame, cv_method: str = "logo"
) -> tuple:
    index_columns = [
        "local_segment",
        "local_segment_label",
        "local_segment_start_datetime",
        "local_segment_end_datetime",
    ]
    model_input.set_index(index_columns, inplace=True)

    if cv_method == "half_logo":
        # Split each participant's rows into two halves (pid_1 and pid_2), so that
        # leave-one-group-out holds out half of a participant's data at a time.
        model_input["pid_index"] = model_input.groupby("pid").cumcount()
        model_input["pid_count"] = model_input.groupby("pid")["pid"].transform("count")

        model_input["pid_index"] = (
            model_input["pid_index"] / model_input["pid_count"] + 1
        ).round()
        model_input["pid_half"] = (
            model_input["pid"] + "_" + model_input["pid_index"].astype(int).astype(str)
        )

        # Drop the target and the pid helper columns so they do not leak into the features.
        data_x, data_y, data_groups = (
            model_input.drop(
                ["target", "pid", "pid_index", "pid_count", "pid_half"], axis=1
            ),
            model_input["target"],
            model_input["pid_half"],
        )
    else:
        data_x, data_y, data_groups = (
model_input.drop(["target", "pid"], axis=1), model_input["target"], model_input["pid"], ) return data_x, data_y, data_groups def prepare_cross_validator( data_x: pd.DataFrame, data_y: pd.DataFrame, data_groups: pd.DataFrame, cv_method: str = "logo", ) -> BaseCrossValidator: if cv_method == "logo" or cv_method == "half_logo": cv = LeaveOneGroupOut() cv.get_n_splits( data_x, data_y, groups=data_groups, ) else: cv = StratifiedKFold(n_splits=5, shuffle=True) return cv def aggregate_and_transpose(df: pd.DataFrame, statistics=None) -> pd.DataFrame: if statistics is None: statistics = ["max", "mean"] return ( df.agg(statistics) .transpose() .reset_index() .rename(columns={"index": "test_metric"}) ) def run_all_regression_models( data_x: pd.DataFrame, data_y: pd.DataFrame, data_groups: pd.DataFrame, cross_validator: BaseCrossValidator, ) -> pd.DataFrame: metrics = ["r2", "neg_mean_absolute_error", "neg_root_mean_squared_error"] test_metrics = ["test_" + metric for metric in metrics] scores = pd.DataFrame(columns=["method", "test_metric", "max", "nanmedian"]) # Validate models dummy_regr = DummyRegressor(strategy="mean") dummy_regr_scores = cross_validate( dummy_regr, X=data_x, y=data_y, groups=data_groups, cv=cross_validator, n_jobs=-1, scoring=metrics, ) print("Dummy model:") print("R^2: ", np.nanmedian(dummy_regr_scores["test_r2"])) scores_df = pd.DataFrame(dummy_regr_scores)[test_metrics] scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian]) scores_df["method"] = "dummy" scores = pd.concat([scores, scores_df]) del dummy_regr del dummy_regr_scores lin_reg = linear_model.LinearRegression() lin_reg_scores = cross_validate( lin_reg, X=data_x, y=data_y, groups=data_groups, cv=cross_validator, n_jobs=-1, scoring=metrics, ) print("Linear regression:") print("R^2: ", np.nanmedian(lin_reg_scores["test_r2"])) scores_df = pd.DataFrame(lin_reg_scores)[test_metrics] scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian]) scores_df["method"] = "linear_reg" scores = pd.concat([scores, scores_df]) del lin_reg del lin_reg_scores ridge_reg = linear_model.Ridge(alpha=0.5) ridge_reg_scores = cross_validate( ridge_reg, X=data_x, y=data_y, groups=data_groups, cv=cross_validator, n_jobs=-1, scoring=metrics, ) print("Ridge regression") scores_df = pd.DataFrame(ridge_reg_scores)[test_metrics] scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian]) scores_df["method"] = "ridge_reg" scores = pd.concat([scores, scores_df]) del ridge_reg del ridge_reg_scores lasso_reg = linear_model.Lasso(alpha=0.1) lasso_reg_score = cross_validate( lasso_reg, X=data_x, y=data_y, groups=data_groups, cv=cross_validator, n_jobs=-1, scoring=metrics, ) print("Lasso regression") scores_df = pd.DataFrame(lasso_reg_score)[test_metrics] scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian]) scores_df["method"] = "lasso_reg" scores = pd.concat([scores, scores_df]) del lasso_reg del lasso_reg_score bayesian_ridge_reg = linear_model.BayesianRidge() bayesian_ridge_reg_score = cross_validate( bayesian_ridge_reg, X=data_x, y=data_y, groups=data_groups, cv=cross_validator, n_jobs=-1, scoring=metrics, ) print("Bayesian Ridge") scores_df = pd.DataFrame(bayesian_ridge_reg_score)[test_metrics] scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian]) scores_df["method"] = "bayesian_ridge" scores = pd.concat([scores, scores_df]) del bayesian_ridge_reg del bayesian_ridge_reg_score ransac_reg = linear_model.RANSACRegressor() 
    ransac_reg_score = cross_validate(
        ransac_reg,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    print("RANSAC (outlier robust regression)")
    scores_df = pd.DataFrame(ransac_reg_score)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian])
    scores_df["method"] = "RANSAC"
    scores = pd.concat([scores, scores_df])
    del ransac_reg
    del ransac_reg_score

    svr = svm.SVR()
    svr_score = cross_validate(
        svr,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Support vector regression")
    scores_df = pd.DataFrame(svr_score)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian])
    scores_df["method"] = "SVR"
    scores = pd.concat([scores, scores_df])
    del svr
    del svr_score

    kridge = kernel_ridge.KernelRidge()
    kridge_score = cross_validate(
        kridge,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Kernel Ridge regression")
    scores_df = pd.DataFrame(kridge_score)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian])
    scores_df["method"] = "kernel_ridge"
    scores = pd.concat([scores, scores_df])
    del kridge
    del kridge_score

    gpr = gaussian_process.GaussianProcessRegressor()
    gpr_score = cross_validate(
        gpr,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Gaussian Process Regression")
    scores_df = pd.DataFrame(gpr_score)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian])
    scores_df["method"] = "gaussian_proc"
    scores = pd.concat([scores, scores_df])
    del gpr
    del gpr_score

    rfr = ensemble.RandomForestRegressor(max_features=0.3, n_jobs=-1)
    rfr_score = cross_validate(
        rfr,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    print("Random Forest Regression")
    scores_df = pd.DataFrame(rfr_score)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian])
    scores_df["method"] = "random_forest"
    scores = pd.concat([scores, scores_df])
    del rfr
    del rfr_score

    xgb = XGBRegressor()
    xgb_score = cross_validate(
        xgb,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    print("XGBoost Regressor")
    scores_df = pd.DataFrame(xgb_score)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian])
    scores_df["method"] = "XGBoost"
    scores = pd.concat([scores, scores_df])
    del xgb
    del xgb_score

    ada = ensemble.AdaBoostRegressor()
    ada_score = cross_validate(
        ada,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    print("ADA Boost Regressor")
    scores_df = pd.DataFrame(ada_score)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", np.nanmedian])
    scores_df["method"] = "ADA_boost"
    scores = pd.concat([scores, scores_df])
    del ada
    del ada_score

    return scores


def confusion_matrix_scorer(clf, X, y):
    # Assumes binary classification, so that confusion_matrix returns a 2x2 matrix.
    y_pred = clf.predict(X)
    cm = confusion_matrix(y, y_pred)
    return {"tn": cm[0, 0], "fp": cm[0, 1], "fn": cm[1, 0], "tp": cm[1, 1]}


def aggregate_confusion_matrix(scores_dict: dict) -> pd.DataFrame:
    scores_aggregated = aggregate_and_transpose(
        pd.DataFrame(scores_dict), statistics=["sum"]
    )
    return scores_aggregated[
        ~scores_aggregated.test_metric.isin(["fit_time", "score_time"])
    ]


def run_all_classification_models(
    data_x: pd.DataFrame,
    data_y: pd.DataFrame,
    data_groups: pd.DataFrame,
    cross_validator: BaseCrossValidator,
):
    data_y_value_counts = data_y.value_counts()
    if len(data_y_value_counts) == 1:
        raise ValueError("There is only one unique value in data_y.")
    if len(data_y_value_counts) == 2:
        metrics = ["accuracy", "average_precision", "recall", "f1"]
    else:
        metrics = ["accuracy", "precision_micro", "recall_micro", "f1_micro"]
    test_metrics = ["test_" + metric for metric in metrics]

    scores = pd.DataFrame(columns=["method", "test_metric", "max", "mean"])

    dummy_class = DummyClassifier(strategy="most_frequent")
    dummy_score = cross_validate(
        dummy_class,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        error_score="raise",
        scoring=metrics,
    )
    dummy_confusion_matrix = cross_validate(
        dummy_class,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        error_score="raise",
        scoring=confusion_matrix_scorer,
    )
    print("Dummy")
    scores_df = pd.DataFrame(dummy_score)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", "mean"])
    scores_df = pd.concat(
        [
            scores_df,
            aggregate_confusion_matrix(dummy_confusion_matrix).rename(
                columns={"sum": "mean"}
                # Note: the column is misleadingly renamed to get concise output.
            ),
        ]
    )
    scores_df["method"] = "dummy_classifier"
    scores = pd.concat([scores, scores_df])
    del dummy_class
    del dummy_score
    del dummy_confusion_matrix

    logistic_regression = linear_model.LogisticRegression()
    log_reg_scores = cross_validate(
        logistic_regression,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    log_reg_confusion_matrix = cross_validate(
        logistic_regression,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=confusion_matrix_scorer,
    )
    print("Logistic regression")
    scores_df = pd.DataFrame(log_reg_scores)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", "mean"])
    scores_df = pd.concat(
        [
            scores_df,
            aggregate_confusion_matrix(log_reg_confusion_matrix).rename(
                columns={"sum": "mean"}
                # Note: the column is misleadingly renamed to get concise output.
            ),
        ]
    )
    scores_df["method"] = "logistic_regression"
    scores = pd.concat([scores, scores_df])
    del logistic_regression
    del log_reg_scores
    del log_reg_confusion_matrix

    svc = svm.SVC()
    svc_scores = cross_validate(
        svc,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    svc_confusion_matrix = cross_validate(
        svc,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=confusion_matrix_scorer,
    )
    print("Support Vector Machine")
    scores_df = pd.DataFrame(svc_scores)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", "mean"])
    scores_df = pd.concat(
        [
            scores_df,
            aggregate_confusion_matrix(svc_confusion_matrix).rename(
                columns={"sum": "mean"}
                # Note: the column is misleadingly renamed to get concise output.
            ),
        ]
    )
    scores_df["method"] = "SVC"
    scores = pd.concat([scores, scores_df])
    del svc
    del svc_scores
    del svc_confusion_matrix

    gaussian_nb = naive_bayes.GaussianNB()
    gaussian_nb_scores = cross_validate(
        gaussian_nb,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    gaussian_nb_confusion_matrix = cross_validate(
        gaussian_nb,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=confusion_matrix_scorer,
    )
    print("Gaussian Naive Bayes")
    scores_df = pd.DataFrame(gaussian_nb_scores)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", "mean"])
    scores_df = pd.concat(
        [
            scores_df,
            aggregate_confusion_matrix(gaussian_nb_confusion_matrix).rename(
                columns={"sum": "mean"}
                # Note: the column is misleadingly renamed to get concise output.
            ),
        ]
    )
    scores_df["method"] = "gaussian_naive_bayes"
    scores = pd.concat([scores, scores_df])
    del gaussian_nb
    del gaussian_nb_scores
    del gaussian_nb_confusion_matrix

    sgdc = linear_model.SGDClassifier()
    sgdc_scores = cross_validate(
        sgdc,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    sgdc_confusion_matrix = cross_validate(
        sgdc,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=confusion_matrix_scorer,
    )
    print("Stochastic Gradient Descent")
    scores_df = pd.DataFrame(sgdc_scores)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", "mean"])
    scores_df = pd.concat(
        [
            scores_df,
            aggregate_confusion_matrix(sgdc_confusion_matrix).rename(
                columns={"sum": "mean"}
                # Note: the column is misleadingly renamed to get concise output.
            ),
        ]
    )
    scores_df["method"] = "stochastic_gradient_descent_classifier"
    scores = pd.concat([scores, scores_df])
    del sgdc
    del sgdc_scores
    del sgdc_confusion_matrix

    rfc = ensemble.RandomForestClassifier()
    rfc_scores = cross_validate(
        rfc,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    rfc_confusion_matrix = cross_validate(
        rfc,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=confusion_matrix_scorer,
    )
    print("Random Forest")
    scores_df = pd.DataFrame(rfc_scores)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", "mean"])
    scores_df = pd.concat(
        [
            scores_df,
            aggregate_confusion_matrix(rfc_confusion_matrix).rename(
                columns={"sum": "mean"}
                # Note: the column is misleadingly renamed to get concise output.
            ),
        ]
    )
    scores_df["method"] = "random_forest_classifier"
    scores = pd.concat([scores, scores_df])
    del rfc
    del rfc_scores
    del rfc_confusion_matrix

    xgb_classifier = XGBClassifier()
    xgb_scores = cross_validate(
        xgb_classifier,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=metrics,
    )
    xgb_confusion_matrix = cross_validate(
        xgb_classifier,
        X=data_x,
        y=data_y,
        groups=data_groups,
        cv=cross_validator,
        n_jobs=-1,
        scoring=confusion_matrix_scorer,
    )
    print("XGBoost")
    scores_df = pd.DataFrame(xgb_scores)[test_metrics]
    scores_df = aggregate_and_transpose(scores_df, statistics=["max", "mean"])
    scores_df = pd.concat(
        [
            scores_df,
            aggregate_confusion_matrix(xgb_confusion_matrix).rename(
                columns={"sum": "mean"}
                # Note: the column is misleadingly renamed to get concise output.
            ),
        ]
    )
    scores_df["method"] = "XGBoost_classifier"
    scores = pd.concat([scores, scores_df])
    del xgb_classifier
    del xgb_scores
    del xgb_confusion_matrix

    return scores
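

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustration only, not part of the original helpers).
# It shows how the functions above are intended to chain together. The input
# file name below is hypothetical; the dataframe is assumed to contain the
# columns referenced above ("pid", "target", the local_segment* index columns,
# and the categorical survey/phone features).
if __name__ == "__main__":
    model_input = pd.read_csv("features_with_target.csv")  # hypothetical path

    # Impute and one-hot encode the categorical columns expected by the helpers.
    model_input = impute_encode_categorical_features(model_input)

    # Reshape into X, y, and group labels, then build a matching cross-validator.
    data_x, data_y, data_groups = prepare_sklearn_data_format(
        model_input, cv_method="logo"
    )
    cross_validator = prepare_cross_validator(
        data_x, data_y, data_groups, cv_method="logo"
    )

    # Evaluate the battery of regression models and print the score table.
    regression_scores = run_all_regression_models(
        data_x, data_y, data_groups, cross_validator
    )
    print(regression_scores)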